ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.14 by dl, Sun Apr 18 12:54:57 2010 UTC vs.
Revision 1.15 by dl, Thu May 27 16:47:21 2010 UTC

# Line 155 | Line 155 | public abstract class ForkJoinTask<V> im
155       * initially zero, and takes on nonnegative values until
156       * completed, upon which status holds COMPLETED. CANCELLED, or
157       * EXCEPTIONAL, which use the top 3 bits.  Tasks undergoing
158 <     * blocking waits by other threads have SIGNAL_MASK bits set --
159 <     * bit 15 for external (nonFJ) waits, and the rest a count of
160 <     * waiting FJ threads.  (This representation relies on
161 <     * ForkJoinPool max thread limits). Signal counts are not directly
162 <     * incremented by ForkJoinTask methods, but instead via a call to
163 <     * requestSignal within ForkJoinPool.preJoin, once their need is
164 <     * established.
165 <     *
166 <     * Completion of a stolen task with SIGNAL_MASK bits set awakens
167 <     * any waiters via notifyAll. Even though suboptimal for some
168 <     * purposes, we use basic builtin wait/notify to take advantage of
169 <     * "monitor inflation" in JVMs that we would otherwise need to
170 <     * emulate to avoid adding further per-task bookkeeping overhead.
171 <     * We want these monitors to be "fat", i.e., not use biasing or
172 <     * thin-lock techniques, so use some odd coding idioms that tend
173 <     * to avoid them.
158 >     * blocking waits by other threads have the SIGNAL bit set.
159       *
160 <     * Note that bits 16-28 are currently unused. Also value
160 >     * Completion of a stolen task with SIGNAL set awakens any waiters
161 >     * via notifyAll. Even though suboptimal for some purposes, we use
162 >     * basic builtin wait/notify to take advantage of "monitor
163 >     * inflation" in JVMs that we would otherwise need to emulate to
164 >     * avoid adding further per-task bookkeeping overhead.  We want
165 >     * these monitors to be "fat", i.e., not use biasing or thin-lock
166 >     * techniques, so use some odd coding idioms that tend to avoid
167 >     * them.
168 >     *
169 >     * Note that bits 1-28 are currently unused. Also value
170       * 0x80000000 is available as spare completion value.
171       */
172      volatile int status; // accessed directly by pool and workers
# Line 181 | Line 175 | public abstract class ForkJoinTask<V> im
175      private static final int NORMAL               = 0xe0000000; // == mask
176      private static final int CANCELLED            = 0xc0000000;
177      private static final int EXCEPTIONAL          = 0xa0000000;
178 <    private static final int SIGNAL_MASK          = 0x0000ffff;
185 <    private static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
186 <    private static final int EXTERNAL_SIGNAL      = 0x00008000;
178 >    private static final int SIGNAL               = 0x00000001;
179  
180      /**
181       * Table of exceptions thrown by tasks, to enable reporting by
# Line 204 | Line 196 | public abstract class ForkJoinTask<V> im
196       * also clearing signal request bits.
197       *
198       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
199 +     * @return status on exit
200       */
201 <    private void setCompletion(int completion) {
201 >    private int setCompletion(int completion) {
202          int s;
203          while ((s = status) >= 0) {
204              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
205 <                if ((s & SIGNAL_MASK) != 0) {
213 <                    Thread t = Thread.currentThread();
214 <                    if (t instanceof ForkJoinWorkerThread)
215 <                        ((ForkJoinWorkerThread) t).pool.updateRunningCount
216 <                            (s & INTERNAL_SIGNAL_MASK);
205 >                if ((s & SIGNAL) != 0)
206                      synchronized (this) { notifyAll(); }
207 <                }
219 <                return;
207 >                return completion;
208              }
209          }
210 +        return s;
211      }
212  
213      /**
214       * Record exception and set exceptional completion
215 +     * @return status on exit
216       */
217 <    private void setDoneExceptionally(Throwable rex) {
217 >    private int setExceptionalCompletion(Throwable rex) {
218          exceptionMap.put(this, rex);
219 <        setCompletion(EXCEPTIONAL);
230 <    }
231 <
232 <    /**
233 <     * Main internal execution method: Unless done, calls exec and
234 <     * records completion.
235 <     *
236 <     * @return true if ran and completed normally
237 <     */
238 <    final boolean tryExec() {
239 <        try {
240 <            if (status < 0 || !exec())
241 <                return false;
242 <        } catch (Throwable rex) {
243 <            setDoneExceptionally(rex);
244 <            return false;
245 <        }
246 <        setCompletion(NORMAL); // must be outside try block
247 <        return true;
248 <    }
249 <
250 <    /**
251 <     * Increments internal signal count (thus requesting signal upon
252 <     * completion) unless already done.  Call only once per join.
253 <     * Used by ForkJoinPool.preJoin.
254 <     *
255 <     * @return status
256 <     */
257 <    final int requestSignal() {
258 <        int s;
259 <        do {} while ((s = status) >= 0 &&
260 <                     !UNSAFE.compareAndSwapInt(this, statusOffset, s, s + 1));
261 <        return s;
219 >        return setCompletion(EXCEPTIONAL);
220      }
221  
222      /**
223 <     * Sets external signal request unless already done.
266 <     *
267 <     * @return status
223 >     * Blocks a worker thread until completion. Called only by pool.
224       */
225 <    private int requestExternalSignal() {
225 >    final void internalAwaitDone() {
226          int s;
227 <        do {} while ((s = status) >= 0 &&
228 <                     !UNSAFE.compareAndSwapInt(this, statusOffset,
229 <                                               s, s | EXTERNAL_SIGNAL));
230 <        return s;
231 <    }
276 <
277 <    /*
278 <     * Awaiting completion. The four versions, internal vs external X
279 <     * untimed vs timed, have the same overall structure but differ
280 <     * from each other enough to defy simple integration.
281 <     */
282 <
283 <    /**
284 <     * Blocks a worker until this task is done, also maintaining pool
285 <     * and signal counts
286 <     */
287 <    private void awaitDone(ForkJoinWorkerThread w) {
288 <        if (status >= 0) {
289 <            w.pool.preJoin(this);
290 <            while (status >= 0) {
291 <                try { // minimize lock scope
292 <                    synchronized(this) {
293 <                        if (status >= 0)
227 >        while ((s = status) >= 0) {
228 >            synchronized(this) {
229 >                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
230 >                    do {
231 >                        try {
232                              wait();
233 <                        else { // help release; also helps avoid lock-biasing
234 <                            notifyAll();
297 <                            break;
233 >                        } catch (InterruptedException ie) {
234 >                            cancelIfTerminating();
235                          }
236 <                    }
237 <                } catch (InterruptedException ie) {
301 <                    cancelIfTerminating();
236 >                    } while (status >= 0);
237 >                    break;
238                  }
239              }
240          }
241      }
242  
243      /**
244 <     * Blocks a non-ForkJoin thread until this task is done.
244 >     * Blocks a non-worker-thread until completion.
245 >     * @return status on exit
246       */
247 <    private void externalAwaitDone() {
248 <        if (requestExternalSignal() >= 0) {
249 <            boolean interrupted = false;
250 <            while (status >= 0) {
251 <                try {
252 <                    synchronized(this) {
253 <                        if (status >= 0)
247 >    private int externalAwaitDone() {
248 >        int s;
249 >        while ((s = status) >= 0) {
250 >            synchronized(this) {
251 >                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
252 >                    boolean interrupted = false;
253 >                    do {
254 >                        try {
255                              wait();
256 <                        else {
257 <                            notifyAll();
320 <                            break;
256 >                        } catch (InterruptedException ie) {
257 >                            interrupted = true;
258                          }
259 <                    }
260 <                } catch (InterruptedException ie) {
261 <                    interrupted = true;
259 >                    } while ((s = status) >= 0);
260 >                    if (interrupted)
261 >                        Thread.currentThread().interrupt();
262 >                    break;
263                  }
264              }
327            if (interrupted)
328                Thread.currentThread().interrupt();
265          }
266 +        return s;
267      }
268  
269      /**
270 <     * Blocks a worker until this task is done or timeout elapses
270 >     * Unless done, calls exec and records status if completed, but
271 >     * doesn't wait for completion otherwise.
272       */
273 <    private void timedAwaitDone(ForkJoinWorkerThread w, long nanos) {
274 <        if (status >= 0) {
275 <            long startTime = System.nanoTime();
276 <            ForkJoinPool pool = w.pool;
277 <            pool.preJoin(this);
278 <            while (status >= 0) {
279 <                long nt = nanos - (System.nanoTime() - startTime);
342 <                if (nt > 0) {
343 <                    long ms = nt / 1000000;
344 <                    int ns = (int) (nt % 1000000);
345 <                    try {
346 <                        synchronized(this) { if (status >= 0) wait(ms, ns); }
347 <                    } catch (InterruptedException ie) {
348 <                        cancelIfTerminating();
349 <                    }
350 <                }
351 <                else {
352 <                    int s; // adjust running count on timeout
353 <                    while ((s = status) >= 0 &&
354 <                           (s & INTERNAL_SIGNAL_MASK) != 0) {
355 <                        if (UNSAFE.compareAndSwapInt(this, statusOffset,
356 <                                                     s, s - 1)) {
357 <                            pool.updateRunningCount(1);
358 <                            break;
359 <                        }
360 <                    }
361 <                    break;
362 <                }
363 <            }
273 >    final void tryExec() {
274 >        try {
275 >            if (status < 0 || !exec())
276 >                return;
277 >        } catch (Throwable rex) {
278 >            setExceptionalCompletion(rex);
279 >            return;
280          }
281 +        setCompletion(NORMAL); // must be outside try block
282      }
283  
284      /**
285 <     * Blocks a non-ForkJoin thread until this task is done or timeout elapses
286 <     */
287 <    private void externalTimedAwaitDone(long nanos) {
288 <        if (requestExternalSignal() >= 0) {
289 <            long startTime = System.nanoTime();
290 <            boolean interrupted = false;
291 <            while (status >= 0) {
292 <                long nt = nanos - (System.nanoTime() - startTime);
293 <                if (nt <= 0)
294 <                    break;
295 <                long ms = nt / 1000000;
296 <                int ns = (int) (nt % 1000000);
285 >     * If not done and this task is next in worker queue, runs it,
286 >     * else waits for it.
287 >     * @return status on exit
288 >     */
289 >    private int waitingJoin() {
290 >        int s = status;
291 >        if (s < 0)
292 >            return s;
293 >        Thread t = Thread.currentThread();
294 >        if (t instanceof ForkJoinWorkerThread) {
295 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
296 >            if (w.unpushTask(this)) {
297 >                boolean completed;
298                  try {
299 <                    synchronized(this) { if (status >= 0) wait(ms, ns); }
300 <                } catch (InterruptedException ie) {
301 <                    interrupted = true;
299 >                    completed = exec();
300 >                } catch (Throwable rex) {
301 >                    return setExceptionalCompletion(rex);
302                  }
303 +                if (completed)
304 +                    return setCompletion(NORMAL);
305              }
306 <            if (interrupted)
387 <                Thread.currentThread().interrupt();
306 >            return w.pool.awaitJoin(this);
307          }
308 +        else
309 +            return externalAwaitDone();
310      }
311  
391    // reporting results
392
312      /**
313 <     * Returns result or throws the exception associated with status.
314 <     * Uses Unsafe as a workaround for javac not allowing rethrow of
315 <     * unchecked exceptions.
313 >     * Unless done, calls exec and records status if completed, or
314 >     * waits for completion otherwise.
315 >     * @return status on exit
316       */
317 <    private V reportResult() {
318 <        if ((status & COMPLETION_MASK) < NORMAL) {
319 <            Throwable ex = getException();
320 <            if (ex != null)
321 <                UNSAFE.throwException(ex);
317 >    private int waitingInvoke() {
318 >        int s = status;
319 >        if (s < 0)
320 >            return s;
321 >        boolean completed;
322 >        try {
323 >            completed = exec();
324 >        } catch (Throwable rex) {
325 >            return setExceptionalCompletion(rex);
326          }
327 <        return getRawResult();
327 >        if (completed)
328 >            return setCompletion(NORMAL);
329 >        return waitingJoin();
330      }
331  
332      /**
333 <     * Returns result or throws exception using j.u.c.Future conventions.
334 <     * Only call when {@code isDone} known to be true or thread known
335 <     * to be interrupted.
333 >     * If this task is next in worker queue, runs it, else processes other
334 >     * tasks until complete.
335 >     * @return status on exit
336       */
337 <    private V reportFutureResult()
338 <        throws InterruptedException, ExecutionException {
339 <        if (Thread.interrupted())
340 <            throw new InterruptedException();
341 <        int s = status & COMPLETION_MASK;
342 <        if (s < NORMAL) {
343 <            Throwable ex;
344 <            if (s == CANCELLED)
345 <                throw new CancellationException();
346 <            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
347 <                throw new ExecutionException(ex);
337 >    private int busyJoin() {
338 >        int s = status;
339 >        if (s < 0)
340 >            return s;
341 >        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
342 >        if (w.unpushTask(this)) {
343 >            boolean completed;
344 >            try {
345 >                completed = exec();
346 >            } catch (Throwable rex) {
347 >                return setExceptionalCompletion(rex);
348 >            }
349 >            if (completed)
350 >                return setCompletion(NORMAL);
351          }
352 <        return getRawResult();
352 >        return w.execWhileJoining(this);
353      }
354  
355      /**
356 <     * Returns result or throws exception using j.u.c.Future conventions
357 <     * with timeouts.
356 >     * Returns result or throws exception associated with given status.
357 >     * @param s the status
358       */
359 <    private V reportTimedFutureResult()
432 <        throws InterruptedException, ExecutionException, TimeoutException {
433 <        if (Thread.interrupted())
434 <            throw new InterruptedException();
359 >    private V reportResult(int s) {
360          Throwable ex;
361 <        int s = status & COMPLETION_MASK;
362 <        if (s == NORMAL)
363 <            return getRawResult();
439 <        else if (s == CANCELLED)
440 <            throw new CancellationException();
441 <        else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
442 <            throw new ExecutionException(ex);
443 <        else
444 <            throw new TimeoutException();
361 >        if (s < NORMAL && (ex = getException()) != null)
362 >            UNSAFE.throwException(ex);
363 >        return getRawResult();
364      }
365  
366      // public methods
# Line 479 | Line 398 | public abstract class ForkJoinTask<V> im
398       * @return the computed result
399       */
400      public final V join() {
401 <        quietlyJoin();
483 <        return reportResult();
401 >        return reportResult(waitingJoin());
402      }
403  
404      /**
# Line 491 | Line 409 | public abstract class ForkJoinTask<V> im
409       * @return the computed result
410       */
411      public final V invoke() {
412 <        if (!tryExec())
495 <            quietlyJoin();
496 <        return reportResult();
412 >        return reportResult(waitingInvoke());
413      }
414  
415      /**
# Line 551 | Line 467 | public abstract class ForkJoinTask<V> im
467              }
468              else if (i != 0)
469                  t.fork();
470 <            else {
471 <                t.quietlyInvoke();
556 <                if (ex == null)
557 <                    ex = t.getException();
558 <            }
470 >            else if (t.waitingInvoke() < NORMAL && ex == null)
471 >                ex = t.getException();
472          }
473          for (int i = 1; i <= last; ++i) {
474              ForkJoinTask<?> t = tasks[i];
475              if (t != null) {
476                  if (ex != null)
477                      t.cancel(false);
478 <                else {
479 <                    t.quietlyJoin();
567 <                    if (ex == null)
568 <                        ex = t.getException();
569 <                }
478 >                else if (t.waitingJoin() < NORMAL && ex == null)
479 >                    ex = t.getException();
480              }
481          }
482          if (ex != null)
# Line 613 | Line 523 | public abstract class ForkJoinTask<V> im
523              }
524              else if (i != 0)
525                  t.fork();
526 <            else {
527 <                t.quietlyInvoke();
618 <                if (ex == null)
619 <                    ex = t.getException();
620 <            }
526 >            else if (t.waitingInvoke() < NORMAL && ex == null)
527 >                ex = t.getException();
528          }
529          for (int i = 1; i <= last; ++i) {
530              ForkJoinTask<?> t = ts.get(i);
531              if (t != null) {
532                  if (ex != null)
533                      t.cancel(false);
534 <                else {
535 <                    t.quietlyJoin();
629 <                    if (ex == null)
630 <                        ex = t.getException();
631 <                }
534 >                else if (t.waitingJoin() < NORMAL && ex == null)
535 >                    ex = t.getException();
536              }
537          }
538          if (ex != null)
# Line 747 | Line 651 | public abstract class ForkJoinTask<V> im
651       * thrown will be a {@code RuntimeException} with cause {@code ex}.
652       */
653      public void completeExceptionally(Throwable ex) {
654 <        setDoneExceptionally((ex instanceof RuntimeException) ||
655 <                             (ex instanceof Error) ? ex :
656 <                             new RuntimeException(ex));
654 >        setExceptionalCompletion((ex instanceof RuntimeException) ||
655 >                                 (ex instanceof Error) ? ex :
656 >                                 new RuntimeException(ex));
657      }
658  
659      /**
# Line 768 | Line 672 | public abstract class ForkJoinTask<V> im
672          try {
673              setRawResult(value);
674          } catch (Throwable rex) {
675 <            setDoneExceptionally(rex);
675 >            setExceptionalCompletion(rex);
676              return;
677          }
678          setCompletion(NORMAL);
679      }
680  
681      public final V get() throws InterruptedException, ExecutionException {
682 <        quietlyJoin();
683 <        return reportFutureResult();
682 >        int s = waitingJoin() & COMPLETION_MASK;
683 >        if (Thread.interrupted())
684 >            throw new InterruptedException();
685 >        if (s < NORMAL) {
686 >            Throwable ex;
687 >            if (s == CANCELLED)
688 >                throw new CancellationException();
689 >            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
690 >                throw new ExecutionException(ex);
691 >        }
692 >        return getRawResult();
693      }
694  
695      public final V get(long timeout, TimeUnit unit)
696          throws InterruptedException, ExecutionException, TimeoutException {
784        long nanos = unit.toNanos(timeout);
697          Thread t = Thread.currentThread();
698 +        ForkJoinPool pool;
699          if (t instanceof ForkJoinWorkerThread) {
700              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
701 <            if (!w.unpushTask(this) || !tryExec())
702 <                timedAwaitDone(w, nanos);
701 >            if (status >= 0 && w.unpushTask(this))
702 >                tryExec();
703 >            pool = w.pool;
704          }
705          else
706 <            externalTimedAwaitDone(nanos);
707 <        return reportTimedFutureResult();
706 >            pool = null;
707 >        /*
708 >         * Timed wait loop intermixes cases for fj (pool != null) and
709 >         * non FJ threads. For FJ, decrement pool count but don't try
710 >         * for replacement; increment count on completion. For non-FJ,
711 >         * deal with interrupts. This is messy, but a little less so
712 >         * than is splitting the FJ and nonFJ cases.
713 >         */
714 >        boolean interrupted = false;
715 >        boolean dec = false; // true if pool count decremented
716 >        for (;;) {
717 >            if (Thread.interrupted() && pool == null) {
718 >                interrupted = true;
719 >                break;
720 >            }
721 >            int s = status;
722 >            if (s < 0)
723 >                break;
724 >            if (UNSAFE.compareAndSwapInt(this, statusOffset,
725 >                                         s, s | SIGNAL)) {
726 >                long startTime = System.nanoTime();
727 >                long nanos = unit.toNanos(timeout);
728 >                long nt; // wait time
729 >                while (status >= 0 &&
730 >                       (nt = nanos - (System.nanoTime() - startTime)) > 0) {
731 >                    if (pool != null && !dec)
732 >                        dec = pool.tryDecrementRunningCount();
733 >                    else {
734 >                        long ms = nt / 1000000;
735 >                        int ns = (int) (nt % 1000000);
736 >                        try {
737 >                            synchronized(this) {
738 >                                if (status >= 0)
739 >                                    wait(ms, ns);
740 >                            }
741 >                        } catch (InterruptedException ie) {
742 >                            if (pool != null)
743 >                                cancelIfTerminating();
744 >                            else {
745 >                                interrupted = true;
746 >                                break;
747 >                            }
748 >                        }
749 >                    }
750 >                }
751 >                break;
752 >            }
753 >        }
754 >        if (pool != null && dec)
755 >            pool.updateRunningCount(1);
756 >        if (interrupted)
757 >            throw new InterruptedException();
758 >        int es = status & COMPLETION_MASK;
759 >        if (es != NORMAL) {
760 >            Throwable ex;
761 >            if (es == CANCELLED)
762 >                throw new CancellationException();
763 >            if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
764 >                throw new ExecutionException(ex);
765 >            throw new TimeoutException();
766 >        }
767 >        return getRawResult();
768      }
769  
770      /**
# Line 811 | Line 785 | public abstract class ForkJoinTask<V> im
785       * @return the computed result
786       */
787      public final V helpJoin() {
788 <        quietlyHelpJoin();
815 <        return reportResult();
788 >        return reportResult(busyJoin());
789      }
790  
791      /**
# Line 828 | Line 801 | public abstract class ForkJoinTask<V> im
801       * ClassCastException}.
802       */
803      public final void quietlyHelpJoin() {
804 <        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
832 <        if (!w.unpushTask(this) || !tryExec()) {
833 <            for (;;) {
834 <                ForkJoinTask<?> t;
835 <                if (status < 0)
836 <                    return;
837 <                else if ((t = w.scanWhileJoining(this)) != null)
838 <                    t.tryExec();
839 <                else if (status < 0)
840 <                    return;
841 <                else if (w.pool.preBlockHelpingJoin(this)) {
842 <                    while (status >= 0) { // variant of awaitDone
843 <                        try {
844 <                            synchronized(this) {
845 <                                if (status >= 0)
846 <                                    wait();
847 <                                else {
848 <                                    notifyAll();
849 <                                    break;
850 <                                }
851 <                            }
852 <                        } catch (InterruptedException ie) {
853 <                            cancelIfTerminating();
854 <                        }
855 <                    }
856 <                    return;
857 <                }
858 <            }
859 <        }
804 >        busyJoin();
805      }
806  
807      /**
# Line 866 | Line 811 | public abstract class ForkJoinTask<V> im
811       * known to have aborted.
812       */
813      public final void quietlyJoin() {
814 <        Thread t = Thread.currentThread();
870 <        if (t instanceof ForkJoinWorkerThread) {
871 <            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
872 <            if (!w.unpushTask(this) || !tryExec())
873 <                awaitDone(w);
874 <        }
875 <        else
876 <            externalAwaitDone();
814 >        waitingJoin();
815      }
816  
817      /**
# Line 884 | Line 822 | public abstract class ForkJoinTask<V> im
822       * known to have aborted.
823       */
824      public final void quietlyInvoke() {
825 <        if (!tryExec())
888 <            quietlyJoin();
825 >        waitingInvoke();
826      }
827  
828      /**
# Line 1227 | Line 1164 | public abstract class ForkJoinTask<V> im
1164      private void readObject(java.io.ObjectInputStream s)
1165          throws java.io.IOException, ClassNotFoundException {
1166          s.defaultReadObject();
1167 <        status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1231 <        status |= EXTERNAL_SIGNAL; // conservatively set external signal
1167 >        status |= SIGNAL; // conservatively set external signal
1168          Object ex = s.readObject();
1169          if (ex != null)
1170 <            setDoneExceptionally((Throwable) ex);
1170 >            setExceptionalCompletion((Throwable) ex);
1171      }
1172  
1173      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines