ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.12 by jsr166, Tue Oct 27 23:21:32 2009 UTC vs.
Revision 1.13 by dl, Mon Apr 5 16:05:09 2010 UTC

# Line 137 | Line 137 | import java.util.WeakHashMap;
137   */
138   public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
139  
140 +    /*
141 +     * See the internal documentation of class ForkJoinPool for a
142 +     * general implementation overview.  ForkJoinTasks are mainly
143 +     * responsible for maintaining their "status" field amidst relays
144 +     * to methods in ForkJoinWorkerThread and ForkJoinPool. The
145 +     * methods of this class are more-or-less layered into (1) basic
146 +     * status maintenance (2) execution and awaiting completion (3)
147 +     * user-level methods that additionally report results. This is
148 +     * sometimes hard to see because this file orders exported methods
149 +     * in a way that flows well in javadocs.
150 +     */
151 +
152      /**
153       * Run control status bits packed into a single int to minimize
154       * footprint and to ensure atomicity (via CAS).  Status is
# Line 146 | Line 158 | public abstract class ForkJoinTask<V> im
158       * blocking waits by other threads have SIGNAL_MASK bits set --
159       * bit 15 for external (nonFJ) waits, and the rest a count of
160       * waiting FJ threads.  (This representation relies on
161 <     * ForkJoinPool max thread limits). Completion of a stolen task
162 <     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
163 <     * though suboptimal for some purposes, we use basic builtin
164 <     * wait/notify to take advantage of "monitor inflation" in JVMs
165 <     * that we would otherwise need to emulate to avoid adding further
166 <     * per-task bookkeeping overhead. Note that bits 16-28 are
167 <     * currently unused. Also value 0x80000000 is available as spare
168 <     * completion value.
161 >     * ForkJoinPool max thread limits). Signal counts are not directly
162 >     * incremented by ForkJoinTask methods, but instead via a call to
163 >     * requestSignal within ForkJoinPool.preJoin, once their need is
164 >     * established.
165 >     *
166 >     * Completion of a stolen task with SIGNAL_MASK bits set awakens
167 >     * any waiters via notifyAll. Even though suboptimal for some
168 >     * purposes, we use basic builtin wait/notify to take advantage of
169 >     * "monitor inflation" in JVMs that we would otherwise need to
170 >     * emulate to avoid adding further per-task bookkeeping overhead.
171 >     * We want these monitors to be "fat", i.e., not use biasing or
172 >     * thin-lock techniques, so use some odd coding idioms that tend
173 >     * to avoid them.
174 >     *
175 >     * Note that bits 16-28 are currently unused. Also value
176 >     * 0x80000000 is available as spare completion value.
177       */
178      volatile int status; // accessed directly by pool and workers
179  
180 <    static final int COMPLETION_MASK      = 0xe0000000;
181 <    static final int NORMAL               = 0xe0000000; // == mask
182 <    static final int CANCELLED            = 0xc0000000;
183 <    static final int EXCEPTIONAL          = 0xa0000000;
184 <    static final int SIGNAL_MASK          = 0x0000ffff;
185 <    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
186 <    static final int EXTERNAL_SIGNAL      = 0x00008000; // top bit of low word
180 >    private static final int COMPLETION_MASK      = 0xe0000000;
181 >    private static final int NORMAL               = 0xe0000000; // == mask
182 >    private static final int CANCELLED            = 0xc0000000;
183 >    private static final int EXCEPTIONAL          = 0xa0000000;
184 >    private static final int SIGNAL_MASK          = 0x0000ffff;
185 >    private static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
186 >    private static final int EXTERNAL_SIGNAL      = 0x00008000;
187  
188      /**
189       * Table of exceptions thrown by tasks, to enable reporting by
# Line 177 | Line 197 | public abstract class ForkJoinTask<V> im
197          Collections.synchronizedMap
198          (new WeakHashMap<ForkJoinTask<?>, Throwable>());
199  
200 <    // within-package utilities
200 >    // Maintaining completion status
201  
202      /**
203 <     * Gets current worker thread, or null if not a worker thread.
203 >     * Marks completion and wakes up threads waiting to join this task,
204 >     * also clearing signal request bits.
205 >     *
206 >     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
207       */
208 <    static ForkJoinWorkerThread getWorker() {
209 <        Thread t = Thread.currentThread();
210 <        return ((t instanceof ForkJoinWorkerThread) ?
211 <                (ForkJoinWorkerThread) t : null);
212 <    }
213 <
214 <    final boolean casStatus(int cmp, int val) {
215 <        return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
208 >    private void setCompletion(int completion) {
209 >        int s;
210 >        while ((s = status) >= 0) {
211 >            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
212 >                if ((s & SIGNAL_MASK) != 0) {
213 >                    Thread t = Thread.currentThread();
214 >                    if (t instanceof ForkJoinWorkerThread)
215 >                        ((ForkJoinWorkerThread) t).pool.updateRunningCount
216 >                            (s & INTERNAL_SIGNAL_MASK);
217 >                    synchronized (this) { notifyAll(); }
218 >                }
219 >                return;
220 >            }
221 >        }
222      }
223  
224      /**
225 <     * Workaround for not being able to rethrow unchecked exceptions.
225 >     * Record exception and set exceptional completion
226       */
227 <    static void rethrowException(Throwable ex) {
228 <        if (ex != null)
229 <            UNSAFE.throwException(ex);
227 >    private void setDoneExceptionally(Throwable rex) {
228 >        exceptionMap.put(this, rex);
229 >        setCompletion(EXCEPTIONAL);
230      }
231  
203    // Setting completion status
204
232      /**
233 <     * Marks completion and wakes up threads waiting to join this task.
233 >     * Main internal execution method: Unless done, calls exec and
234 >     * records completion.
235       *
236 <     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
236 >     * @return true if ran and completed normally
237       */
238 <    final void setCompletion(int completion) {
239 <        ForkJoinPool pool = getPool();
240 <        if (pool != null) {
241 <            int s; // Clear signal bits while setting completion status
242 <            do {} while ((s = status) >= 0 && !casStatus(s, completion));
243 <
244 <            if ((s & SIGNAL_MASK) != 0) {
217 <                if ((s &= INTERNAL_SIGNAL_MASK) != 0)
218 <                    pool.updateRunningCount(s);
219 <                synchronized (this) { notifyAll(); }
220 <            }
238 >    final boolean tryExec() {
239 >        try {
240 >            if (status < 0 || !exec())
241 >                return false;
242 >        } catch (Throwable rex) {
243 >            setDoneExceptionally(rex);
244 >            return false;
245          }
246 <        else
247 <            externallySetCompletion(completion);
246 >        setCompletion(NORMAL); // must be outside try block
247 >        return true;
248      }
249  
250      /**
251 <     * Version of setCompletion for non-FJ threads.  Leaves signal
252 <     * bits for unblocked threads to adjust, and always notifies.
251 >     * Increments internal signal count (thus requesting signal upon
252 >     * completion) unless already done.  Call only once per join.
253 >     * Used by ForkJoinPool.preJoin.
254 >     *
255 >     * @return status
256       */
257 <    private void externallySetCompletion(int completion) {
257 >    final int requestSignal() {
258          int s;
259 <        do {} while ((s = status) >= 0 &&
260 <                     !casStatus(s, (s & SIGNAL_MASK) | completion));
261 <        synchronized (this) { notifyAll(); }
259 >        do {} while ((s = status) >= 0 &&
260 >                     !UNSAFE.compareAndSwapInt(this, statusOffset, s, s + 1));
261 >        return s;
262      }
263 <
263 >    
264      /**
265 <     * Sets status to indicate normal completion.
265 >     * Sets external signal request unless already done.
266 >     *
267 >     * @return status
268       */
269 <    final void setNormalCompletion() {
270 <        // Try typical fast case -- single CAS, no signal, not already done.
271 <        // Manually expand casStatus to improve chances of inlining it
272 <        if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
273 <            setCompletion(NORMAL);
269 >    private int requestExternalSignal() {
270 >        int s;
271 >        do {} while ((s = status) >= 0 &&
272 >                     !UNSAFE.compareAndSwapInt(this, statusOffset,
273 >                                               s, s | EXTERNAL_SIGNAL));
274 >        return s;
275      }
276  
277 <    // internal waiting and notification
278 <
279 <    /**
280 <     * Performs the actual monitor wait for awaitDone.
277 >    /*
278 >     * Awaiting completion. The four versions, internal vs external X
279 >     * untimed vs timed, have the same overall structure but differ
280 >     * from each other enough to defy simple integration.
281       */
252    private void doAwaitDone() {
253        // Minimize lock bias and in/de-flation effects by maximizing
254        // chances of waiting inside sync
255        try {
256            while (status >= 0)
257                synchronized (this) { if (status >= 0) wait(); }
258        } catch (InterruptedException ie) {
259            onInterruptedWait();
260        }
261    }
282  
283      /**
284 <     * Performs the actual timed monitor wait for awaitDone.
284 >     * Blocks a worker until this task is done, also maintaining pool
285 >     * and signal counts
286       */
287 <    private void doAwaitDone(long startTime, long nanos) {
288 <        synchronized (this) {
289 <            try {
290 <                while (status >= 0) {
291 <                    long nt = nanos - (System.nanoTime() - startTime);
292 <                    if (nt <= 0)
293 <                        break;
294 <                    wait(nt / 1000000, (int) (nt % 1000000));
287 >    private void awaitDone(ForkJoinWorkerThread w) {
288 >        if (status >= 0) {
289 >            w.pool.preJoin(this);
290 >            while (status >= 0) {
291 >                try { // minimize lock scope
292 >                    synchronized(this) {
293 >                        if (status >= 0)
294 >                            wait();
295 >                        else { // help release; also helps avoid lock-biasing
296 >                            notifyAll();
297 >                            break;
298 >                        }
299 >                    }
300 >                } catch (InterruptedException ie) {
301 >                    cancelIfTerminating();
302                  }
275            } catch (InterruptedException ie) {
276                onInterruptedWait();
303              }
304          }
305      }
306  
281    // Awaiting completion
282
307      /**
308 <     * Sets status to indicate there is joiner, then waits for join,
285 <     * surrounded with pool notifications.
286 <     *
287 <     * @return status upon exit
308 >     * Blocks a non-ForkJoin thread until this task is done.
309       */
310 <    private int awaitDone(ForkJoinWorkerThread w,
311 <                          boolean maintainParallelism) {
312 <        ForkJoinPool pool = (w == null) ? null : w.pool;
313 <        int s;
314 <        while ((s = status) >= 0) {
315 <            if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
316 <                if (pool == null || !pool.preJoin(this, maintainParallelism))
317 <                    doAwaitDone();
318 <                if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
319 <                    adjustPoolCountsOnUnblock(pool);
320 <                break;
310 >    private void externalAwaitDone() {
311 >        if (requestExternalSignal() >= 0) {
312 >            boolean interrupted = false;
313 >            while (status >= 0) {
314 >                try {
315 >                    synchronized(this) {
316 >                        if (status >= 0)
317 >                            wait();
318 >                        else {
319 >                            notifyAll();
320 >                            break;
321 >                        }
322 >                    }
323 >                } catch (InterruptedException ie) {
324 >                    interrupted = true;
325 >                }
326              }
327 +            if (interrupted)
328 +                Thread.currentThread().interrupt();
329          }
302        return s;
330      }
331  
332      /**
333 <     * Timed version of awaitDone
307 <     *
308 <     * @return status upon exit
333 >     * Blocks a worker until this task is done or timeout elapses
334       */
335 <    private int awaitDone(ForkJoinWorkerThread w, long nanos) {
336 <        ForkJoinPool pool = (w == null) ? null : w.pool;
337 <        int s;
338 <        while ((s = status) >= 0) {
339 <            if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
340 <                long startTime = System.nanoTime();
341 <                if (pool == null || !pool.preJoin(this, false))
342 <                    doAwaitDone(startTime, nanos);
343 <                if ((s = status) >= 0) {
344 <                    adjustPoolCountsOnCancelledWait(pool);
345 <                    s = status;
335 >    private void timedAwaitDone(ForkJoinWorkerThread w, long nanos) {
336 >        if (status >= 0) {
337 >            long startTime = System.nanoTime();
338 >            ForkJoinPool pool = w.pool;
339 >            pool.preJoin(this);
340 >            while (status >= 0) {
341 >                long nt = nanos - (System.nanoTime() - startTime);
342 >                if (nt > 0) {
343 >                    long ms = nt / 1000000;
344 >                    int ns = (int) (nt % 1000000);
345 >                    try {
346 >                        synchronized(this) { if (status >= 0) wait(ms, ns); }
347 >                    } catch (InterruptedException ie) {
348 >                        cancelIfTerminating();
349 >                    }
350 >                }
351 >                else {
352 >                    int s; // adjust running count on timeout
353 >                    while ((s = status) >= 0 &&
354 >                           (s & INTERNAL_SIGNAL_MASK) != 0) {
355 >                        if (UNSAFE.compareAndSwapInt(this, statusOffset,
356 >                                                     s, s - 1)) {
357 >                            pool.updateRunningCount(1);
358 >                            break;
359 >                        }
360 >                    }
361 >                    break;
362                  }
322                if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
323                    adjustPoolCountsOnUnblock(pool);
324                break;
363              }
364          }
327        return s;
365      }
366  
367      /**
368 <     * Notifies pool that thread is unblocked. Called by signalled
332 <     * threads when woken by non-FJ threads (which is atypical).
368 >     * Blocks a non-ForkJoin thread until this task is done or timeout elapses
369       */
370 <    private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
371 <        int s;
372 <        do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
373 <        if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
374 <            pool.updateRunningCount(s);
375 <    }
376 <
341 <    /**
342 <     * Notifies pool to adjust counts on cancelled or timed out wait.
343 <     */
344 <    private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
345 <        if (pool != null) {
346 <            int s;
347 <            while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
348 <                if (casStatus(s, s - 1)) {
349 <                    pool.updateRunningCount(1);
370 >    private void externalTimedAwaitDone(long nanos) {
371 >        if (requestExternalSignal() >= 0) {
372 >            long startTime = System.nanoTime();
373 >            boolean interrupted = false;
374 >            while (status >= 0) {
375 >                long nt = nanos - (System.nanoTime() - startTime);
376 >                if (nt <= 0)
377                      break;
378 +                long ms = nt / 1000000;
379 +                int ns = (int) (nt % 1000000);
380 +                try {
381 +                    synchronized(this) { if (status >= 0) wait(ms, ns); }
382 +                } catch (InterruptedException ie) {
383 +                    interrupted = true;
384                  }
385              }
386 +            if (interrupted)
387 +                Thread.currentThread().interrupt();
388          }
389      }
390  
391 <    /**
357 <     * Handles interruptions during waits.
358 <     */
359 <    private void onInterruptedWait() {
360 <        ForkJoinWorkerThread w = getWorker();
361 <        if (w == null)
362 <            Thread.currentThread().interrupt(); // re-interrupt
363 <        else if (w.isTerminating())
364 <            cancelIgnoringExceptions();
365 <        // else if FJworker, ignore interrupt
366 <    }
367 <
368 <    // Recording and reporting exceptions
369 <
370 <    private void setDoneExceptionally(Throwable rex) {
371 <        exceptionMap.put(this, rex);
372 <        setCompletion(EXCEPTIONAL);
373 <    }
391 >    // reporting results
392  
393      /**
394 <     * Throws the exception associated with status s.
395 <     *
396 <     * @throws the exception
394 >     * Returns result or throws the exception associated with status.
395 >     * Uses Unsafe as a workaround for javac not allowing rethrow of
396 >     * unchecked exceptions.
397       */
398 <    private void reportException(int s) {
399 <        if ((s &= COMPLETION_MASK) < NORMAL) {
400 <            if (s == CANCELLED)
401 <                throw new CancellationException();
402 <            else
385 <                rethrowException(exceptionMap.get(this));
398 >    private V reportResult() {
399 >        if ((status & COMPLETION_MASK) < NORMAL) {
400 >            Throwable ex = getException();
401 >            if (ex != null)
402 >                UNSAFE.throwException(ex);
403          }
404 +        return getRawResult();
405      }
406  
407      /**
# Line 426 | Line 444 | public abstract class ForkJoinTask<V> im
444              throw new TimeoutException();
445      }
446  
429    // internal execution methods
430
431    /**
432     * Calls exec, recording completion, and rethrowing exception if
433     * encountered. Caller should normally check status before calling.
434     *
435     * @return true if completed normally
436     */
437    private boolean tryExec() {
438        try { // try block must contain only call to exec
439            if (!exec())
440                return false;
441        } catch (Throwable rex) {
442            setDoneExceptionally(rex);
443            rethrowException(rex);
444            return false; // not reached
445        }
446        setNormalCompletion();
447        return true;
448    }
449
450    /**
451     * Main execution method used by worker threads. Invokes
452     * base computation unless already complete.
453     */
454    final void quietlyExec() {
455        if (status >= 0) {
456            try {
457                if (!exec())
458                    return;
459            } catch (Throwable rex) {
460                setDoneExceptionally(rex);
461                return;
462            }
463            setNormalCompletion();
464        }
465    }
466
467    /**
468     * Calls exec(), recording but not rethrowing exception.
469     * Caller should normally check status before calling.
470     *
471     * @return true if completed normally
472     */
473    private boolean tryQuietlyInvoke() {
474        try {
475            if (!exec())
476                return false;
477        } catch (Throwable rex) {
478            setDoneExceptionally(rex);
479            return false;
480        }
481        setNormalCompletion();
482        return true;
483    }
484
485    /**
486     * Cancels, ignoring any exceptions it throws.
487     */
488    final void cancelIgnoringExceptions() {
489        try {
490            cancel(false);
491        } catch (Throwable ignore) {
492        }
493    }
494
495    /**
496     * Main implementation of helpJoin
497     */
498    private int busyJoin(ForkJoinWorkerThread w) {
499        int s;
500        ForkJoinTask<?> t;
501        while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
502            t.quietlyExec();
503        return (s >= 0) ? awaitDone(w, false) : s; // block if no work
504    }
505
447      // public methods
448  
449      /**
# Line 538 | Line 479 | public abstract class ForkJoinTask<V> im
479       * @return the computed result
480       */
481      public final V join() {
482 <        ForkJoinWorkerThread w = getWorker();
483 <        if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
543 <            reportException(awaitDone(w, true));
544 <        return getRawResult();
482 >        quietlyJoin();
483 >        return reportResult();
484      }
485  
486      /**
# Line 552 | Line 491 | public abstract class ForkJoinTask<V> im
491       * @return the computed result
492       */
493      public final V invoke() {
494 <        if (status >= 0 && tryExec())
495 <            return getRawResult();
496 <        else
558 <            return join();
494 >        if (!tryExec())
495 >            quietlyJoin();
496 >        return reportResult();
497      }
498  
499      /**
# Line 632 | Line 570 | public abstract class ForkJoinTask<V> im
570              }
571          }
572          if (ex != null)
573 <            rethrowException(ex);
573 >            UNSAFE.throwException(ex);
574      }
575  
576      /**
# Line 694 | Line 632 | public abstract class ForkJoinTask<V> im
632              }
633          }
634          if (ex != null)
635 <            rethrowException(ex);
635 >            UNSAFE.throwException(ex);
636          return tasks;
637      }
638  
# Line 727 | Line 665 | public abstract class ForkJoinTask<V> im
665          return (status & COMPLETION_MASK) == CANCELLED;
666      }
667  
668 +    /**
669 +     * Cancels, ignoring any exceptions it throws. Used during worker
670 +     * and pool shutdown.
671 +     */
672 +    final void cancelIgnoringExceptions() {
673 +        try {
674 +            cancel(false);
675 +        } catch (Throwable ignore) {
676 +        }
677 +    }
678 +
679 +    /**
680 +     * Cancels ignoring exceptions if worker is terminating
681 +     */
682 +    private void cancelIfTerminating() {
683 +        Thread t = Thread.currentThread();
684 +        if ((t instanceof ForkJoinWorkerThread) &&
685 +            ((ForkJoinWorkerThread) t).isTerminating()) {
686 +            try {
687 +                cancel(false);
688 +            } catch (Throwable ignore) {
689 +            }
690 +        }
691 +    }
692 +
693      public final boolean isDone() {
694          return status < 0;
695      }
# Line 808 | Line 771 | public abstract class ForkJoinTask<V> im
771              setDoneExceptionally(rex);
772              return;
773          }
774 <        setNormalCompletion();
774 >        setCompletion(NORMAL);
775      }
776  
777      public final V get() throws InterruptedException, ExecutionException {
778 <        ForkJoinWorkerThread w = getWorker();
816 <        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
817 <            awaitDone(w, true);
778 >        quietlyJoin();
779          return reportFutureResult();
780      }
781 <
781 >    
782      public final V get(long timeout, TimeUnit unit)
783          throws InterruptedException, ExecutionException, TimeoutException {
784          long nanos = unit.toNanos(timeout);
785 <        ForkJoinWorkerThread w = getWorker();
786 <        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
787 <            awaitDone(w, nanos);
785 >        Thread t = Thread.currentThread();
786 >        if (t instanceof ForkJoinWorkerThread) {
787 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
788 >            if (!w.unpushTask(this) || !tryExec())
789 >                timedAwaitDone(w, nanos);
790 >        }
791 >        else
792 >            externalTimedAwaitDone(nanos);
793          return reportTimedFutureResult();
794      }
795  
# Line 845 | Line 811 | public abstract class ForkJoinTask<V> im
811       * @return the computed result
812       */
813      public final V helpJoin() {
814 <        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
815 <        if (status < 0 || !w.unpushTask(this) || !tryExec())
850 <            reportException(busyJoin(w));
851 <        return getRawResult();
814 >        quietlyHelpJoin();
815 >        return reportResult();
816      }
817  
818      /**
# Line 864 | Line 828 | public abstract class ForkJoinTask<V> im
828       * ClassCastException}.
829       */
830      public final void quietlyHelpJoin() {
831 <        if (status >= 0) {
832 <            ForkJoinWorkerThread w =
833 <                (ForkJoinWorkerThread) Thread.currentThread();
834 <            if (!w.unpushTask(this) || !tryQuietlyInvoke())
835 <                busyJoin(w);
831 >        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
832 >        if (!w.unpushTask(this) || !tryExec()) {
833 >            while (status >= 0) {
834 >                ForkJoinTask<?> t = w.scanWhileJoining(this);
835 >                if (t == null) {
836 >                    if (status >= 0)
837 >                        awaitDone(w);
838 >                    break;
839 >                }
840 >                t.tryExec();
841 >            }
842          }
843      }
844  
# Line 879 | Line 849 | public abstract class ForkJoinTask<V> im
849       * known to have aborted.
850       */
851      public final void quietlyJoin() {
852 <        if (status >= 0) {
853 <            ForkJoinWorkerThread w = getWorker();
854 <            if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
855 <                awaitDone(w, true);
852 >        Thread t = Thread.currentThread();
853 >        if (t instanceof ForkJoinWorkerThread) {
854 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
855 >            if (!w.unpushTask(this) || !tryExec())
856 >                awaitDone(w);
857          }
858 +        else
859 +            externalAwaitDone();
860      }
861  
862      /**
# Line 894 | Line 867 | public abstract class ForkJoinTask<V> im
867       * known to have aborted.
868       */
869      public final void quietlyInvoke() {
870 <        if (status >= 0 && !tryQuietlyInvoke())
870 >        if (!tryExec())
871              quietlyJoin();
872      }
873  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines