ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.112 by jsr166, Sat May 3 21:05:50 2014 UTC vs.
Revision 1.113 by dl, Sat May 24 16:51:14 2014 UTC

# Line 93 | Line 93 | public class CompletableFuture<T> implem
93       * AltResult is used to box null as a result, as well as to hold
94       * exceptions.  Using a single field makes completion simple to
95       * detect and trigger.  Encoding and decoding is straightforward
96 <     * but adds vertical sprawl. One minor simplification relies on
97 <     * the (static) NIL (to box null results) being the only AltResult
98 <     * with a null exception field, so we don't usually need explicit
99 <     * comparisons with NIL.  Exception propagation mechanics
100 <     * surrounding decoding rely on unchecked casts of decoded results
101 <     * really being unchecked, and user type errors being caught at
102 <     * point of use, as is currently the case in Java. These are
103 <     * highlighted by using SuppressWarnings annotated temporaries.
96 >     * but adds to the sprawl of trapping and associating exceptions
97 >     * with targets.  Minor simplifications rely on (static) NIL (to
98 >     * box null results) being the only AltResult with a null
99 >     * exception field, so we don't usually need explicit comparisons.
100 >     * Even though some of the generics casts are unchecked (see
101 >     * SuppressWarnings annotations), they are placed to be
102 >     * appropriate even if checked.
103       *
104       * Dependent actions are represented by Completion objects linked
105 <     * as Treiber stacks headed by field completions. There are four
106 <     * kinds of Completions: single-source (UniCompletion), two-source
107 <     * (BiCompletion), shared (CoCompletion, used by the second
108 <     * source of a BiCompletion), and Signallers that unblock waiters.
109 <     *
110 <     * The same patterns of methods and classes are used for each form
111 <     * of Completion (apply, combine, etc), and are written in a
112 <     * similar style.  For each form X there is, when applicable:
113 <     *
114 <     * * Method nowX (for example nowApply) that immediately executes
115 <     *   a supplied function and sets result
116 <     * * Class AsyncX class (for example AsyncApply) that calls nowX
117 <     *   from another task,
118 <     * * Class DelayedX (for example DelayedApply) that holds
119 <     *   arguments and calls nowX when ready.
120 <     *
121 <     * For each public CompletionStage method M* (for example
122 <     * thenApply{Async}), there is a method doM (for example
123 <     * doThenApply) that creates and/or invokes the appropriate form.
124 <     * Each deals with three cases that can arise when adding a
125 <     * dependent completion to CompletableFuture f:
126 <     *
127 <     * * f is already complete, so the dependent action is run
128 <     *   immediately, via  a "now" method, which, if async,
129 <     *   starts the action in a new task.
130 <     * * f is not complete, so a Completion action is created and
131 <     *   pushed to f's completions. It is triggered via
132 <     *   f.postComplete when f completes.
133 <     * * f is not complete, but completes while adding the completion
134 <     *   action, so we try to trigger it upon adding (see method
135 <     *   unipush and derivatives) to cover races.
136 <     *
137 <     * Methods with two sources (for example thenCombine) must deal
138 <     * with races across both while pushing actions.  The second
139 <     * completion is a CoCompletion pointing to the first, shared to
140 <     * ensure that at most one claims and performs the action.  The
141 <     * multiple-arity method allOf does this pairwise to form a tree
142 <     * of completions. (Method anyOf just uses a depth-one Or tree.)
143 <     *
144 <     * Upon setting results, method postComplete is called unless
145 <     * the target is guaranteed not to be observable (i.e., not yet
146 <     * returned or linked). Multiple threads can call postComplete,
147 <     * which atomically pops each dependent action, and tries to
148 <     * trigger it via method tryAct. Any such action must be performed
149 <     * only once, even if called from several threads, so Completions
150 <     * maintain status via CAS, and on success run one of the "now"
151 <     * methods.  Triggering can propagate recursively, so tryAct
152 <     * returns a completed dependent (if one exists) for further
153 <     * processing by its caller.
105 >     * as Treiber stacks headed by field "stack". There are Completion
106 >     * classes for each kind of action, grouped into single-input
107 >     * (UniCompletion), two-input (BiCompletion), projected
108 >     * (BiCompletions using either (not both) of two inputs), shared
109 >     * (CoCompletion, used by the second of two sources), zero-input
110 >     * source actions, and Signallers that unblock waiters. Class
111 >     * Completion extends ForkJoinTask to enable async execution
112 >     * (adding no space overhead because we exploit its "tag" methods
113 >     * to maintain claims). It is also declared as Runnable to allow
114 >     * usage with arbitrary executors.
115 >     *
116 >     * Support for each kind of CompletionStage relies on a separate
117 >     * class, along with two CompletableFuture methods:
118 >     *
119 >     * * A Completion class with name X corresponding to function,
120 >     *   prefaced with "Uni", "Bi", or "Or". Each class contains
121 >     *   fields for source(s), actions, and dependent. They are
122 >     *   boringly similar, differing from others only with respect to
123 >     *   underlying functional forms. We do this so that users don't
124 >     *   encounter layers of adaptors in common usages. We also
125 >     *   include "Relay" classes/methods that don't correspond to user
126 >     *   methods; they copy results from one stage to another.
127 >     *
128 >     * * Boolean CompletableFuture method x(...) (for example
129 >     *   uniApply) takes all of the arguments needed to check that an
130 >     *   action is triggerable, and then either runs the action or
131 >     *   arranges its async execution by executing its Completion
132 >     *   argument, if present. The method returns true if known to be
133 >     *   complete.
134 >     *
135 >     * * Completion method tryFire(int mode) invokes the associated x
136 >     *   method with its held arguments, and on success cleans up.
137 >     *   The mode argument allows exec to be called twice (SYNC, then
138 >     *   ASYNC); the first to screen and trap exceptions while
139 >     *   arranging to execute, and the second when called from a
140 >     *   task. (A few classes are not used async so take slightly
141 >     *   different forms.)  The claim() callback suppresses function
142 >     *   invocation if already claimed by another thread.
143 >     *
144 >     * * CompletableFuture method xStage(...) is called from a public
145 >     *   stage method of CompletableFuture x. It screens user
146 >     *   arguments and invokes and/or creates the stage object.  If
147 >     *   not async and x is already complete, the action is run
148 >     *   immediately.  Otherwise a Completion c is created, pushed to
149 >     *   x's stack (unless done), and started or triggered via
150 >     *   c.tryFire.  This also covers races possible if x completes
151 >     *   while pushing.  Classes with two inputs (for example BiApply)
152 >     *   deal with races across both while pushing actions.  The
153 >     *   second completion is a CoCompletion pointing to the first,
154 >     *   shared so that at most one performs the action.  The
155 >     *   multiple-arity methods allOf and anyOf do this pairwise to
156 >     *   form trees of completions.
157 >     *
158 >     * Note that the generic type parameters of methods vary according
159 >     * to whether "this" is a source, dependent, or completion.
160 >     *
161 >     * Method postComplete is called upon completion unless the target
162 >     * is guaranteed not to be observable (i.e., not yet returned or
163 >     * linked). Multiple threads can call postComplete, which
164 >     * atomically pops each dependent action, and tries to trigger it
165 >     * via method exec. Triggering can propagate recursively, so exec
166 >     * in NESTED mode returns its completed dependent (if one exists)
167 >     * for further processing by its caller (see method postFire).
168       *
169       * Blocking methods get() and join() rely on Signaller Completions
170       * that wake up waiting threads.  The mechanics are similar to
# Line 160 | Line 173 | public class CompletableFuture<T> implem
173       * algorithmic details.
174       *
175       * Without precautions, CompletableFutures would be prone to
176 <     * garbage accumulation as chains of completions build up, each
177 <     * pointing back to its sources. So we detach (null out) most
178 <     * Completion fields as soon as possible.  To support this,
179 <     * internal methods check for and harmlessly ignore null arguments
176 >     * garbage accumulation as chains of Completions build up, each
177 >     * pointing back to its sources. So we null out fields as soon as
178 >     * possible (see especially method Completion.detach). The
179 >     * screening checks needed anyway harmlessly ignore null arguments
180       * that may have been obtained during races with threads nulling
181 <     * out fields. (Some of these checked cases cannot currently
182 <     * happen.)  Fields of Async classes can be but currently are not
183 <     * fully detached, because they do not in general form cycles.
181 >     * out fields.  We also try to unlink fired Completions from
182 >     * stacks that might never be popped (see method postFire).
183 >     * Completion fields need not be declared as final or volatile
184 >     * because they are only visible to other threads upon safe
185 >     * publication.
186       */
187  
188 <    volatile Object result;             // Either the result or boxed AltResult
189 <    volatile Completion completions;    // Treiber stack of dependent actions
188 >    volatile Object result;       // Either the result or boxed AltResult
189 >    volatile Completion stack;    // Top of Treiber stack of dependent actions
190  
191      final boolean internalComplete(Object r) { // CAS from null to r
192          return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
193      }
194  
195 <    final boolean casCompletions(Completion cmp, Completion val) {
196 <        return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
195 >    final boolean casStack(Completion cmp, Completion val) {
196 >        return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
197      }
198  
199      /* ------------- Encoding and decoding outcomes -------------- */
# Line 194 | Line 209 | public class CompletableFuture<T> implem
209       * Returns the encoding of the given (non-null) exception as a
210       * wrapped CompletionException unless it is one already.
211       */
212 <    static AltResult altThrowable(Throwable x) {
212 >    static AltResult encodeThrowable(Throwable x) {
213          return new AltResult((x instanceof CompletionException) ? x :
214                               new CompletionException(x));
215      }
216  
217      /**
218       * Returns the encoding of the given arguments: if the exception
219 <     * is non-null, encodes as altThrowable.  Otherwise uses the given
219 >     * is non-null, encodes as AltResult.  Otherwise uses the given
220       * value, boxed as NIL if null.
221       */
222      static Object encodeOutcome(Object v, Throwable x) {
223 <        return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
223 >        return (x == null) ? (v == null) ? NIL : v : encodeThrowable(x);
224      }
225  
226      /**
227 <     * Decodes outcome to return result or throw unchecked exception.
227 >     * Returns the encoding of a copied outcome, If exceptional,
228 >     * rewraps as a CompletionException, else returns argument.
229       */
230 <    private static <T> T reportJoin(Object r) {
231 <        if (r instanceof AltResult) {
232 <            Throwable x;
233 <            if ((x = ((AltResult)r).ex) == null)
234 <                return null;
235 <            if (x instanceof CancellationException)
220 <                throw (CancellationException)x;
221 <            if (x instanceof CompletionException)
222 <                throw (CompletionException)x;
223 <            throw new CompletionException(x);
224 <        }
225 <        @SuppressWarnings("unchecked") T tr = (T) r;
226 <        return tr;
230 >    static Object encodeRelay(Object r) {
231 >        Throwable x;
232 >        return (((r instanceof AltResult) &&
233 >                 (x = ((AltResult)r).ex) != null &&
234 >                 !(x instanceof CompletionException)) ?
235 >                new AltResult(new CompletionException(x)) : r);
236      }
237  
238      /**
# Line 244 | Line 253 | public class CompletableFuture<T> implem
253                  x = cause;
254              throw new ExecutionException(x);
255          }
256 <        @SuppressWarnings("unchecked") T tr = (T) r;
257 <        return tr;
256 >        @SuppressWarnings("unchecked") T t = (T) r;
257 >        return t;
258 >    }
259 >
260 >    /**
261 >     * Decodes outcome to return result or throw unchecked exception.
262 >     */
263 >    private static <T> T reportJoin(Object r) {
264 >        if (r instanceof AltResult) {
265 >            Throwable x;
266 >            if ((x = ((AltResult)r).ex) == null)
267 >                return null;
268 >            if (x instanceof CancellationException)
269 >                throw (CancellationException)x;
270 >            if (x instanceof CompletionException)
271 >                throw (CompletionException)x;
272 >            throw new CompletionException(x);
273 >        }
274 >        @SuppressWarnings("unchecked") T t = (T) r;
275 >        return t;
276      }
277  
278 <    /* ------------- Async Tasks -------------- */
278 >    /* ------------- Async task preliminaries  -------------- */
279  
280      /**
281       * A marker interface identifying asynchronous tasks produced by
# Line 261 | Line 288 | public class CompletableFuture<T> implem
288      }
289  
290      /**
264     * Base class for tasks that can act as either FJ or plain
265     * Runnables. Abstract method compute calls an associated "now"
266     * method.  Method exec calls compute if its CompletableFuture is
267     * not already done, and runs completions if done. Fields are not
268     * in general final and can be nulled out after use (but most
269     * currently are not).  Classes include serialVersionUIDs even
270     * though they are currently never serialized.
271     */
272    abstract static class Async<T> extends ForkJoinTask<Void>
273        implements Runnable, AsynchronousCompletionTask {
274        CompletableFuture<T> dep; // the CompletableFuture to trigger
275        Async(CompletableFuture<T> dep) { this.dep = dep; }
276
277        abstract void compute(); // call the associated "now" method
278
279        public final boolean exec() {
280            CompletableFuture<T> d;
281            if ((d = dep) != null) {
282                if (d.result == null) // suppress if cancelled
283                    compute();
284                if (d.result != null)
285                    d.postComplete();
286                dep = null; // detach
287            }
288            return true;
289        }
290        public final Void getRawResult() { return null; }
291        public final void setRawResult(Void v) { }
292        public final void run() { exec(); }
293        private static final long serialVersionUID = 5232453952276885070L;
294    }
295
296    /**
291       * Default executor -- ForkJoinPool.commonPool() unless it cannot
292       * support parallelism.
293       */
# Line 315 | Line 309 | public class CompletableFuture<T> implem
309          return (e == ForkJoinPool.commonPool()) ? asyncPool : e;
310      }
311  
312 <    /* ------------- Completions -------------- */
312 >    // Modes for Completion.exec. Signedness matters.
313 >    static final int SYNC   =  0;
314 >    static final int ASYNC  =  1;
315 >    static final int NESTED = -1;
316 >
317 >    /* ------------- Base Completion classes and operations -------------- */
318  
319 <    abstract static class Completion { // See above
319 >    @SuppressWarnings("serial")
320 >    abstract static class Completion extends ForkJoinTask<Void>
321 >        implements Runnable, AsynchronousCompletionTask {
322          volatile Completion next;      // Treiber stack link
323  
324          /**
325 <         * Performs completion action if enabled, returning a
326 <         * completed dependent CompletableFuture, if one exists.
325 >         * Performs completion action if triggered, returning a
326 >         * dependent that may need propagation, if one exists.
327 >         *
328 >         * @param mode SYNC, ASYNC, or NESTED
329           */
330 <        abstract CompletableFuture<?> tryAct();
330 >        abstract CompletableFuture<?> tryFire(int mode);
331 >
332 >        /** Return true if possibly still triggerable. Used by cleanStack. */
333 >        abstract boolean isLive();
334 >
335 >        public final void run()                { tryFire(ASYNC); }
336 >        public final boolean exec()            { tryFire(ASYNC); return true; }
337 >        public final Void getRawResult()       { return null; }
338 >        public final void setRawResult(Void v) {}
339      }
340  
341      /**
342 <     * Triggers all reachable enabled dependents.  Call only when
343 <     * known to be done.
342 >     * Pops and tries to trigger all reachable dependents.  Call only
343 >     * when known to be done.
344       */
345      final void postComplete() {
346          /*
347 <         * On each step, variable f holds current completions to pop
347 >         * On each step, variable f holds current dependents to pop
348           * and run.  It is extended along only one path at a time,
349 <         * pushing others to avoid StackOverflowErrors on recursion.
349 >         * pushing others to avoid unbounded recursion.
350           */
351          CompletableFuture<?> f = this; Completion h;
352 <        while ((h = f.completions) != null ||
353 <               (f != this && (h = (f = this).completions) != null)) {
352 >        while ((h = f.stack) != null ||
353 >               (f != this && (h = (f = this).stack) != null)) {
354              CompletableFuture<?> d; Completion t;
355 <            if (f.casCompletions(h, t = h.next)) {
355 >            if (f.casStack(h, t = h.next)) {
356                  if (t != null) {
357                      if (f != this) {  // push
358 <                        do {} while (!casCompletions(h.next = completions, h));
358 >                        do {} while (!casStack(h.next = stack, h));
359                          continue;
360                      }
361                      h.next = null;    // detach
362                  }
363 <                f = (d = h.tryAct()) == null ? this : d;
363 >                f = (d = h.tryFire(NESTED)) == null ? this : d;
364              }
365          }
366      }
367  
368 <    /* ------------- One-source Completions -------------- */
368 >    /** Traverses stack and unlinks dead Completions. */
369 >    final void cleanStack() {
370 >        for (Completion p = null, q = stack; q != null;) {
371 >            Completion s = q.next;
372 >            if (q.isLive()) {
373 >                p = q;
374 >                q = s;
375 >            }
376 >            else if (p == null) {
377 >                casStack(q, s);
378 >                q = stack;
379 >            }
380 >            else {
381 >                p.next = s;
382 >                if (p.isLive())
383 >                    q = s;
384 >                else {
385 >                    p = null;  // restart
386 >                    q = stack;
387 >                }
388 >            }
389 >        }
390 >    }
391  
392 <    /**
393 <     * A Completion with a source and dependent.  The "dep" field acts
394 <     * as a claim, nulled out to disable further attempts to
395 <     * trigger. Fields can only be observed by other threads upon
363 <     * successful push; and should be nulled out after claim.
364 <     */
392 >    /* ------------- One-input Completions -------------- */
393 >
394 >    /** A Completion with a source, dependent, and executor. */
395 >    @SuppressWarnings("serial")
396      abstract static class UniCompletion<T> extends Completion {
397 <        Executor async;                    // executor to use (null if none)
397 >        Executor executor;                 // executor to use (null if none)
398          CompletableFuture<T> dep;          // the dependent to complete
399 <        CompletableFuture<?> src;          // source of value for tryAct
399 >        CompletableFuture<?> src;          // source for action
400  
401 <        UniCompletion(Executor async, CompletableFuture<T> dep,
401 >        UniCompletion(Executor executor, CompletableFuture<T> dep,
402                        CompletableFuture<?> src) {
403 <            this.async = async; this.dep = dep; this.src = src;
373 <        }
374 <
375 <        /** Tries to claim completion action by CASing dep to null */
376 <        final boolean claim(CompletableFuture<T> d) {
377 <            return UNSAFE.compareAndSwapObject(this, DEP, d, null);
403 >            this.executor = executor; this.dep = dep; this.src = src;
404          }
405  
406 <        private static final sun.misc.Unsafe UNSAFE;
407 <        private static final long DEP;
408 <        static {
409 <            try {
410 <                UNSAFE = sun.misc.Unsafe.getUnsafe();
411 <                Class<?> k = UniCompletion.class;
412 <                DEP = UNSAFE.objectFieldOffset
413 <                    (k.getDeclaredField("dep"));
414 <            } catch (Exception x) {
415 <                throw new Error(x);
406 >        /**
407 >         * Returns true if action can be run. Call only when known to
408 >         * be triggerable. Uses FJ tag bit to ensure that only one
409 >         * thread claims ownership.  If async, starts as task -- a
410 >         * later call to tryFire will run action.
411 >         */
412 >        final boolean claim() {
413 >            Executor e = executor;
414 >            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
415 >                if (e == null)
416 >                    return true;
417 >                executor = null; // disable
418 >                e.execute(this);
419              }
420 +            return false;
421          }
422 +
423 +        final boolean isLive() { return dep != null; }
424      }
425  
426 <    /** Pushes c on to completions, and triggers c if done. */
427 <    private void unipush(Completion c) {
426 >    /** Pushes the given completion (if it exists) unless done. */
427 >    final void push(UniCompletion<?> c) {
428          if (c != null) {
429 <            CompletableFuture<?> d;
430 <            while (result == null && !casCompletions(c.next = completions, c))
399 <                c.next = null;            // clear on CAS failure
400 <            if ((d = c.tryAct()) != null) // cover races
401 <                d.postComplete();
402 <            if (result != null)           // clean stack
403 <                postComplete();
429 >            while (result == null && !casStack(c.next = stack, c))
430 >                c.next = null; // clear on failure
431          }
432      }
433  
434 <    // Immediate, async, delayed, and routing support for Function/apply
435 <
436 <    static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
437 <                               Function<? super T,? extends U> f) {
438 <        if (d != null && f != null) {
439 <            T t; U u; Throwable x;
440 <            if (r instanceof AltResult) {
441 <                t = null;
442 <                x = ((AltResult)r).ex;
416 <            }
417 <            else {
418 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
419 <                x = null;
420 <            }
421 <            if (x == null) {
422 <                try {
423 <                    if (e != null) {
424 <                        e.execute(new AsyncApply<T,U>(d, t, f));
425 <                        return;
426 <                    }
427 <                    u = f.apply(t);
428 <                } catch (Throwable ex) {
429 <                    x = ex;
430 <                    u = null;
431 <                }
432 <            }
434 >    /**
435 >     * Post-processing by dependent after successful UniCompletion
436 >     * tryFire.  Tries to clean stack of source a, and then either runs
437 >     * postComplete or returns this to caller, depending on mode.
438 >     */
439 >    final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
440 >        if (a != null && a.stack != null) {
441 >            if (mode < 0 || a.result == null)
442 >                a.cleanStack();
443              else
444 <                u = null;
435 <            d.internalComplete(encodeOutcome(u, x));
444 >                a.postComplete();
445          }
446 <    }
447 <
448 <    static final class AsyncApply<T,U> extends Async<U> {
449 <        T arg;  Function<? super T,? extends U> fn;
450 <        AsyncApply(CompletableFuture<U> dep, T arg,
442 <                   Function<? super T,? extends U> fn) {
443 <            super(dep); this.arg = arg; this.fn = fn;
446 >        if (result != null && stack != null) {
447 >            if (mode < 0)
448 >                return this;
449 >            else
450 >                postComplete();
451          }
452 <        final void compute() { nowApply(null, dep, arg, fn); }
446 <        private static final long serialVersionUID = 5232453952276885070L;
452 >        return null;
453      }
454  
455 <    static final class DelayedApply<T,U> extends UniCompletion<U> {
455 >    @SuppressWarnings("serial")
456 >    static final class UniApply<T,U> extends UniCompletion<U> {
457          Function<? super T,? extends U> fn;
458 <        DelayedApply(Executor async, CompletableFuture<U> dep,
459 <                     CompletableFuture<?> src,
460 <                     Function<? super T,? extends U> fn) {
461 <            super(async, dep, src); this.fn = fn;
462 <        }
463 <        final CompletableFuture<?> tryAct() {
464 <            CompletableFuture<U> d; CompletableFuture<?> a; Object r;
465 <            if ((d = dep) != null && (a = src) != null &&
466 <                (r = a.result) != null && claim(d)) {
467 <                nowApply(async, d, r, fn);
468 <                src = null; fn = null;
462 <                if (d.result != null) return d;
463 <            }
464 <            return null;
458 >        UniApply(Executor executor, CompletableFuture<U> dep,
459 >                 CompletableFuture<?> src, Function<? super T,? extends U> fn) {
460 >            super(executor, dep, src); this.fn = fn;
461 >        }
462 >        final CompletableFuture<?> tryFire(int mode) {
463 >            CompletableFuture<U> d; CompletableFuture<?> a;
464 >            if ((d = dep) == null ||
465 >                !d.uniApply(a = src, fn, mode > 0 ? null : this))
466 >                return null;
467 >            dep = null; src = null; fn = null;
468 >            return d.postFire(a, mode);
469          }
470      }
471  
472 <    private <U> CompletableFuture<U> doThenApply(
473 <        Function<? super T,? extends U> fn, Executor e) {
474 <        if (fn == null) throw new NullPointerException();
475 <        CompletableFuture<U> d = new CompletableFuture<U>();
476 <        Object r = result;
477 <        if (r == null)
478 <            unipush(new DelayedApply<T,U>(e, d, this, fn));
479 <        else
480 <            nowApply(e, d, r, fn);
481 <        return d;
482 <    }
479 <
480 <    // Consumer/accept
481 <
482 <    static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
483 <                                Object r, Consumer<? super T> f) {
484 <        if (d != null && f != null) {
485 <            T t; Throwable x;
486 <            if (r instanceof AltResult) {
487 <                t = null;
488 <                x = ((AltResult)r).ex;
489 <            }
490 <            else {
491 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
492 <                x = null;
493 <            }
494 <            if (x == null) {
495 <                try {
496 <                    if (e != null) {
497 <                        e.execute(new AsyncAccept<T,U>(d, t, f));
498 <                        return;
499 <                    }
500 <                    f.accept(t);
501 <                } catch (Throwable ex) {
502 <                    x = ex;
472 >    final <S> boolean uniApply(CompletableFuture<?> a,
473 >                               Function<? super S,? extends T> f,
474 >                               UniApply<S,T> c) {
475 >        Object r; T u; Throwable x;
476 >        if (a == null || (r = a.result) == null || f == null)
477 >            return false;
478 >        if (result == null) {
479 >            try {
480 >                if (r instanceof AltResult) {
481 >                    x = ((AltResult)r).ex;
482 >                    r = null;
483                  }
484 +                else
485 +                    x = null;
486 +                if (x != null)
487 +                    u = null;
488 +                else if (c != null && !c.claim())
489 +                    return false;
490 +                else {
491 +                    @SuppressWarnings("unchecked") S t = (S) r;
492 +                    u = f.apply(t);
493 +                }
494 +            } catch (Throwable ex) {
495 +                x = ex;
496 +                u = null;
497              }
498 <            d.internalComplete(encodeOutcome(null, x));
498 >            internalComplete(encodeOutcome(u, x));
499          }
500 +        return true;
501      }
502  
503 <    static final class AsyncAccept<T,U> extends Async<U> {
504 <        T arg; Consumer<? super T> fn;
505 <        AsyncAccept(CompletableFuture<U> dep, T arg,
506 <                    Consumer<? super T> fn) {
507 <            super(dep); this.arg = arg; this.fn = fn;
503 >    private <U> CompletableFuture<U> uniApplyStage(
504 >        Executor e, Function<? super T,? extends U> f) {
505 >        if (f == null) throw new NullPointerException();
506 >        CompletableFuture<U> d =  new CompletableFuture<U>();
507 >        if (e != null || !d.uniApply(this, f, null)) {
508 >            UniApply<T,U> c = new UniApply<T,U>(e, d, this, f);
509 >            push(c);
510 >            c.tryFire(SYNC);
511          }
512 <        final void compute() { nowAccept(null, dep, arg, fn); }
516 <        private static final long serialVersionUID = 5232453952276885070L;
512 >        return d;
513      }
514  
515 <    static final class DelayedAccept<T> extends UniCompletion<Void> {
515 >    @SuppressWarnings("serial")
516 >    static final class UniAccept<T> extends UniCompletion<Void> {
517          Consumer<? super T> fn;
518 <        DelayedAccept(Executor async, CompletableFuture<Void> dep,
519 <                      CompletableFuture<?> src, Consumer<? super T> fn) {
520 <            super(async, dep, src); this.fn = fn;
521 <        }
522 <        final CompletableFuture<?> tryAct() {
523 <            CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
524 <            if ((d = dep) != null && (a = src) != null &&
525 <                (r = a.result) != null && claim(d)) {
526 <                nowAccept(async, d, r, fn);
527 <                src = null; fn = null;
528 <                if (d.result != null) return d;
532 <            }
533 <            return null;
518 >        UniAccept(Executor executor, CompletableFuture<Void> dep,
519 >                  CompletableFuture<?> src, Consumer<? super T> fn) {
520 >            super(executor, dep, src); this.fn = fn;
521 >        }
522 >        final CompletableFuture<?> tryFire(int mode) {
523 >            CompletableFuture<Void> d; CompletableFuture<?> a;
524 >            if ((d = dep) == null ||
525 >                !d.uniAccept(a = src, fn, mode > 0 ? null : this))
526 >                return null;
527 >            dep = null; src = null; fn = null;
528 >            return d.postFire(a, mode);
529          }
530      }
531  
532 <    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
533 <                                                 Executor e) {
534 <        if (fn == null) throw new NullPointerException();
535 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
536 <        Object r = result;
537 <        if (r == null)
538 <            unipush(new DelayedAccept<T>(e, d, this, fn));
539 <        else
540 <            nowAccept(e, d, r, fn);
541 <        return d;
542 <    }
543 <
544 <    // Runnable/run
545 <
546 <    static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
547 <                           Runnable f) {
548 <        if (d != null && f != null) {
549 <            Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
550 <            if (x == null) {
556 <                try {
557 <                    if (e != null) {
558 <                        e.execute(new AsyncRun<T>(d, f));
559 <                        return;
532 >    final <S> boolean uniAccept(CompletableFuture<?> a,
533 >                                Consumer<? super S> f, UniAccept<S> c) {
534 >        Object r; Throwable x;
535 >        if (a == null || (r = a.result) == null || f == null)
536 >            return false;
537 >        if (result == null) {
538 >            try {
539 >                if (r instanceof AltResult) {
540 >                    x = ((AltResult)r).ex;
541 >                    r = null;
542 >                }
543 >                else
544 >                    x = null;
545 >                if (x == null) {
546 >                    if (c != null && !c.claim())
547 >                        return false;
548 >                    else {
549 >                        @SuppressWarnings("unchecked") S t = (S) r;
550 >                        f.accept(t);
551                      }
561                    f.run();
562                } catch (Throwable ex) {
563                    x = ex;
552                  }
553 +            } catch (Throwable ex) {
554 +                x = ex;
555              }
556 <            d.internalComplete(encodeOutcome(null, x));
556 >            internalComplete(encodeOutcome(null, x));
557          }
558 +        return true;
559      }
560  
561 <    static final class AsyncRun<T> extends Async<T> {
562 <        Runnable fn;
563 <        AsyncRun(CompletableFuture<T> dep, Runnable fn) {
564 <            super(dep); this.fn = fn;
561 >    private CompletableFuture<Void> uniAcceptStage(Executor e,
562 >                                                   Consumer<? super T> f) {
563 >        if (f == null) throw new NullPointerException();
564 >        CompletableFuture<Void> d = new CompletableFuture<Void>();
565 >        if (e != null || !d.uniAccept(this, f, null)) {
566 >            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
567 >            push(c);
568 >            c.tryFire(SYNC);
569          }
570 <        final void compute() { nowRun(null, dep, null, fn); }
576 <        private static final long serialVersionUID = 5232453952276885070L;
570 >        return d;
571      }
572  
573 <    static final class DelayedRun extends UniCompletion<Void> {
573 >    @SuppressWarnings("serial")
574 >    static final class UniRun extends UniCompletion<Void> {
575          Runnable fn;
576 <        DelayedRun(Executor async, CompletableFuture<Void> dep,
577 <                   CompletableFuture<?> src, Runnable fn) {
578 <            super(async, dep, src); this.fn = fn;
579 <        }
580 <        final CompletableFuture<?> tryAct() {
581 <            CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
582 <            if ((d = dep) != null && (a = src) != null &&
583 <                (r = a.result) != null && claim(d)) {
584 <                nowRun(async, d, r, fn);
585 <                src = null; fn = null; // clear refs
586 <                if (d.result != null) return d;
592 <            }
593 <            return null;
576 >        UniRun(Executor executor, CompletableFuture<Void> dep,
577 >               CompletableFuture<?> src, Runnable fn) {
578 >            super(executor, dep, src); this.fn = fn;
579 >        }
580 >        final CompletableFuture<?> tryFire(int mode) {
581 >            CompletableFuture<Void> d; CompletableFuture<?> a;
582 >            if ((d = dep) == null ||
583 >                !d.uniRun(a = src, fn, mode > 0 ? null : this))
584 >                return null;
585 >            dep = null; src = null; fn = null;
586 >            return d.postFire(a, mode);
587          }
588      }
589  
590 <    private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
591 <        if (fn == null) throw new NullPointerException();
592 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
593 <        Object r = result;
594 <        if (r == null)
602 <            unipush(new DelayedRun(e, d, this, fn));
603 <        else
604 <            nowRun(e, d, r, fn);
605 <        return d;
606 <    }
607 <
608 <    // Supplier/get
609 <
610 <    static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
611 <        if (d != null && f != null) {
612 <            T t; Throwable x;
590 >    final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun c) {
591 >        Object r; Throwable x;
592 >        if (a == null || (r = a.result) == null || f == null)
593 >            return false;
594 >        if (result == null) {
595              try {
596 <                t = f.get();
597 <                x = null;
596 >                if (r instanceof AltResult)
597 >                    x = ((AltResult)r).ex;
598 >                else
599 >                    x = null;
600 >                if (x == null) {
601 >                    if (c != null && !c.claim())
602 >                        return false;
603 >                    else
604 >                        f.run();
605 >                }
606              } catch (Throwable ex) {
607                  x = ex;
618                t = null;
608              }
609 <            d.internalComplete(encodeOutcome(t, x));
609 >            internalComplete(encodeOutcome(null, x));
610          }
611 +        return true;
612      }
613  
614 <    static final class AsyncSupply<T> extends Async<T> {
615 <        Supplier<T> fn;
616 <        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
617 <            super(dep); this.fn = fn;
614 >    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
615 >        if (f == null) throw new NullPointerException();
616 >        CompletableFuture<Void> d = new CompletableFuture<Void>();
617 >        if (e != null || !d.uniRun(this, f, null)) {
618 >            UniRun c = new UniRun(e, d, this, f);
619 >            push(c);
620 >            c.tryFire(SYNC);
621          }
622 <        final void compute() { nowSupply(dep, fn); }
630 <        private static final long serialVersionUID = 5232453952276885070L;
622 >        return d;
623      }
624  
625 <    // WhenComplete
625 >    @SuppressWarnings("serial")
626 >    static final class UniWhenComplete<T> extends UniCompletion<T> {
627 >        BiConsumer<? super T, ? super Throwable> fn;
628 >        UniWhenComplete(Executor executor, CompletableFuture<T> dep,
629 >                        CompletableFuture<?> src,
630 >                        BiConsumer<? super T, ? super Throwable> fn) {
631 >            super(executor, dep, src); this.fn = fn;
632 >        }
633 >        final CompletableFuture<?> tryFire(int mode) {
634 >            CompletableFuture<T> d; CompletableFuture<?> a;
635 >            if ((d = dep) == null ||
636 >                !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this))
637 >                return null;
638 >            dep = null; src = null; fn = null;
639 >            return d.postFire(a, mode);
640 >        }
641 >    }
642  
643 <    static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
644 <                            BiConsumer<? super T,? super Throwable> f) {
645 <        if (d != null && f != null) {
646 <            T t; Throwable x, dx;
647 <            if (r instanceof AltResult) {
648 <                t = null;
649 <                x = ((AltResult)r).ex;
650 <            }
643 <            else {
644 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
645 <                x = null;
646 <            }
643 >    final boolean uniWhenComplete(CompletableFuture<?> a,
644 >                                  BiConsumer<? super T,? super Throwable> f,
645 >                                  UniWhenComplete<T> c) {
646 >        Object r;
647 >        if (a == null || (r = a.result) == null || f == null)
648 >            return false;
649 >        if (result == null) {
650 >            Throwable x = null, y = null;
651              try {
652 <                if (e != null) {
653 <                    e.execute(new AsyncWhen<T>(d, r, f));
654 <                    return;
652 >                if (r instanceof AltResult) {
653 >                    x = ((AltResult)r).ex;
654 >                    r = null;
655 >                }
656 >                if (c != null && !c.claim())
657 >                    return false;
658 >                else {
659 >                    @SuppressWarnings("unchecked") T t = (T) r;
660 >                    f.accept(t, x);
661                  }
652                f.accept(t, x);
653                dx = null;
662              } catch (Throwable ex) {
663 <                dx = ex;
663 >                y = ex;
664              }
665 <            d.internalComplete(encodeOutcome(t, x != null ? x : dx));
665 >            internalComplete(encodeOutcome(r, x != null ? x : y));
666          }
667 +        return true;
668      }
669  
670 <    static final class AsyncWhen<T> extends Async<T> {
671 <        Object arg; BiConsumer<? super T,? super Throwable> fn;
672 <        AsyncWhen(CompletableFuture<T> dep, Object arg,
673 <                  BiConsumer<? super T,? super Throwable> fn) {
674 <            super(dep); this.arg = arg; this.fn = fn;
670 >    private CompletableFuture<T> uniWhenCompleteStage(
671 >        Executor e, BiConsumer<? super T, ? super Throwable> f) {
672 >        if (f == null) throw new NullPointerException();
673 >        CompletableFuture<T> d = new CompletableFuture<T>();
674 >        if (e != null || !d.uniWhenComplete(this, f, null)) {
675 >            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
676 >            push(c);
677 >            c.tryFire(SYNC);
678          }
679 <        final void compute() { nowWhen(null, dep, arg, fn); }
668 <        private static final long serialVersionUID = 5232453952276885070L;
679 >        return d;
680      }
681  
682 <    static final class DelayedWhen<T> extends UniCompletion<T> {
683 <        BiConsumer<? super T, ? super Throwable> fn;
684 <        DelayedWhen(Executor async, CompletableFuture<T> dep,
685 <                    CompletableFuture<?> src,
686 <                    BiConsumer<? super T, ? super Throwable> fn) {
687 <            super(async, dep, src); this.fn = fn;
688 <        }
689 <        final CompletableFuture<?> tryAct() {
690 <            CompletableFuture<T> d; CompletableFuture<?> a; Object r;
691 <            if ((d = dep) != null && (a = src) != null &&
692 <                (r = a.result) != null && claim(d)) {
693 <                nowWhen(async, d, r, fn);
694 <                src = null; fn = null;
695 <                if (d.result != null) return d;
696 <            }
686 <            return null;
682 >    @SuppressWarnings("serial")
683 >    static final class UniHandle<T,U> extends UniCompletion<U> {
684 >        BiFunction<? super T, Throwable, ? extends U> fn;
685 >        UniHandle(Executor executor, CompletableFuture<U> dep,
686 >                  CompletableFuture<?> src,
687 >                  BiFunction<? super T, Throwable, ? extends U> fn) {
688 >            super(executor, dep, src); this.fn = fn;
689 >        }
690 >        final CompletableFuture<?> tryFire(int mode) {
691 >            CompletableFuture<U> d; CompletableFuture<?> a;
692 >            if ((d = dep) == null ||
693 >                !d.uniHandle(a = src, fn, mode > 0 ? null : this))
694 >                return null;
695 >            dep = null; src = null; fn = null;
696 >            return d.postFire(a, mode);
697          }
698      }
699  
700 <    private CompletableFuture<T> doWhenComplete(
701 <        BiConsumer<? super T, ? super Throwable> fn, Executor e) {
702 <        if (fn == null) throw new NullPointerException();
703 <        CompletableFuture<T> d = new CompletableFuture<T>();
704 <        Object r = result;
705 <        if (r == null)
706 <            unipush(new DelayedWhen<T>(e, d, this, fn));
697 <        else
698 <            nowWhen(e, d, r, fn);
699 <        return d;
700 <    }
701 <
702 <    // Handle
703 <
704 <    static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
705 <                                BiFunction<? super T, Throwable, ? extends U> f) {
706 <        if (d != null && f != null) {
707 <            T t; U u; Throwable x, dx;
708 <            if (r instanceof AltResult) {
709 <                t = null;
710 <                x = ((AltResult)r).ex;
711 <            }
712 <            else {
713 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
714 <                x = null;
715 <            }
700 >    final <S> boolean uniHandle(CompletableFuture<?> a,
701 >                                BiFunction<? super S, Throwable, ? extends T> f,
702 >                                UniHandle<S,T> c) {
703 >        Object r; T u; Throwable x, y;
704 >        if (a == null || (r = a.result) == null || f == null)
705 >            return false;
706 >        if (result == null) {
707              try {
708 <                if (e != null) {
709 <                    e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
710 <                    return;
708 >                if (r instanceof AltResult) {
709 >                    x = ((AltResult)r).ex;
710 >                    r = null;
711 >                }
712 >                else
713 >                    x = null;
714 >                if (c != null && !c.claim())
715 >                    return false;
716 >                else {
717 >                    @SuppressWarnings("unchecked") S t = (S) r;
718 >                    u = f.apply(t, x);
719 >                    y = null;
720                  }
721                u = f.apply(t, x);
722                dx = null;
721              } catch (Throwable ex) {
722 <                dx = ex;
722 >                y = ex;
723                  u = null;
724              }
725 <            d.internalComplete(encodeOutcome(u, dx));
728 <        }
729 <    }
730 <
731 <    static final class DelayedHandle<T,U> extends UniCompletion<U> {
732 <        BiFunction<? super T, Throwable, ? extends U> fn;
733 <        DelayedHandle(Executor async, CompletableFuture<U> dep,
734 <                      CompletableFuture<?> src,
735 <                      BiFunction<? super T, Throwable, ? extends U> fn) {
736 <            super(async, dep, src); this.fn = fn;
737 <        }
738 <        final CompletableFuture<?> tryAct() {
739 <            CompletableFuture<U> d; CompletableFuture<?> a; Object r;
740 <            if ((d = dep) != null && (a = src) != null &&
741 <                (r = a.result) != null && claim(d)) {
742 <                nowHandle(async, d, r, fn);
743 <                src = null; fn = null;
744 <                if (d.result != null) return d;
745 <            }
746 <            return null;
725 >            internalComplete(encodeOutcome(u, y));
726          }
727 +        return true;
728      }
729  
730 <    private <U> CompletableFuture<U> doHandle(
731 <        BiFunction<? super T, Throwable, ? extends U> fn,
732 <        Executor e) {
753 <        if (fn == null) throw new NullPointerException();
730 >    private <U> CompletableFuture<U> uniHandleStage(
731 >        Executor e, BiFunction<? super T, Throwable, ? extends U> f) {
732 >        if (f == null) throw new NullPointerException();
733          CompletableFuture<U> d = new CompletableFuture<U>();
734 <        Object r = result;
735 <        if (r == null)
736 <            unipush(new DelayedHandle<T,U>(e, d, this, fn));
737 <        else
759 <            nowHandle(e, d, r, fn);
760 <        return d;
761 <    }
762 <
763 <    // Exceptionally
764 <
765 <    static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
766 <                                     Function<? super Throwable, ? extends T> f) {
767 <        if (d != null && f != null) {
768 <            T t; Throwable x, dx;
769 <            if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
770 <                try {
771 <                    t = f.apply(x);
772 <                    dx = null;
773 <                } catch (Throwable ex) {
774 <                    dx = ex;
775 <                    t = null;
776 <                }
777 <            }
778 <            else {
779 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
780 <                dx = null;
781 <            }
782 <            d.internalComplete(encodeOutcome(t, dx));
734 >        if (e != null || !d.uniHandle(this, f, null)) {
735 >            UniHandle<T,U> c = new UniHandle<T,U>(e, d, this, f);
736 >            push(c);
737 >            c.tryFire(SYNC);
738          }
739 +        return d;
740      }
741  
742 <    static final class DelayedExceptionally<T> extends UniCompletion<T> {
742 >    @SuppressWarnings("serial")
743 >    static final class UniExceptionally<T> extends UniCompletion<T> {
744          Function<? super Throwable, ? extends T> fn;
745 <        DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
746 <                             Function<? super Throwable, ? extends T> fn) {
745 >        UniExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
746 >                         Function<? super Throwable, ? extends T> fn) {
747              super(null, dep, src); this.fn = fn;
748          }
749 <        final CompletableFuture<?> tryAct() {
750 <            CompletableFuture<T> d; CompletableFuture<?> a; Object r;
751 <            if ((d = dep) != null && (a = src) != null &&
752 <                (r = a.result) != null && claim(d)) {
753 <                nowExceptionally(d, r, fn);
754 <                src = null; fn = null;
798 <                if (d.result != null) return d;
799 <            }
800 <            return null;
749 >        final CompletableFuture<?> tryFire(int mode) { // never ASYNC
750 >            CompletableFuture<T> d; CompletableFuture<?> a;
751 >            if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this))
752 >                return null;
753 >            dep = null; src = null; fn = null;
754 >            return d.postFire(a, mode);
755          }
756      }
757  
758 <    private CompletableFuture<T> doExceptionally(
759 <        Function<Throwable, ? extends T> fn) {
760 <        if (fn == null) throw new NullPointerException();
761 <        CompletableFuture<T> d = new CompletableFuture<T>();
762 <        Object r = result;
763 <        if (r == null)
764 <            unipush(new DelayedExceptionally<T>(d, this, fn));
758 >    final boolean uniExceptionally(CompletableFuture<?> a,
759 >                                   Function<? super Throwable, ? extends T> f,
760 >                                   UniExceptionally<T> c) {
761 >        Object r; T t; Throwable x, y;
762 >        if (a == null || (r = a.result) == null || f == null)
763 >            return false;
764 >        if (r instanceof AltResult)
765 >            x = ((AltResult)r).ex;
766          else
767 <            nowExceptionally(d, r, fn);
768 <        return d;
767 >            x = null;
768 >        if (result == null) {
769 >            try {
770 >                if (c != null && !c.claim())
771 >                    return false;
772 >                else if (x == null) {
773 >                    @SuppressWarnings("unchecked") T tr = (T) r;
774 >                    t = tr;
775 >                    y = null;
776 >                }
777 >                else {
778 >                    t = f.apply(x);
779 >                    y = null;
780 >                }
781 >            } catch (Throwable ex) {
782 >                y = ex;
783 >                t = null;
784 >            }
785 >            internalComplete(encodeOutcome(t, y));
786 >        }
787 >        return true;
788      }
789  
790 <    // Identity function used by nowCompose and anyOf
791 <
792 <    static <T> void nowCopy(CompletableFuture<T> d, Object r) {
793 <        if (d != null && d.result == null) {
794 <            Throwable x;
795 <            d.internalComplete(((r instanceof AltResult) &&
796 <                                (x = ((AltResult)r).ex) != null &&
797 <                                !(x instanceof CompletionException)) ?
824 <                               new AltResult(new CompletionException(x)): r);
790 >    private CompletableFuture<T> uniExceptionallyStage(
791 >        Function<Throwable, ? extends T> f) {
792 >        if (f == null) throw new NullPointerException();
793 >        CompletableFuture<T> d = new CompletableFuture<T>();
794 >        if (!d.uniExceptionally(this, f, null)) {
795 >            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
796 >            push(c);
797 >            c.tryFire(SYNC);
798          }
799 +        return d;
800      }
801  
802 <    static final class DelayedCopy<T> extends UniCompletion<T> {
803 <        DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
802 >    @SuppressWarnings("serial")
803 >    static final class UniRelay<T> extends UniCompletion<T> { // for Compose
804 >        UniRelay(CompletableFuture<T> dep, CompletableFuture<?> src) {
805              super(null, dep, src);
806          }
807 <        final CompletableFuture<?> tryAct() {
808 <            CompletableFuture<T> d; CompletableFuture<?> a; Object r;
809 <            if ((d = dep) != null && (a = src) != null &&
810 <                (r = a.result) != null && claim(d)) {
811 <                nowCopy(d, r);
812 <                src = null;
838 <                if (d.result != null) return d;
839 <            }
840 <            return null;
807 >        final CompletableFuture<?> tryFire(int mode) {
808 >            CompletableFuture<T> d; CompletableFuture<?> a;
809 >            if ((d = dep) == null || !d.uniRelay(a = src))
810 >                return null;
811 >            src = null; dep = null;
812 >            return d.postFire(a, mode);
813          }
814      }
815  
816 <    // Compose
816 >    final boolean uniRelay(CompletableFuture<?> a) {
817 >        Object r;
818 >        if (a == null || (r = a.result) == null)
819 >            return false;
820 >        Object s = encodeRelay(r);
821 >        if (result == null) // no need to claim
822 >            internalComplete(s);
823 >        return true;
824 >    }
825  
826 <    static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
827 <                                 Function<? super T, ? extends CompletionStage<U>> f) {
828 <        if (d != null && f != null) {
829 <            T t; Throwable x;
830 <            if (r instanceof AltResult) {
831 <                t = null;
832 <                x = ((AltResult)r).ex;
833 <            }
834 <            else {
835 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
836 <                x = null;
837 <            }
838 <            if (x == null) {
839 <                try {
840 <                    if (e != null)
841 <                        e.execute(new AsyncCompose<T,U>(d, t, f));
826 >    @SuppressWarnings("serial")
827 >    static final class UniCompose<T,U> extends UniCompletion<U> {
828 >        Function<? super T, ? extends CompletionStage<U>> fn;
829 >        UniCompose(Executor executor, CompletableFuture<U> dep,
830 >                   CompletableFuture<?> src,
831 >                   Function<? super T, ? extends CompletionStage<U>> fn) {
832 >            super(executor, dep, src); this.fn = fn;
833 >        }
834 >        final CompletableFuture<?> tryFire(int mode) {
835 >            CompletableFuture<U> d; CompletableFuture<?> a;
836 >            if ((d = dep) == null ||
837 >                !d.uniCompose(a = src, fn, mode > 0 ? null : this))
838 >                return null;
839 >            dep = null; src = null; fn = null;
840 >            return d.postFire(a, mode);
841 >        }
842 >    }
843 >
844 >    final <S> boolean uniCompose(
845 >        CompletableFuture<?> a,
846 >        Function<? super S, ? extends CompletionStage<T>> f,
847 >        UniCompose<S,T> c) {
848 >        Object r; Throwable x;
849 >        if (a == null || (r = a.result) == null || f == null)
850 >            return false;
851 >        if (result == null) {
852 >            try {
853 >                if (r instanceof AltResult) {
854 >                    x = ((AltResult)r).ex;
855 >                    r = null;
856 >                }
857 >                else
858 >                    x = null;
859 >                if (x == null) {
860 >                    if (c != null && !c.claim())
861 >                        return false;
862                      else {
863 <                        CompletableFuture<U> c =
863 >                        @SuppressWarnings("unchecked") S t = (S) r;
864 >                        CompletableFuture<T> g =
865                              f.apply(t).toCompletableFuture();
866 <                        Object s = c.result;
867 <                        if (s == null)
868 <                            c.unipush(new DelayedCopy<U>(d, c));
869 <                        else
870 <                            nowCopy(d, s);
866 >                        if (g.result == null || !uniRelay(g)) {
867 >                            UniRelay<T> copy = new UniRelay<T>(this, g);
868 >                            g.push(copy);
869 >                            copy.tryFire(SYNC);
870 >                            if (result == null)
871 >                                return false;
872 >                        }
873                      }
871                    return;
872                } catch (Throwable ex) {
873                    x = ex;
874                  }
875 +            } catch (Throwable ex) {
876 +                x = ex;
877              }
878 <            d.internalComplete(encodeOutcome(null, x));
879 <        }
878 <    }
879 <
880 <    static final class AsyncCompose<T,U> extends Async<U> {
881 <        T arg; Function<? super T, ? extends CompletionStage<U>> fn;
882 <        AsyncCompose(CompletableFuture<U> dep, T arg,
883 <                     Function<? super T, ? extends CompletionStage<U>> fn) {
884 <            super(dep); this.arg = arg; this.fn = fn;
878 >            if (x != null)
879 >                internalComplete(encodeOutcome(null, x));
880          }
881 <        final void compute() { nowCompose(null, dep, arg, fn); }
887 <        private static final long serialVersionUID = 5232453952276885070L;
881 >        return true;
882      }
883  
884 <    static final class DelayedCompose<T,U> extends UniCompletion<U> {
885 <        Function<? super T, ? extends CompletionStage<U>> fn;
886 <        DelayedCompose(Executor async, CompletableFuture<U> dep,
887 <                       CompletableFuture<?> src,
888 <                       Function<? super T, ? extends CompletionStage<U>> fn) {
889 <            super(async, dep, src); this.fn = fn;
890 <        }
891 <        final CompletableFuture<?> tryAct() {
892 <            CompletableFuture<U> d; CompletableFuture<?> a; Object r;
893 <            if ((d = dep) != null && (a = src) != null &&
894 <                (r = a.result) != null && claim(d)) {
895 <                nowCompose(async, d, r, fn);
896 <                src = null; fn = null;
897 <                if (d.result != null) return d;
884 >    private <U> CompletableFuture<U> uniComposeStage(
885 >        Executor e, Function<? super T, ? extends CompletionStage<U>> f) {
886 >        if (f == null) throw new NullPointerException();
887 >        Object r; Throwable x;
888 >        if (e == null && (r = result) != null) {
889 >            try {   // try to return function result
890 >                if (r instanceof AltResult) {
891 >                    x = ((AltResult)r).ex;
892 >                    r = null;
893 >                }
894 >                else
895 >                    x = null;
896 >                if (x == null) {
897 >                    @SuppressWarnings("unchecked") T t = (T) r;
898 >                    return f.apply(t).toCompletableFuture();
899 >                }
900 >            } catch (Throwable ex) {
901 >                x = ex;
902              }
905            return null;
903          }
904 <    }
905 <
906 <    private <U> CompletableFuture<U> doThenCompose(
907 <        Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
908 <        if (fn == null) throw new NullPointerException();
909 <        Object r = result;
910 <        if (r == null || e != null) {
914 <            CompletableFuture<U> d = new CompletableFuture<U>();
915 <            if (r == null)
916 <                unipush(new DelayedCompose<T,U>(e, d, this, fn));
917 <            else
918 <                nowCompose(e, d, r, fn);
919 <            return d;
904 >        else
905 >            x = null;
906 >        CompletableFuture<U> d = new CompletableFuture<U>();
907 >        if (x == null) {
908 >            UniCompose<T,U> c = new UniCompose<T,U>(e, d, this, f);
909 >            push(c);
910 >            c.tryFire(SYNC);
911          }
912 <        else { // try to return function result
922 <            T t; Throwable x;
923 <            if (r instanceof AltResult) {
924 <                t = null;
925 <                x = ((AltResult)r).ex;
926 <            }
927 <            else {
928 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
929 <                x = null;
930 <            }
931 <            if (x == null) {
932 <                try {
933 <                    return fn.apply(t).toCompletableFuture();
934 <                } catch (Throwable ex) {
935 <                    x = ex;
936 <                }
937 <            }
938 <            CompletableFuture<U> d = new CompletableFuture<U>();
912 >        else
913              d.result = encodeOutcome(null, x);
914 <            return d;
941 <        }
914 >        return d;
915      }
916  
917 <    /* ------------- Two-source Completions -------------- */
917 >    /* ------------- Two-input Completions -------------- */
918  
919 <    /** A Completion with two sources */
919 >    /** A Completion for an action with two sources */
920 >    @SuppressWarnings("serial")
921      abstract static class BiCompletion<T> extends UniCompletion<T> {
922 <        CompletableFuture<?> snd; // second source for tryAct
923 <        BiCompletion(Executor async, CompletableFuture<T> dep,
922 >        CompletableFuture<?> snd; // second source for action
923 >        BiCompletion(Executor executor, CompletableFuture<T> dep,
924                       CompletableFuture<?> src, CompletableFuture<?> snd) {
925 <            super(async, dep, src); this.snd = snd;
925 >            super(executor, dep, src); this.snd = snd;
926          }
927      }
928  
929 <    /** A Completion delegating to another Completion */
929 >    /** A Completion delegating to a BiCompletion */
930 >    @SuppressWarnings("serial")
931      static final class CoCompletion extends Completion {
932 <        Completion completion;
933 <        CoCompletion(Completion completion) {
934 <            this.completion = completion;
935 <        }
936 <        final CompletableFuture<?> tryAct() {
962 <            Completion c; CompletableFuture<?> d;
963 <            if ((c = completion) == null || (d = c.tryAct()) == null)
932 >        BiCompletion<?> base;
933 >        CoCompletion(BiCompletion<?> base) { this.base = base; }
934 >        final CompletableFuture<?> tryFire(int mode) {
935 >            BiCompletion<?> c; CompletableFuture<?> d;
936 >            if ((c = base) == null || (d = c.tryFire(mode)) == null)
937                  return null;
938 <            completion = null; // detach
938 >            base = null; // detach
939              return d;
940          }
941 +        final boolean isLive() {
942 +            BiCompletion<?> c;
943 +            return (c = base) != null && c.dep != null;
944 +        }
945      }
946  
947 <    /* ------------- Two-source Anded -------------- */
948 <
949 <    /** Pushes c on to completions and o's completions unless both done. */
950 <    private void bipushAnded(CompletableFuture<?> o, Completion c) {
951 <        if (c != null && o != null) {
975 <            Object r; CompletableFuture<?> d;
976 <            while ((r = result) == null &&
977 <                   !casCompletions(c.next = completions, c))
947 >    /** Pushes completion to this and b unless both done. */
948 >    final void bipush(CompletableFuture<?> b, BiCompletion<?> c) {
949 >        if (c != null) {
950 >            Object r;
951 >            while ((r = result) == null && !casStack(c.next = stack, c))
952                  c.next = null;
953 <            if (o.result == null) {
953 >            if (b != null && b != this && b.result == null) {
954                  Completion q = (r != null) ? c : new CoCompletion(c);
955 <                while (o.result == null &&
982 <                       !o.casCompletions(q.next = o.completions, q))
955 >                while (b.result == null && !b.casStack(q.next = b.stack, q))
956                      q.next = null;
957              }
985            if ((d = c.tryAct()) != null)
986                d.postComplete();
987            if (o.result != null)
988                o.postComplete();
989            if (result != null)
990                postComplete();
958          }
959      }
960  
961 <    // BiFunction/combine
962 <
963 <    static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
964 <                                   Object r, Object s,
965 <                                   BiFunction<? super T,? super U,? extends V> f) {
966 <        if (d != null && f != null) {
1000 <            T t; U u; V v; Throwable x;
1001 <            if (r instanceof AltResult) {
1002 <                t = null;
1003 <                x = ((AltResult)r).ex;
1004 <            }
1005 <            else {
1006 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1007 <                x = null;
1008 <            }
1009 <            if (x != null)
1010 <                u = null;
1011 <            else if (s instanceof AltResult) {
1012 <                x = ((AltResult)s).ex;
1013 <                u = null;
1014 <            }
1015 <            else {
1016 <                @SuppressWarnings("unchecked") U us = (U) s; u = us;
1017 <            }
1018 <            if (x == null) {
1019 <                try {
1020 <                    if (e != null) {
1021 <                        e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1022 <                        return;
1023 <                    }
1024 <                    v = f.apply(t, u);
1025 <                } catch (Throwable ex) {
1026 <                    x = ex;
1027 <                    v = null;
1028 <                }
1029 <            }
961 >    /** Post-processing after successful BiCompletion tryFire. */
962 >    final CompletableFuture<T> postFire(CompletableFuture<?> a,
963 >                                        CompletableFuture<?> b, int mode) {
964 >        if (b != null && b.stack != null) { // clean second source
965 >            if (mode < 0 || b.result == null)
966 >                b.cleanStack();
967              else
968 <                v = null;
1032 <            d.internalComplete(encodeOutcome(v, x));
968 >                b.postComplete();
969          }
970 +        return postFire(a, mode);
971      }
972  
973 <    static final class AsyncCombine<T,U,V> extends Async<V> {
974 <        T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
975 <        AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
976 <                     BiFunction<? super T,? super U,? extends V> fn) {
977 <            super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
973 >    @SuppressWarnings("serial")
974 >    static final class BiApply<T,U,V> extends BiCompletion<V> {
975 >        BiFunction<? super T,? super U,? extends V> fn;
976 >        BiApply(Executor executor, CompletableFuture<V> dep,
977 >                CompletableFuture<?> src, CompletableFuture<?> snd,
978 >                BiFunction<? super T,? super U,? extends V> fn) {
979 >            super(executor, dep, src, snd); this.fn = fn;
980 >        }
981 >        final CompletableFuture<?> tryFire(int mode) {
982 >            CompletableFuture<V> d; CompletableFuture<?> a, b;
983 >            if ((d = dep) == null ||
984 >                !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))
985 >                return null;
986 >            dep = null; snd = src = null; fn = null;
987 >            return d.postFire(a, b, mode);
988          }
1042        final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1043        private static final long serialVersionUID = 5232453952276885070L;
989      }
990  
991 <    static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
992 <        BiFunction<? super T,? super U,? extends V> fn;
993 <        DelayedCombine(Executor async, CompletableFuture<V> dep,
994 <                       CompletableFuture<?> src, CompletableFuture<?> snd,
995 <                       BiFunction<? super T,? super U,? extends V> fn) {
996 <            super(async, dep, src, snd); this.fn = fn;
997 <        }
998 <        final CompletableFuture<?> tryAct() {
999 <            CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1000 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1001 <                (r = a.result) != null && (s = b.result) != null && claim(d)) {
1002 <                nowCombine(async, d, r, s, fn);
1003 <                src = null; snd = null; fn = null;
1004 <                if (d.result != null) return d;
991 >    final <R,S> boolean biApply(CompletableFuture<?> a, CompletableFuture<?> b,
992 >                                BiFunction<? super R,? super S,? extends T> f,
993 >                                BiApply<R,S,T> c) {
994 >        Object r, s; T v; Throwable x;
995 >        if (a == null || (r = a.result) == null ||
996 >            b == null || (s = b.result) == null || f == null)
997 >            return false;
998 >        if (result == null) {
999 >            try {
1000 >                if (r instanceof AltResult) {
1001 >                    x = ((AltResult)r).ex;
1002 >                    r = null;
1003 >                }
1004 >                else
1005 >                    x = null;
1006 >                if (x == null && (s instanceof AltResult)) {
1007 >                    x = ((AltResult)s).ex;
1008 >                    s = null;
1009 >                }
1010 >                if (x != null)
1011 >                    v = null;
1012 >                else if (c != null && !c.claim())
1013 >                    return false;
1014 >                else {
1015 >                    @SuppressWarnings("unchecked") R t = (R) r;
1016 >                    @SuppressWarnings("unchecked") S u = (S) s;
1017 >                    v = f.apply(t, u);
1018 >                }
1019 >            } catch (Throwable ex) {
1020 >                x = ex;
1021 >                v = null;
1022              }
1023 <            return null;
1023 >            internalComplete(encodeOutcome(v, x));
1024          }
1025 +        return true;
1026      }
1027  
1028 <    private <U,V> CompletableFuture<V> doThenCombine(
1029 <        CompletableFuture<? extends U> o,
1030 <        BiFunction<? super T,? super U,? extends V> fn,
1031 <        Executor e) {
1032 <        if (o == null || fn == null) throw new NullPointerException();
1028 >    private <U,V> CompletableFuture<V> biApplyStage(
1029 >        Executor e, CompletionStage<? extends U> o,
1030 >        BiFunction<? super T,? super U,? extends V> f) {
1031 >        CompletableFuture<?> b;
1032 >        if (f == null || (b = o.toCompletableFuture()) == null)
1033 >            throw new NullPointerException();
1034          CompletableFuture<V> d = new CompletableFuture<V>();
1035 <        Object r = result, s = o.result;
1036 <        if (r == null || s == null)
1037 <            bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1038 <        else
1039 <            nowCombine(e, d, r, s, fn);
1035 >        if (e != null || !d.biApply(this, b, f, null)) {
1036 >            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1037 >            bipush(b, c);
1038 >            c.tryFire(SYNC);
1039 >        }
1040          return d;
1041      }
1042  
1043 <    // BiConsumer/AcceptBoth
1043 >    @SuppressWarnings("serial")
1044 >    static final class BiAccept<T,U> extends BiCompletion<Void> {
1045 >        BiConsumer<? super T,? super U> fn;
1046 >        BiAccept(Executor executor, CompletableFuture<Void> dep,
1047 >                 CompletableFuture<?> src, CompletableFuture<?> snd,
1048 >                 BiConsumer<? super T,? super U> fn) {
1049 >            super(executor, dep, src, snd); this.fn = fn;
1050 >        }
1051 >        final CompletableFuture<?> tryFire(int mode) {
1052 >            CompletableFuture<Void> d; CompletableFuture<?> a, b;
1053 >            if ((d = dep) == null ||
1054 >                !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1055 >                return null;
1056 >            dep = null; snd = src = null; fn = null;
1057 >            return d.postFire(a, b, mode);
1058 >        }
1059 >    }
1060  
1061 <    static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1062 <                                      Object r, Object s,
1063 <                                      BiConsumer<? super T,? super U> f) {
1064 <        if (d != null && f != null) {
1065 <            T t; U u; Throwable x;
1066 <            if (r instanceof AltResult) {
1067 <                t = null;
1068 <                x = ((AltResult)r).ex;
1069 <            }
1070 <            else {
1071 <                @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1072 <                x = null;
1073 <            }
1074 <            if (x != null)
1075 <                u = null;
1076 <            else if (s instanceof AltResult) {
1077 <                x = ((AltResult)s).ex;
1078 <                u = null;
1079 <            }
1080 <            else {
1081 <                @SuppressWarnings("unchecked") U us = (U) s; u = us;
1082 <            }
1083 <            if (x == null) {
1084 <                try {
1085 <                    if (e != null) {
1086 <                        e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1107 <                        return;
1061 >    final <R,S> boolean biAccept(CompletableFuture<?> a, CompletableFuture<?> b,
1062 >                                 BiConsumer<? super R,? super S> f,
1063 >                                 BiAccept<R,S> c) {
1064 >        Object r, s; Throwable x;
1065 >        if (a == null || (r = a.result) == null ||
1066 >            b == null || (s = b.result) == null || f == null)
1067 >            return false;
1068 >        if (result == null) {
1069 >            try {
1070 >                if (r instanceof AltResult) {
1071 >                    x = ((AltResult)r).ex;
1072 >                    r = null;
1073 >                }
1074 >                else
1075 >                    x = null;
1076 >                if (x == null && (s instanceof AltResult)) {
1077 >                    x = ((AltResult)s).ex;
1078 >                    s = null;
1079 >                }
1080 >                if (x == null) {
1081 >                    if (c != null && !c.claim())
1082 >                        return false;
1083 >                    else {
1084 >                        @SuppressWarnings("unchecked") R t = (R) r;
1085 >                        @SuppressWarnings("unchecked") S u = (S) s;
1086 >                        f.accept(t, u);
1087                      }
1109                    f.accept(t, u);
1110                } catch (Throwable ex) {
1111                    x = ex;
1088                  }
1089 +            } catch (Throwable ex) {
1090 +                x = ex;
1091              }
1092 <            d.internalComplete(encodeOutcome(null, x));
1092 >            internalComplete(encodeOutcome(null, x));
1093          }
1094 +        return true;
1095      }
1096  
1097 <    static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1098 <        T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1099 <        AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1100 <                        BiConsumer<? super T,? super U> fn) {
1101 <            super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1097 >    private <U> CompletableFuture<Void> biAcceptStage(
1098 >        Executor e, CompletionStage<? extends U> o,
1099 >        BiConsumer<? super T,? super U> f) {
1100 >        CompletableFuture<?> b;
1101 >        if (f == null || (b = o.toCompletableFuture()) == null)
1102 >            throw new NullPointerException();
1103 >        CompletableFuture<Void> d = new CompletableFuture<Void>();
1104 >        if (e != null || !d.biAccept(this, b, f, null)) {
1105 >            BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1106 >            bipush(b, c);
1107 >            c.tryFire(SYNC);
1108          }
1109 <        final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1125 <        private static final long serialVersionUID = 5232453952276885070L;
1109 >        return d;
1110      }
1111  
1112 <    static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1113 <        BiConsumer<? super T,? super U> fn;
1114 <        DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1115 <                          CompletableFuture<?> src, CompletableFuture<?> snd,
1116 <                          BiConsumer<? super T,? super U> fn) {
1117 <            super(async, dep, src, snd); this.fn = fn;
1118 <        }
1119 <        final CompletableFuture<?> tryAct() {
1120 <            CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1121 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1122 <                (r = a.result) != null && (s = b.result) != null && claim(d)) {
1123 <                nowAcceptBoth(async, d, r, s, fn);
1124 <                src = null; snd = null; fn = null;
1125 <                if (d.result != null) return d;
1126 <            }
1143 <            return null;
1112 >    @SuppressWarnings("serial")
1113 >    static final class BiRun extends BiCompletion<Void> {
1114 >        Runnable fn;
1115 >        BiRun(Executor executor, CompletableFuture<Void> dep,
1116 >              CompletableFuture<?> src, CompletableFuture<?> snd,
1117 >              Runnable fn) {
1118 >            super(executor, dep, src, snd); this.fn = fn;
1119 >        }
1120 >        final CompletableFuture<?> tryFire(int mode) {
1121 >            CompletableFuture<Void> d; CompletableFuture<?> a, b;
1122 >            if ((d = dep) == null ||
1123 >                !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this))
1124 >                return null;
1125 >            dep = null; snd = src = null; fn = null;
1126 >            return d.postFire(a, b, mode);
1127          }
1128      }
1129  
1130 <    private <U> CompletableFuture<Void> doThenAcceptBoth(
1131 <        CompletableFuture<? extends U> o,
1132 <        BiConsumer<? super T, ? super U> fn,
1133 <        Executor e) {
1134 <        if (o == null || fn == null) throw new NullPointerException();
1135 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1136 <        Object r = result, s = o.result;
1137 <        if (r == null || s == null)
1138 <            bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1139 <        else
1140 <            nowAcceptBoth(e, d, r, s, fn);
1141 <        return d;
1142 <    }
1143 <
1144 <    // Runnable/both
1145 <
1146 <    static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1147 <        Runnable fn;
1148 <        DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1149 <                            CompletableFuture<?> src, CompletableFuture<?> snd,
1150 <                            Runnable fn) {
1151 <            super(async, dep, src, snd); this.fn = fn;
1169 <        }
1170 <        final CompletableFuture<?> tryAct() {
1171 <            CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1172 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1173 <                (r = a.result) != null && (s = b.result) != null && claim(d)) {
1174 <                Throwable x = (r instanceof AltResult) ?
1175 <                    ((AltResult)r).ex : null;
1176 <                nowRun(async, d, (x == null) ? s : r, fn);
1177 <                src = null; snd = null; fn = null;
1178 <                if (d.result != null) return d;
1130 >    final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b,
1131 >                        Runnable f, BiRun c) {
1132 >        Object r, s; Throwable x;
1133 >        if (a == null || (r = a.result) == null ||
1134 >            b == null || (s = b.result) == null || f == null)
1135 >            return false;
1136 >        if (result == null) {
1137 >            try {
1138 >                if (r instanceof AltResult)
1139 >                    x = ((AltResult)r).ex;
1140 >                else
1141 >                    x = null;
1142 >                if (x == null && (s instanceof AltResult))
1143 >                    x = ((AltResult)s).ex;
1144 >                if (x == null) {
1145 >                    if (c != null && !c.claim())
1146 >                        return false;
1147 >                    else
1148 >                        f.run();
1149 >                }
1150 >            } catch (Throwable ex) {
1151 >                x = ex;
1152              }
1153 <            return null;
1153 >            internalComplete(encodeOutcome(null, x));
1154          }
1155 +        return true;
1156      }
1157  
1158 <    private CompletableFuture<Void> doRunAfterBoth(
1159 <        CompletableFuture<?> o, Runnable fn, Executor e) {
1160 <        if (o == null || fn == null) throw new NullPointerException();
1158 >    private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1159 >                                               Runnable f) {
1160 >        CompletableFuture<?> b;
1161 >        if (f == null || (b = o.toCompletableFuture()) == null)
1162 >            throw new NullPointerException();
1163          CompletableFuture<Void> d = new CompletableFuture<Void>();
1164 <        Object r = result, s = o.result;
1165 <        if (r == null || s == null)
1166 <            bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1167 <        else {
1192 <            Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1193 <            nowRun(e, d, (x == null) ? s : r, fn);
1164 >        if (e != null || !d.biRun(this, b, f, null)) {
1165 >            BiRun c = new BiRun(e, d, this, b, f);
1166 >            bipush(b, c);
1167 >            c.tryFire(SYNC);
1168          }
1169          return d;
1170      }
1171  
1172 <    // allOf
1173 <
1174 <    static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1175 <        if (d != null) {
1176 <            Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1177 <            if (x == null && (s instanceof AltResult))
1178 <                x = ((AltResult)s).ex;
1179 <            d.internalComplete(encodeOutcome(null, x));
1172 >    @SuppressWarnings("serial")
1173 >    static final class BiRelay extends BiCompletion<Void> { // for And
1174 >        BiRelay(CompletableFuture<Void> dep, CompletableFuture<?> src,
1175 >                CompletableFuture<?> snd) {
1176 >            super(null, dep, src, snd);
1177 >        }
1178 >        final CompletableFuture<?> tryFire(int mode) {
1179 >            CompletableFuture<Void> d; CompletableFuture<?> a, b;
1180 >            if ((d = dep) == null || !d.biRelay(a = src, b = snd))
1181 >                return null;
1182 >            snd = src = null; dep = null;
1183 >            return d.postFire(a, b, mode);
1184          }
1185      }
1186  
1187 <    static final class DelayedAnd extends BiCompletion<Void> {
1188 <        DelayedAnd(CompletableFuture<Void> dep,
1189 <                   CompletableFuture<?> src, CompletableFuture<?> snd) {
1190 <            super(null, dep, src, snd);
1191 <        }
1192 <        final CompletableFuture<?> tryAct() {
1193 <            CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1194 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1195 <                (r = a.result) != null && (s = b.result) != null && claim(d)) {
1196 <                nowAnd(d, r, s);
1197 <                src = null; snd = null;
1198 <                if (d.result != null) return d;
1199 <            }
1222 <            return null;
1187 >    private boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1188 >        Object r, s; Throwable x;
1189 >        if (a == null || (r = a.result) == null ||
1190 >            b == null || (s = b.result) == null)
1191 >            return false;
1192 >        if (result == null) {
1193 >            if (r instanceof AltResult)
1194 >                x = ((AltResult)r).ex;
1195 >            else
1196 >                x = null;
1197 >            if (x == null && (s instanceof AltResult))
1198 >                x = ((AltResult)s).ex;
1199 >            internalComplete(encodeOutcome(null, x));
1200          }
1201 +        return true;
1202      }
1203  
1204 <    /** Recursively constructs a tree of And completions */
1205 <    private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1206 <                                                   int lo, int hi) {
1204 >    /** Recursively constructs a tree of completions */
1205 >    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1206 >                                           int lo, int hi) {
1207          CompletableFuture<Void> d = new CompletableFuture<Void>();
1208          if (lo > hi) // empty
1209              d.result = NIL;
1210          else {
1211 +            CompletableFuture<?> a, b;
1212              int mid = (lo + hi) >>> 1;
1213 <            CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1214 <                                        doAllOf(cfs, lo,    mid));
1215 <            CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1216 <                                        (hi == mid+1) ? cfs[hi] :
1217 <                                        doAllOf(cfs, mid+1, hi));
1218 <            Object r = fst.result, s = snd.result; // throw NPE if null elements
1219 <            if (r == null || s == null) {
1220 <                DelayedAnd a = new DelayedAnd(d, fst, snd);
1221 <                if (fst == snd)
1243 <                    fst.unipush(a);
1244 <                else
1245 <                    fst.bipushAnded(snd, a);
1213 >            if ((a = (lo == mid ? cfs[lo] :
1214 >                      andTree(cfs, lo,    mid))) == null ||
1215 >                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1216 >                      andTree(cfs, mid+1, hi)))  == null)
1217 >                throw new NullPointerException();
1218 >            if (!d.biRelay(a, b)) {
1219 >                BiRelay c = new BiRelay(d, a, b);
1220 >                a.bipush(b, c);
1221 >                c.tryFire(SYNC);
1222              }
1247            else
1248                nowAnd(d, r, s);
1223          }
1224          return d;
1225      }
1226  
1227 <    /* ------------- Two-source Ored -------------- */
1227 >    /* ------------- Projected (Ored) BiCompletions -------------- */
1228  
1229 <    /** Pushes c on to completions and o's completions unless either done. */
1230 <    private void bipushOred(CompletableFuture<?> o, Completion c) {
1231 <        if (c != null && o != null) {
1232 <            CompletableFuture<?> d;
1233 <            while (o.result == null && result == null) {
1234 <                if (casCompletions(c.next = completions, c)) {
1235 <                    CoCompletion q = new CoCompletion(c);
1236 <                    while (result == null && o.result == null &&
1237 <                           !o.casCompletions(q.next = o.completions, q))
1238 <                        q.next = null;
1229 >    /** Pushes completion to this and b unless either done. */
1230 >    final void orpush(CompletableFuture<?> b, BiCompletion<?> c) {
1231 >        if (c != null) {
1232 >            while ((b == null || b.result == null) && result == null) {
1233 >                if (casStack(c.next = stack, c)) {
1234 >                    if (b != null && b != this && b.result == null) {
1235 >                        Completion q = new CoCompletion(c);
1236 >                        while (result == null && b.result == null &&
1237 >                               !b.casStack(q.next = b.stack, q))
1238 >                            q.next = null;
1239 >                    }
1240                      break;
1241                  }
1242                  c.next = null;
1243              }
1269            if ((d = c.tryAct()) != null)
1270                d.postComplete();
1271            if (o.result != null)
1272                o.postComplete();
1273            if (result != null)
1274                postComplete();
1244          }
1245      }
1246  
1247 <    // Function/applyEither
1248 <
1280 <    static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1247 >    @SuppressWarnings("serial")
1248 >    static final class OrApply<T,U> extends BiCompletion<U> {
1249          Function<? super T,? extends U> fn;
1250 <        DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1251 <                             CompletableFuture<?> src, CompletableFuture<?> snd,
1252 <                             Function<? super T,? extends U> fn) {
1253 <            super(async, dep, src, snd); this.fn = fn;
1254 <        }
1255 <        final CompletableFuture<?> tryAct() {
1256 <            CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1257 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1258 <                ((r = a.result) != null || (r = b.result) != null) &&
1259 <                claim(d)) {
1260 <                nowApply(async, d, r, fn);
1261 <                src = null; snd = null; fn = null;
1262 <                if (d.result != null) return d;
1250 >        OrApply(Executor executor, CompletableFuture<U> dep,
1251 >                CompletableFuture<?> src, CompletableFuture<?> snd,
1252 >                Function<? super T,? extends U> fn) {
1253 >            super(executor, dep, src, snd); this.fn = fn;
1254 >        }
1255 >        final CompletableFuture<?> tryFire(int mode) {
1256 >            CompletableFuture<U> d; CompletableFuture<?> a, b;
1257 >            if ((d = dep) == null ||
1258 >                !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
1259 >                return null;
1260 >            dep = null; snd = src = null; fn = null;
1261 >            return d.postFire(a, b, mode);
1262 >        }
1263 >    }
1264 >
1265 >    final <S> boolean orApply(CompletableFuture<?> a, CompletableFuture<?> b,
1266 >                              Function<? super S, ? extends T> f,
1267 >                              OrApply<S,T> c) {
1268 >        Object r; T u; Throwable x;
1269 >        if (a == null || b == null ||
1270 >            ((r = a.result) == null && (r = b.result) == null) || f == null)
1271 >            return false;
1272 >        if (result == null) {
1273 >            try {
1274 >                if (r instanceof AltResult) {
1275 >                    x = ((AltResult)r).ex;
1276 >                    r = null;
1277 >                }
1278 >                else
1279 >                    x = null;
1280 >                if (x != null)
1281 >                    u = null;
1282 >                else if (c != null && !c.claim())
1283 >                    return false;
1284 >                else {
1285 >                    @SuppressWarnings("unchecked") S t = (S) r;
1286 >                    u = f.apply(t);
1287 >                }
1288 >            } catch (Throwable ex) {
1289 >                x = ex;
1290 >                u = null;
1291              }
1292 <            return null;
1292 >            internalComplete(encodeOutcome(u, x));
1293          }
1294 +        return true;
1295      }
1296  
1297 <    private <U> CompletableFuture<U> doApplyToEither(
1298 <        CompletableFuture<? extends T> o,
1299 <        Function<? super T, U> fn, Executor e) {
1300 <        if (o == null || fn == null) throw new NullPointerException();
1297 >    private <T,U> CompletableFuture<U> orApplyStage(
1298 >        Executor e, CompletionStage<? extends T> o, Function<? super T, U> f) {
1299 >        CompletableFuture<?> b;
1300 >        if (f == null || (b = o.toCompletableFuture()) == null)
1301 >            throw new NullPointerException();
1302          CompletableFuture<U> d = new CompletableFuture<U>();
1303 <        Object r = result;
1304 <        if (r == null && (r = o.result) == null)
1305 <            bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1306 <        else
1307 <            nowApply(e, d, r, fn);
1303 >        if (e != null || !d.orApply(this, b, f, null)) {
1304 >            OrApply<T,U> c = new OrApply<T,U>(e, d, this, b, f);
1305 >            orpush(b, c);
1306 >            c.tryFire(SYNC);
1307 >        }
1308          return d;
1309      }
1310  
1311 <    // Consumer/acceptEither
1312 <
1315 <    static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1311 >    @SuppressWarnings("serial")
1312 >    static final class OrAccept<T> extends BiCompletion<Void> {
1313          Consumer<? super T> fn;
1314 <        DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1315 <                            CompletableFuture<?> src, CompletableFuture<?> snd,
1316 <                            Consumer<? super T> fn) {
1317 <            super(async, dep, src, snd); this.fn = fn;
1318 <        }
1319 <        final CompletableFuture<?> tryAct() {
1320 <            CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1321 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1322 <                ((r = a.result) != null || (r = b.result) != null) &&
1323 <                claim(d)) {
1324 <                nowAccept(async, d, r, fn);
1325 <                src = null; snd = null; fn = null;
1326 <                if (d.result != null) return d;
1314 >        OrAccept(Executor executor, CompletableFuture<Void> dep,
1315 >                 CompletableFuture<?> src, CompletableFuture<?> snd,
1316 >                 Consumer<? super T> fn) {
1317 >            super(executor, dep, src, snd); this.fn = fn;
1318 >        }
1319 >        final CompletableFuture<?> tryFire(int mode) {
1320 >            CompletableFuture<Void> d; CompletableFuture<?> a, b;
1321 >            if ((d = dep) == null ||
1322 >                !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1323 >                return null;
1324 >            dep = null; snd = src = null; fn = null;
1325 >            return d.postFire(a, b, mode);
1326 >        }
1327 >    }
1328 >
1329 >    final <S> boolean orAccept(CompletableFuture<?> a,
1330 >                               CompletableFuture<?> b,
1331 >                               Consumer<? super S> f, OrAccept<S> c) {
1332 >        Object r; Throwable x;
1333 >        if (a == null || b == null ||
1334 >            ((r = a.result) == null && (r = b.result) == null) || f == null)
1335 >            return false;
1336 >        if (result == null) {
1337 >            try {
1338 >                if (r instanceof AltResult) {
1339 >                    x = ((AltResult)r).ex;
1340 >                    r = null;
1341 >                }
1342 >                else
1343 >                    x = null;
1344 >                if (x == null) {
1345 >                    if (c != null && !c.claim())
1346 >                        return false;
1347 >                    else {
1348 >                        @SuppressWarnings("unchecked") S t = (S) r;
1349 >                        f.accept(t);
1350 >                    }
1351 >                }
1352 >            } catch (Throwable ex) {
1353 >                x = ex;
1354              }
1355 <            return null;
1355 >            internalComplete(encodeOutcome(null, x));
1356          }
1357 +        return true;
1358      }
1359  
1360 <    private CompletableFuture<Void> doAcceptEither(
1361 <        CompletableFuture<? extends T> o,
1362 <        Consumer<? super T> fn, Executor e) {
1363 <        if (o == null || fn == null) throw new NullPointerException();
1360 >    private CompletableFuture<Void> orAcceptStage(
1361 >        Executor e, CompletionStage<? extends T> o, Consumer<? super T> f) {
1362 >        CompletableFuture<?> b;
1363 >        if (f == null || (b = o.toCompletableFuture()) == null)
1364 >            throw new NullPointerException();
1365          CompletableFuture<Void> d = new CompletableFuture<Void>();
1366 <        Object r = result;
1367 <        if (r == null && (r = o.result) == null)
1368 <            bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1369 <        else
1370 <            nowAccept(e, d, r, fn);
1366 >        if (e != null || !d.orAccept(this, b, f, null)) {
1367 >            OrAccept<T> c = new OrAccept<T>(e, d, this, b, f);
1368 >            orpush(b, c);
1369 >            c.tryFire(SYNC);
1370 >        }
1371          return d;
1372      }
1373  
1374 <    // Runnable/runEither
1375 <
1350 <    static final class DelayedRunAfterEither extends BiCompletion<Void> {
1374 >    @SuppressWarnings("serial")
1375 >    static final class OrRun extends BiCompletion<Void> {
1376          Runnable fn;
1377 <        DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1378 <                              CompletableFuture<?> src,
1379 <                              CompletableFuture<?> snd, Runnable fn) {
1380 <            super(async, dep, src, snd); this.fn = fn;
1381 <        }
1382 <        final CompletableFuture<?> tryAct() {
1383 <            CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1384 <            if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1385 <                ((r = a.result) != null || (r = b.result) != null) &&
1386 <                claim(d)) {
1387 <                nowRun(async, d, r, fn);
1388 <                src = null; snd = null; fn = null;
1389 <                if (d.result != null) return d;
1377 >        OrRun(Executor executor, CompletableFuture<Void> dep,
1378 >              CompletableFuture<?> src, CompletableFuture<?> snd, Runnable fn) {
1379 >            super(executor, dep, src, snd); this.fn = fn;
1380 >        }
1381 >        final CompletableFuture<?> tryFire(int mode) {
1382 >            CompletableFuture<Void> d; CompletableFuture<?> a, b;
1383 >            if ((d = dep) == null ||
1384 >                !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this))
1385 >                return null;
1386 >            dep = null; snd = src = null; fn = null;
1387 >            return d.postFire(a, b, mode);
1388 >        }
1389 >    }
1390 >
1391 >    final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b,
1392 >                        Runnable f, OrRun c) {
1393 >        Object r; Throwable x;
1394 >        if (a == null || b == null ||
1395 >            ((r = a.result) == null && (r = b.result) == null) || f == null)
1396 >            return false;
1397 >        if (result == null) {
1398 >            try {
1399 >                if (r instanceof AltResult)
1400 >                    x = ((AltResult)r).ex;
1401 >                else
1402 >                    x = null;
1403 >                if (x == null) {
1404 >                    if (c != null && !c.claim())
1405 >                        return false;
1406 >                    else
1407 >                        f.run();
1408 >                }
1409 >            } catch (Throwable ex) {
1410 >                x = ex;
1411              }
1412 <            return null;
1412 >            internalComplete(encodeOutcome(null, x));
1413          }
1414 +        return true;
1415      }
1416  
1417 <    private CompletableFuture<Void> doRunAfterEither(
1418 <        CompletableFuture<?> o, Runnable fn, Executor e) {
1419 <        if (o == null || fn == null) throw new NullPointerException();
1417 >    private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1418 >                                               Runnable f) {
1419 >        CompletableFuture<?> b;
1420 >        if (f == null || (b = o.toCompletableFuture()) == null)
1421 >            throw new NullPointerException();
1422          CompletableFuture<Void> d = new CompletableFuture<Void>();
1423 <        Object r = result;
1424 <        if (r == null && (r = o.result) == null)
1425 <            bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1426 <        else
1427 <            nowRun(e, d, r, fn);
1423 >        if (e != null || !d.orRun(this, b, f, null)) {
1424 >            OrRun c = new OrRun(e, d, this, b, f);
1425 >            orpush(b, c);
1426 >            c.tryFire(SYNC);
1427 >        }
1428          return d;
1429      }
1430  
1431 <    /* ------------- Signallers -------------- */
1431 >    @SuppressWarnings("serial")
1432 >    static final class OrRelay extends BiCompletion<Object> { // for Or
1433 >        OrRelay(CompletableFuture<Object> dep, CompletableFuture<?> src,
1434 >                CompletableFuture<?> snd) {
1435 >            super(null, dep, src, snd);
1436 >        }
1437 >        final CompletableFuture<?> tryFire(int mode) {
1438 >            CompletableFuture<Object> d; CompletableFuture<?> a, b;
1439 >            if ((d = dep) == null || !d.orRelay(a = src, b = snd))
1440 >                return null;
1441 >            snd = src = null; dep = null;
1442 >            return d.postFire(a, b, mode);
1443 >        }
1444 >    }
1445  
1446 <    /**
1447 <     * Heuristic spin value for waitingGet() before blocking on
1448 <     * multiprocessors
1449 <     */
1450 <    static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1451 <                              1 << 8 : 0);
1446 >    final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1447 >        Object r;
1448 >        if (a == null || b == null ||
1449 >            ((r = a.result) == null && (r = b.result) == null))
1450 >            return false;
1451 >        Object s = encodeRelay(r);
1452 >        if (result == null)
1453 >            internalComplete(s);
1454 >        return true;
1455 >    }
1456 >
1457 >    /** Recursively constructs a tree of completions */
1458 >    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1459 >                                            int lo, int hi) {
1460 >        CompletableFuture<Object> d = new CompletableFuture<Object>();
1461 >        if (lo <= hi) {
1462 >            CompletableFuture<?> a, b;
1463 >            int mid = (lo + hi) >>> 1;
1464 >            if ((a = (lo == mid ? cfs[lo] :
1465 >                      orTree(cfs, lo,    mid))) == null ||
1466 >                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1467 >                      orTree(cfs, mid+1, hi)))  == null)
1468 >                throw new NullPointerException();
1469 >            if (!d.orRelay(a, b)) {
1470 >                OrRelay c = new OrRelay(d, a, b);
1471 >                a.orpush(b, c);
1472 >                c.tryFire(SYNC);
1473 >            }
1474 >        }
1475 >        return d;
1476 >    }
1477 >
1478 >    /* ------------- Zero-input Async forms -------------- */
1479 >
1480 >    @SuppressWarnings("serial")
1481 >    static final class AsyncSupply<T> extends Completion {
1482 >        CompletableFuture<T> dep; Supplier<T> fn;
1483 >        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
1484 >            this.dep = dep; this.fn = fn;
1485 >        }
1486 >
1487 >        final CompletableFuture<?> tryFire(int alwaysAsync) {
1488 >            CompletableFuture<T> d; Supplier<T> f;
1489 >            if ((d = dep) != null && (f = fn) != null) {
1490 >                dep = null; fn = null;
1491 >                if (d.result == null) {
1492 >                    T t; Throwable x;
1493 >                    try {
1494 >                        t = f.get();
1495 >                        x = null;
1496 >                    } catch (Throwable ex) {
1497 >                        x = ex;
1498 >                        t = null;
1499 >                    }
1500 >                    d.internalComplete(encodeOutcome(t, x));
1501 >                }
1502 >                d.postComplete();
1503 >            }
1504 >            return d;
1505 >        }
1506 >        final boolean isLive() { return dep != null; }
1507 >    }
1508 >
1509 >    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1510 >                                                     Supplier<U> f) {
1511 >        if (f == null) throw new NullPointerException();
1512 >        CompletableFuture<U> d = new CompletableFuture<U>();
1513 >        e.execute(new AsyncSupply<U>(d, f));
1514 >        return d;
1515 >    }
1516 >
1517 >    @SuppressWarnings("serial")
1518 >    static final class AsyncRun extends Completion {
1519 >        CompletableFuture<Void> dep; Runnable fn;
1520 >        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1521 >            this.dep = dep; this.fn = fn;
1522 >        }
1523 >
1524 >        final CompletableFuture<?> tryFire(int alwaysAsync) {
1525 >            CompletableFuture<Void> d; Runnable f;
1526 >            if ((d = dep) != null && (f = fn) != null) {
1527 >                dep = null; fn = null;
1528 >                if (d.result == null) {
1529 >                    Throwable x;
1530 >                    try {
1531 >                        f.run();
1532 >                        x = null;
1533 >                    } catch (Throwable ex) {
1534 >                        x = ex;
1535 >                    }
1536 >                    d.internalComplete(encodeOutcome(null, x));
1537 >                }
1538 >                d.postComplete();
1539 >            }
1540 >            return d;
1541 >        }
1542 >        final boolean isLive() { return dep != null; }
1543 >    }
1544 >
1545 >    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1546 >        if (f == null) throw new NullPointerException();
1547 >        CompletableFuture<Void> d = new CompletableFuture<Void>();
1548 >        e.execute(new AsyncRun(d, f));
1549 >        return d;
1550 >    }
1551 >
1552 >    /* ------------- Signallers -------------- */
1553  
1554      /**
1555 <     * Completion for recording and releasing a waiting thread.  See
1556 <     * other classes such as Phaser and SynchronousQueue for more
1557 <     * detailed explanation. This class implements ManagedBlocker to
1395 <     * avoid starvation when blocking actions pile up in
1396 <     * ForkJoinPools.
1555 >     * Completion for recording and releasing a waiting thread.  This
1556 >     * class implements ManagedBlocker to avoid starvation when
1557 >     * blocking actions pile up in ForkJoinPools.
1558       */
1559 +    @SuppressWarnings("serial")
1560      static final class Signaller extends Completion
1561          implements ForkJoinPool.ManagedBlocker {
1562 <        long nanos;          // wait time if timed
1563 <        final long deadline; // non-zero if timed
1562 >        long nanos;                    // wait time if timed
1563 >        final long deadline;           // non-zero if timed
1564          volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1565          volatile Thread thread;
1566 +
1567          Signaller(boolean interruptible, long nanos, long deadline) {
1568              this.thread = Thread.currentThread();
1569              this.interruptControl = interruptible ? 1 : 0;
1570              this.nanos = nanos;
1571              this.deadline = deadline;
1572          }
1573 <        final CompletableFuture<?> tryAct() {
1574 <            Thread w = thread;
1575 <            if (w != null) {
1576 <                thread = null; // no need to CAS
1573 >        final CompletableFuture<?> tryFire(int ignore) {
1574 >            Thread w; // no need to atomically claim
1575 >            if ((w = thread) != null) {
1576 >                thread = null;
1577                  LockSupport.unpark(w);
1578              }
1579              return null;
# Line 1440 | Line 1603 | public class CompletableFuture<T> implem
1603                  LockSupport.parkNanos(this, nanos);
1604              return isReleasable();
1605          }
1606 +        final boolean isLive() { return thread != null; }
1607      }
1608  
1609      /**
# Line 1449 | Line 1613 | public class CompletableFuture<T> implem
1613      private Object waitingGet(boolean interruptible) {
1614          Signaller q = null;
1615          boolean queued = false;
1616 <        int spins = SPINS;
1616 >        int spins = -1;
1617          Object r;
1618          while ((r = result) == null) {
1619 <            if (spins > 0) {
1619 >            if (spins < 0)
1620 >                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1621 >                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
1622 >            else if (spins > 0) {
1623                  if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1624                      --spins;
1625              }
1626              else if (q == null)
1627                  q = new Signaller(interruptible, 0L, 0L);
1628              else if (!queued)
1629 <                queued = casCompletions(q.next = completions, q);
1629 >                queued = casStack(q.next = stack, q);
1630              else if (interruptible && q.interruptControl < 0) {
1631                  q.thread = null;
1632 <                removeCancelledSignallers();
1632 >                cleanStack();
1633                  return null;
1634              }
1635              else if (q.thread != null && result == null) {
# Line 1501 | Line 1668 | public class CompletableFuture<T> implem
1668          Object r;
1669          while ((r = result) == null) {
1670              if (!queued)
1671 <                queued = casCompletions(q.next = completions, q);
1671 >                queued = casStack(q.next = stack, q);
1672              else if (q.interruptControl < 0 || q.nanos <= 0L) {
1673                  q.thread = null;
1674 <                removeCancelledSignallers();
1674 >                cleanStack();
1675                  if (q.interruptControl < 0)
1676                      return null;
1677                  throw new TimeoutException();
# Line 1517 | Line 1684 | public class CompletableFuture<T> implem
1684                  }
1685              }
1686          }
1687 +        if (q.interruptControl < 0)
1688 +            r = null;
1689          q.thread = null;
1690          postComplete();
1691 <        return (q.interruptControl < 0) ? null : r;
1523 <    }
1524 <
1525 <    /**
1526 <     * Unlinks cancelled Signallers to avoid accumulating garbage.
1527 <     * Internal nodes are simply unspliced without CAS since it is
1528 <     * harmless if they are traversed anyway.  To avoid effects of
1529 <     * unsplicing from already removed nodes, the list is retraversed
1530 <     * in case of an apparent race.
1531 <     */
1532 <    private void removeCancelledSignallers() {
1533 <        for (Completion p = null, q = completions; q != null;) {
1534 <            Completion s = q.next;
1535 <            if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1536 <                if (p != null) {
1537 <                    p.next = s;
1538 <                    if (!(p instanceof Signaller) ||
1539 <                        ((Signaller)p).thread != null)
1540 <                        break;
1541 <                }
1542 <                else if (casCompletions(q, s))
1543 <                    break;
1544 <                p = null; // restart
1545 <                q = completions;
1546 <            }
1547 <            else {
1548 <                p = q;
1549 <                q = s;
1550 <            }
1551 <        }
1691 >        return r;
1692      }
1693  
1694      /* ------------- public methods -------------- */
# Line 1570 | Line 1710 | public class CompletableFuture<T> implem
1710       * @return the new CompletableFuture
1711       */
1712      public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1713 <        if (supplier == null) throw new NullPointerException();
1574 <        CompletableFuture<U> d = new CompletableFuture<U>();
1575 <        asyncPool.execute(new AsyncSupply<U>(d, supplier));
1576 <        return d;
1713 >        return asyncSupplyStage(asyncPool, supplier);
1714      }
1715  
1716      /**
# Line 1589 | Line 1726 | public class CompletableFuture<T> implem
1726       */
1727      public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1728                                                         Executor executor) {
1729 <        if (supplier == null) throw new NullPointerException();
1593 <        Executor e = screenExecutor(executor);
1594 <        CompletableFuture<U> d = new CompletableFuture<U>();
1595 <        e.execute(new AsyncSupply<U>(d, supplier));
1596 <        return d;
1729 >        return asyncSupplyStage(screenExecutor(executor), supplier);
1730      }
1731  
1732      /**
# Line 1606 | Line 1739 | public class CompletableFuture<T> implem
1739       * @return the new CompletableFuture
1740       */
1741      public static CompletableFuture<Void> runAsync(Runnable runnable) {
1742 <        if (runnable == null) throw new NullPointerException();
1610 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1611 <        asyncPool.execute(new AsyncRun<Void>(d, runnable));
1612 <        return d;
1742 >        return asyncRunStage(asyncPool, runnable);
1743      }
1744  
1745      /**
# Line 1624 | Line 1754 | public class CompletableFuture<T> implem
1754       */
1755      public static CompletableFuture<Void> runAsync(Runnable runnable,
1756                                                     Executor executor) {
1757 <        if (runnable == null) throw new NullPointerException();
1628 <        Executor e = screenExecutor(executor);
1629 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1630 <        e.execute(new AsyncRun<Void>(d, runnable));
1631 <        return d;
1757 >        return asyncRunStage(screenExecutor(executor), runnable);
1758      }
1759  
1760      /**
# Line 1755 | Line 1881 | public class CompletableFuture<T> implem
1881  
1882      public <U> CompletableFuture<U> thenApply(
1883          Function<? super T,? extends U> fn) {
1884 <        return doThenApply(fn, null);
1884 >        return uniApplyStage(null, fn);
1885      }
1886  
1887      public <U> CompletableFuture<U> thenApplyAsync(
1888          Function<? super T,? extends U> fn) {
1889 <        return doThenApply(fn, asyncPool);
1889 >        return uniApplyStage(asyncPool, fn);
1890      }
1891  
1892      public <U> CompletableFuture<U> thenApplyAsync(
1893          Function<? super T,? extends U> fn, Executor executor) {
1894 <        return doThenApply(fn, screenExecutor(executor));
1894 >        return uniApplyStage(screenExecutor(executor), fn);
1895      }
1896  
1897      public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1898 <        return doThenAccept(action, null);
1898 >        return uniAcceptStage(null, action);
1899      }
1900  
1901      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1902 <        return doThenAccept(action, asyncPool);
1902 >        return uniAcceptStage(asyncPool, action);
1903      }
1904  
1905 <    public CompletableFuture<Void> thenAcceptAsync(
1906 <        Consumer<? super T> action, Executor executor) {
1907 <        return doThenAccept(action, screenExecutor(executor));
1905 >    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
1906 >                                                   Executor executor) {
1907 >        return uniAcceptStage(screenExecutor(executor), action);
1908      }
1909  
1910      public CompletableFuture<Void> thenRun(Runnable action) {
1911 <        return doThenRun(action, null);
1911 >        return uniRunStage(null, action);
1912      }
1913  
1914      public CompletableFuture<Void> thenRunAsync(Runnable action) {
1915 <        return doThenRun(action, asyncPool);
1915 >        return uniRunStage(asyncPool, action);
1916      }
1917  
1918 <    public CompletableFuture<Void> thenRunAsync(
1919 <        Runnable action, Executor executor) {
1920 <        return doThenRun(action, screenExecutor(executor));
1918 >    public CompletableFuture<Void> thenRunAsync(Runnable action,
1919 >                                                Executor executor) {
1920 >        return uniRunStage(screenExecutor(executor), action);
1921      }
1922  
1923      public <U,V> CompletableFuture<V> thenCombine(
1924          CompletionStage<? extends U> other,
1925          BiFunction<? super T,? super U,? extends V> fn) {
1926 <        return doThenCombine(other.toCompletableFuture(), fn, null);
1926 >        return biApplyStage(null, other, fn);
1927      }
1928  
1929      public <U,V> CompletableFuture<V> thenCombineAsync(
1930          CompletionStage<? extends U> other,
1931          BiFunction<? super T,? super U,? extends V> fn) {
1932 <        return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1932 >        return biApplyStage(asyncPool, other, fn);
1933      }
1934  
1935      public <U,V> CompletableFuture<V> thenCombineAsync(
1936          CompletionStage<? extends U> other,
1937 <        BiFunction<? super T,? super U,? extends V> fn,
1938 <        Executor executor) {
1813 <        return doThenCombine(other.toCompletableFuture(), fn,
1814 <                             screenExecutor(executor));
1937 >        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
1938 >        return biApplyStage(screenExecutor(executor), other, fn);
1939      }
1940  
1941      public <U> CompletableFuture<Void> thenAcceptBoth(
1942          CompletionStage<? extends U> other,
1943          BiConsumer<? super T, ? super U> action) {
1944 <        return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1944 >        return biAcceptStage(null, other, action);
1945      }
1946  
1947      public <U> CompletableFuture<Void> thenAcceptBothAsync(
1948          CompletionStage<? extends U> other,
1949          BiConsumer<? super T, ? super U> action) {
1950 <        return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1950 >        return biAcceptStage(asyncPool, other, action);
1951      }
1952  
1953      public <U> CompletableFuture<Void> thenAcceptBothAsync(
1954          CompletionStage<? extends U> other,
1955 <        BiConsumer<? super T, ? super U> action,
1956 <        Executor executor) {
1833 <        return doThenAcceptBoth(other.toCompletableFuture(), action,
1834 <                                screenExecutor(executor));
1955 >        BiConsumer<? super T, ? super U> action, Executor executor) {
1956 >        return biAcceptStage(screenExecutor(executor), other, action);
1957      }
1958  
1959 <    public CompletableFuture<Void> runAfterBoth(
1960 <        CompletionStage<?> other, Runnable action) {
1961 <        return doRunAfterBoth(other.toCompletableFuture(), action, null);
1959 >    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
1960 >                                                Runnable action) {
1961 >        return biRunStage(null, other, action);
1962      }
1963  
1964 <    public CompletableFuture<Void> runAfterBothAsync(
1965 <        CompletionStage<?> other, Runnable action) {
1966 <        return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1964 >    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
1965 >                                                     Runnable action) {
1966 >        return biRunStage(asyncPool, other, action);
1967      }
1968  
1969 <    public CompletableFuture<Void> runAfterBothAsync(
1970 <        CompletionStage<?> other, Runnable action, Executor executor) {
1971 <        return doRunAfterBoth(other.toCompletableFuture(), action,
1972 <                              screenExecutor(executor));
1969 >    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
1970 >                                                     Runnable action,
1971 >                                                     Executor executor) {
1972 >        return biRunStage(screenExecutor(executor), other, action);
1973      }
1974  
1975      public <U> CompletableFuture<U> applyToEither(
1976          CompletionStage<? extends T> other, Function<? super T, U> fn) {
1977 <        return doApplyToEither(other.toCompletableFuture(), fn, null);
1977 >        return orApplyStage(null, other, fn);
1978      }
1979  
1980      public <U> CompletableFuture<U> applyToEitherAsync(
1981          CompletionStage<? extends T> other, Function<? super T, U> fn) {
1982 <        return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1982 >        return orApplyStage(asyncPool, other, fn);
1983      }
1984  
1985 <    public <U> CompletableFuture<U> applyToEitherAsync
1986 <        (CompletionStage<? extends T> other, Function<? super T, U> fn,
1987 <         Executor executor) {
1988 <        return doApplyToEither(other.toCompletableFuture(), fn,
1867 <                               screenExecutor(executor));
1985 >    public <U> CompletableFuture<U> applyToEitherAsync(
1986 >        CompletionStage<? extends T> other, Function<? super T, U> fn,
1987 >        Executor executor) {
1988 >        return orApplyStage(screenExecutor(executor), other, fn);
1989      }
1990  
1991      public CompletableFuture<Void> acceptEither(
1992          CompletionStage<? extends T> other, Consumer<? super T> action) {
1993 <        return doAcceptEither(other.toCompletableFuture(), action, null);
1993 >        return orAcceptStage(null, other, action);
1994      }
1995  
1996 <    public CompletableFuture<Void> acceptEitherAsync
1997 <        (CompletionStage<? extends T> other, Consumer<? super T> action) {
1998 <        return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1996 >    public CompletableFuture<Void> acceptEitherAsync(
1997 >        CompletionStage<? extends T> other, Consumer<? super T> action) {
1998 >        return orAcceptStage(asyncPool, other, action);
1999      }
2000  
2001      public CompletableFuture<Void> acceptEitherAsync(
2002          CompletionStage<? extends T> other, Consumer<? super T> action,
2003          Executor executor) {
2004 <        return doAcceptEither(other.toCompletableFuture(), action,
1884 <                              screenExecutor(executor));
2004 >        return orAcceptStage(screenExecutor(executor), other, action);
2005      }
2006  
2007 <    public CompletableFuture<Void> runAfterEither(
2008 <        CompletionStage<?> other, Runnable action) {
2009 <        return doRunAfterEither(other.toCompletableFuture(), action, null);
2007 >    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2008 >                                                  Runnable action) {
2009 >        return orRunStage(null, other, action);
2010      }
2011  
2012 <    public CompletableFuture<Void> runAfterEitherAsync(
2013 <        CompletionStage<?> other, Runnable action) {
2014 <        return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
2012 >    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2013 >                                                       Runnable action) {
2014 >        return orRunStage(asyncPool, other, action);
2015      }
2016  
2017 <    public CompletableFuture<Void> runAfterEitherAsync(
2018 <        CompletionStage<?> other, Runnable action, Executor executor) {
2019 <        return doRunAfterEither(other.toCompletableFuture(), action,
2020 <                                screenExecutor(executor));
2017 >    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2018 >                                                       Runnable action,
2019 >                                                       Executor executor) {
2020 >        return orRunStage(screenExecutor(executor), other, action);
2021      }
2022  
2023 <    public <U> CompletableFuture<U> thenCompose
2024 <        (Function<? super T, ? extends CompletionStage<U>> fn) {
2025 <        return doThenCompose(fn, null);
2023 >    public <U> CompletableFuture<U> thenCompose(
2024 >        Function<? super T, ? extends CompletionStage<U>> fn) {
2025 >        return uniComposeStage(null, fn);
2026      }
2027  
2028      public <U> CompletableFuture<U> thenComposeAsync(
2029          Function<? super T, ? extends CompletionStage<U>> fn) {
2030 <        return doThenCompose(fn, asyncPool);
2030 >        return uniComposeStage(asyncPool, fn);
2031      }
2032  
2033      public <U> CompletableFuture<U> thenComposeAsync(
2034          Function<? super T, ? extends CompletionStage<U>> fn,
2035          Executor executor) {
2036 <        return doThenCompose(fn, screenExecutor(executor));
2036 >        return uniComposeStage(screenExecutor(executor), fn);
2037      }
2038  
2039      public CompletableFuture<T> whenComplete(
2040          BiConsumer<? super T, ? super Throwable> action) {
2041 <        return doWhenComplete(action, null);
2041 >        return uniWhenCompleteStage(null, action);
2042      }
2043  
2044      public CompletableFuture<T> whenCompleteAsync(
2045          BiConsumer<? super T, ? super Throwable> action) {
2046 <        return doWhenComplete(action, asyncPool);
2046 >        return uniWhenCompleteStage(asyncPool, action);
2047      }
2048  
2049      public CompletableFuture<T> whenCompleteAsync(
2050          BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2051 <        return doWhenComplete(action, screenExecutor(executor));
2051 >        return uniWhenCompleteStage(screenExecutor(executor), action);
2052      }
2053  
2054      public <U> CompletableFuture<U> handle(
2055          BiFunction<? super T, Throwable, ? extends U> fn) {
2056 <        return doHandle(fn, null);
2056 >        return uniHandleStage(null, fn);
2057      }
2058  
2059      public <U> CompletableFuture<U> handleAsync(
2060          BiFunction<? super T, Throwable, ? extends U> fn) {
2061 <        return doHandle(fn, asyncPool);
2061 >        return uniHandleStage(asyncPool, fn);
2062      }
2063  
2064      public <U> CompletableFuture<U> handleAsync(
2065          BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2066 <        return doHandle(fn, screenExecutor(executor));
2066 >        return uniHandleStage(screenExecutor(executor), fn);
2067      }
2068  
2069      /**
# Line 1974 | Line 2094 | public class CompletableFuture<T> implem
2094       */
2095      public CompletableFuture<T> exceptionally(
2096          Function<Throwable, ? extends T> fn) {
2097 <        return doExceptionally(fn);
2097 >        return uniExceptionallyStage(fn);
2098      }
2099  
2100      /* ------------- Arbitrary-arity constructions -------------- */
# Line 2003 | Line 2123 | public class CompletableFuture<T> implem
2123       * {@code null}
2124       */
2125      public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2126 <        return doAllOf(cfs, 0, cfs.length - 1);
2126 >        return andTree(cfs, 0, cfs.length - 1);
2127      }
2128  
2129      /**
# Line 2022 | Line 2142 | public class CompletableFuture<T> implem
2142       * {@code null}
2143       */
2144      public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2145 <        CompletableFuture<Object> d = new CompletableFuture<Object>();
2026 <        for (int i = 0; i < cfs.length; ++i) {
2027 <            CompletableFuture<?> c = cfs[i];
2028 <            Object r = c.result; // throw NPE if null element
2029 <            if (d.result == null) {
2030 <                if (r == null)
2031 <                    c.unipush(new DelayedCopy<Object>(d, c));
2032 <                else
2033 <                    nowCopy(d, r);
2034 <            }
2035 <        }
2036 <        return d;
2145 >        return orTree(cfs, 0, cfs.length - 1);
2146      }
2147  
2148      /* ------------- Control and status methods -------------- */
# Line 2127 | Line 2236 | public class CompletableFuture<T> implem
2236       */
2237      public int getNumberOfDependents() {
2238          int count = 0;
2239 <        for (Completion p = completions; p != null; p = p.next)
2239 >        for (Completion p = stack; p != null; p = p.next)
2240              ++count;
2241          return count;
2242      }
# Line 2158 | Line 2267 | public class CompletableFuture<T> implem
2267      // Unsafe mechanics
2268      private static final sun.misc.Unsafe UNSAFE;
2269      private static final long RESULT;
2270 <    private static final long COMPLETIONS;
2270 >    private static final long STACK;
2271      static {
2272          try {
2273              UNSAFE = sun.misc.Unsafe.getUnsafe();
2274              Class<?> k = CompletableFuture.class;
2275              RESULT = UNSAFE.objectFieldOffset
2276                  (k.getDeclaredField("result"));
2277 <            COMPLETIONS = UNSAFE.objectFieldOffset
2278 <                (k.getDeclaredField("completions"));
2277 >            STACK = UNSAFE.objectFieldOffset
2278 >                (k.getDeclaredField("stack"));
2279          } catch (Exception x) {
2280              throw new Error(x);
2281          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines