ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.123 by jsr166, Thu Oct 17 01:51:37 2019 UTC vs.
Revision 1.124 by dl, Fri Jan 17 18:12:07 2020 UTC

# Line 15 | Line 15 | import java.lang.reflect.Constructor;
15   import java.util.Collection;
16   import java.util.List;
17   import java.util.RandomAccess;
18 < import java.util.concurrent.locks.ReentrantLock;
18 > import java.util.concurrent.locks.LockSupport;
19  
20   /**
21   * Abstract base class for tasks that run within a {@link ForkJoinPool}.
# Line 170 | Line 170 | import java.util.concurrent.locks.Reentr
170   * used in extensions such as remote execution frameworks. It is
171   * sensible to serialize tasks only before or after, but not during,
172   * execution. Serialization is not relied on during execution itself.
173 + * A deserialized task that completed exceptionally in any way reports
174 + * a {@code CancellationException}.
175   *
176   * @since 1.7
177   * @author Doug Lea
# Line 188 | Line 190 | public abstract class ForkJoinTask<V> im
190       * (3) user-level methods that additionally report results.
191       * This is sometimes hard to see because this file orders exported
192       * methods in a way that flows well in javadocs.
191     */
192
193    /**
194     * The status field holds run control status bits packed into a
195     * single int to ensure atomicity.  Status is initially zero, and
196     * takes on nonnegative values until completed, upon which it
197     * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
198     * exceptional) and THROWN (in which case an exception has been
199     * stored). Tasks with dependent blocked waiting joiners have the
200     * SIGNAL bit set.  Completion of a task with SIGNAL set awakens
201     * any waiters via notifyAll. (Waiters also help signal others
202     * upon completion.)
203     *
204     * These control bits occupy only (some of) the upper half (16
205     * bits) of status field. The lower bits are used for user-defined
206     * tags.
207     */
208    volatile int status; // accessed directly by pool and workers
209
210    private static final int DONE     = 1 << 31; // must be negative
211    private static final int ABNORMAL = 1 << 18; // set atomically with DONE
212    private static final int THROWN   = 1 << 17; // set atomically with ABNORMAL
213    private static final int SIGNAL   = 1 << 16; // true if joiner waiting
214    private static final int SMASK    = 0xffff;  // short bits for tags
215
216    static boolean isExceptionalStatus(int s) {  // needed by subclasses
217        return (s & THROWN) != 0;
218    }
219
220    /**
221     * Sets DONE status and wakes up threads waiting to join this task.
193       *
194 <     * @return status on exit
194 >     * Revision notes: The use of "Aux" field replaces previous
195 >     * reliance on a table to hold exceptions and synchronized blocks
196 >     * and monitors to wait for completion.
197       */
225    private int setDone() {
226        int s;
227        if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
228            synchronized (this) { notifyAll(); }
229        return s | DONE;
230    }
198  
199      /**
200 <     * Marks cancelled or exceptional completion unless already done.
201 <     *
202 <     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
203 <     * @return status on exit
204 <     */
205 <    private int abnormalCompletion(int completion) {
206 <        for (int s, ns;;) {
207 <            if ((s = status) < 0)
208 <                return s;
209 <            else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
210 <                if ((s & SIGNAL) != 0)
211 <                    synchronized (this) { notifyAll(); }
212 <                return ns;
200 >     * Nodes for threads waiting for completion, or holding a thrown
201 >     * exception (never both). Waiting threads prepend nodes
202 >     * Treiber-stack-style.  Signallers detach and unpark
203 >     * waiters. Cancelled waiters try to unsplice.
204 >     */
205 >    static final class Aux {
206 >        final Thread thread;
207 >        final Throwable ex;  // null if a waiter
208 >        Aux next;            // accessed only via memory-acquire chains
209 >        Aux(Thread thread, Throwable ex) {
210 >            this.thread = thread;
211 >            this.ex = ex;
212 >        }
213 >        final boolean casNext(Aux c, Aux v) { // used only in cancellation
214 >            return NEXT.compareAndSet(this, c, v);
215 >        }
216 >        private static final VarHandle NEXT;
217 >        static {
218 >            try {
219 >                NEXT = MethodHandles.lookup()
220 >                    .findVarHandle(Aux.class, "next", Aux.class);
221 >            } catch (ReflectiveOperationException e) {
222 >                throw new ExceptionInInitializerError(e);
223              }
224          }
225      }
226  
227 <    /**
228 <     * Primary execution method for stolen tasks. Unless done, calls
229 <     * exec and records status if completed, but doesn't wait for
230 <     * completion otherwise.
231 <     *
232 <     * @return status on exit from this method
227 >    /*
228 >     * The status field holds bits packed into a single int to ensure
229 >     * atomicity.  Status is initially zero, and takes on nonnegative
230 >     * values until completed, upon which it holds (sign bit) DONE,
231 >     * possibly with ABNORMAL (cancelled or exceptional) and THROWN
232 >     * (in which case an exception has been stored).  These control
233 >     * bits occupy only (some of) the upper half (16 bits) of status
234 >     * field. The lower bits are used for user-defined tags.
235       */
236 <    final int doExec() {
237 <        int s; boolean completed;
238 <        if ((s = status) >= 0) {
239 <            try {
240 <                completed = exec();
241 <            } catch (Throwable rex) {
242 <                completed = false;
243 <                s = setExceptionalCompletion(rex);
236 >    private static final int DONE     = 1 << 31; // must be negative
237 >    private static final int ABNORMAL = 1 << 16; // set atomically with DONE
238 >    private static final int THROWN   = 1 << 17; // set atomically with ABNORMAL
239 >    private static final int SMASK    = 0xffff;  // short bits for tags
240 >    // sentinels can be any positive upper half value:
241 >    private static final int INTRPT   = 1 << 16; // awaitDone interrupt return
242 >    static final         int ADJUST   = 1 << 16; // uncompensate after block
243 >
244 >    // Fields
245 >    volatile int status;                // accessed directly by pool and workers
246 >    private transient volatile Aux aux; // either waiters or thrown Exception
247 >    // Support for atomic operations
248 >    private static final VarHandle STATUS;
249 >    private static final VarHandle AUX;
250 >    private int getAndBitwiseOrStatus(int v) {
251 >        return (int)STATUS.getAndBitwiseOr(this, v);
252 >    }
253 >    private boolean casStatus(int c, int v) {
254 >        return STATUS.weakCompareAndSet(this, c, v);
255 >    }
256 >    private boolean casAux(Aux c, Aux v) {
257 >        return AUX.compareAndSet(this, c, v);
258 >    }
259 >
260 >    /** Removes and unparks waiters */
261 >    private void signalWaiters() {
262 >        for (Aux a; (a = aux) != null && a.ex == null; ) {
263 >            if (casAux(a, null)) {             // detach entire list
264 >                for (Thread t; a != null; a = a.next) {
265 >                    if ((t = a.thread) != Thread.currentThread() && t != null)
266 >                        LockSupport.unpark(t); // don't self-signal
267 >                }
268 >                break;
269              }
266            if (completed)
267                s = setDone();
270          }
269        return s;
271      }
272  
273      /**
274 <     * If not done, sets SIGNAL status and performs Object.wait(timeout).
274 <     * This task may or may not be done on exit. Ignores interrupts.
274 >     * Possibly blocks until task is done or interrupted or timed out.
275       *
276 <     * @param timeout using Object.wait conventions.
277 <     */
278 <    final void internalWait(long timeout) {
279 <        if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
280 <            synchronized (this) {
281 <                if (status >= 0)
282 <                    try { wait(timeout); } catch (InterruptedException ie) { }
283 <                else
284 <                    notifyAll();
276 >     * @param interruptible true if wait can be cancelled by interrupt
277 >     * @param deadline if non-zero use timed waits and possibly timeout
278 >     * @param pool if nonull, pool to uncompensate when unblocking
279 >     * @return status on exit, or INTRPT if interrupted while waiting
280 >     */
281 >    final int awaitDone(boolean interruptible, long deadline,
282 >                        ForkJoinPool pool) {
283 >        int s; Aux node = null; boolean interrupted = false, queued = false;
284 >        for (;;) {
285 >            Aux a; long nanos;
286 >            if ((s = status) < 0)
287 >                break;
288 >            else if (node == null)
289 >                node = new Aux(Thread.currentThread(), null);
290 >            else if (!queued) {
291 >                if ((a = aux) != null && a.ex != null)
292 >                    Thread.onSpinWait(); // exception in progress
293 >                else if (queued = casAux(node.next = a, node))
294 >                    LockSupport.setCurrentBlocker(this);
295 >            }
296 >            else {
297 >                if (deadline == 0L)
298 >                    LockSupport.park();
299 >                else if ((nanos = deadline - System.nanoTime()) > 0L)
300 >                    LockSupport.parkNanos(nanos);
301 >                else {
302 >                    s = 0;               // timeout
303 >                    break;
304 >                }
305 >                if ((interrupted |= Thread.interrupted()) && interruptible) {
306 >                    s = INTRPT;
307 >                    break;
308 >                }
309              }
310          }
311 <    }
312 <
313 <    /**
314 <     * Blocks a non-worker-thread until completion.
315 <     * @return status upon completion
316 <     */
317 <    private int externalAwaitDone() {
318 <        int s = tryExternalHelp();
319 <        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
320 <            boolean interrupted = false;
321 <            synchronized (this) {
322 <                for (;;) {
323 <                    if ((s = status) >= 0) {
324 <                        try {
325 <                            wait(0L);
326 <                        } catch (InterruptedException ie) {
303 <                            interrupted = true;
304 <                        }
305 <                    }
306 <                    else {
307 <                        notifyAll();
308 <                        break;
311 >        if (pool != null)
312 >            pool.uncompensate();
313 >        if (s >= 0) {                     // try to unsplice after cancellation
314 >            outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
315 >                for (Aux trail = null;;) {
316 >                    Aux next = a.next;
317 >                    if (a == node) {
318 >                        if (trail != null)
319 >                            trail.casNext(trail, next);
320 >                        else if (casAux(a, next))
321 >                            break outer; // cannot be re-encountered
322 >                        break;           // restart
323 >                    } else {
324 >                        trail = a;
325 >                        if ((a = next) == null)
326 >                            break outer;
327                      }
328                  }
329              }
330 <            if (interrupted)
331 <                Thread.currentThread().interrupt();
330 >        }
331 >        else if (interrupted)
332 >            Thread.currentThread().interrupt();
333 >        if (queued) {
334 >            LockSupport.setCurrentBlocker(null);
335 >            signalWaiters();             // help clean or signal
336          }
337          return s;
338      }
339  
340      /**
341 <     * Blocks a non-worker-thread until completion or interruption.
341 >     * Sets DONE status and wakes up threads waiting to join this task.
342 >     * @return status on exit
343       */
344 <    private int externalInterruptibleAwaitDone() throws InterruptedException {
345 <        int s = tryExternalHelp();
346 <        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
324 <            synchronized (this) {
325 <                for (;;) {
326 <                    if ((s = status) >= 0)
327 <                        wait(0L);
328 <                    else {
329 <                        notifyAll();
330 <                        break;
331 <                    }
332 <                }
333 <            }
334 <        }
335 <        else if (Thread.interrupted())
336 <            throw new InterruptedException();
344 >    private int setDone() {
345 >        int s = getAndBitwiseOrStatus(DONE) | DONE;
346 >        signalWaiters();
347          return s;
348      }
349  
350      /**
351 <     * Tries to help with tasks allowed for external callers.
352 <     *
353 <     * @return current status
351 >     * Sets ABNORMAL DONE status unless already done, and wakes up threads
352 >     * waiting to join this task.
353 >     * @return status on exit
354       */
355 <    private int tryExternalHelp() {
355 >    private int trySetCancelled() {
356          int s;
357 <        return ((s = status) < 0 ? s:
358 <                (this instanceof CountedCompleter) ?
359 <                ForkJoinPool.common.externalHelpComplete(
350 <                    (CountedCompleter<?>)this, 0) :
351 <                ForkJoinPool.common.tryExternalUnpush(this) ?
352 <                doExec() : 0);
357 >        do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
358 >        signalWaiters();
359 >        return s;
360      }
361  
362      /**
363 <     * Implementation for join, get, quietlyJoin. Directly handles
364 <     * only cases of already-completed, external wait, and
365 <     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
363 >     * Records exception and sets ABNORMAL THROWN DONE status unless
364 >     * already done, and wakes up threads waiting to join this task.
365 >     * If losing a race with setDone or trySetCancelled, the exception
366 >     * may be recorded but not reported.
367       *
368 <     * @return status upon completion
368 >     * @return status on exit
369       */
370 <    private int doJoin() {
371 <        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
372 <        return (s = status) < 0 ? s :
373 <            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
374 <            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
375 <            tryUnpush(this) && (s = doExec()) < 0 ? s :
376 <            wt.pool.awaitJoin(w, this, 0L) :
377 <            externalAwaitDone();
370 >    final int trySetThrown(Throwable ex) {
371 >        Aux h = new Aux(Thread.currentThread(), ex), p = null;
372 >        boolean installed = false;
373 >        int s;
374 >        while ((s = status) >= 0) {
375 >            Aux a;
376 >            if (!installed && ((a = aux) == null || a.ex == null) &&
377 >                (installed = casAux(a, h)))
378 >                p = a; // list of waiters replaced by h
379 >            if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
380 >                break;
381 >        }
382 >        for (; p != null; p = p.next)
383 >            LockSupport.unpark(p.thread);
384 >        return s;
385      }
386  
387      /**
388 <     * Implementation for invoke, quietlyInvoke.
388 >     * Records exception unless already done. Overridable in subclasses.
389       *
390 <     * @return status upon completion
390 >     * @return status on exit
391       */
392 <    private int doInvoke() {
393 <        int s; Thread t; ForkJoinWorkerThread wt;
379 <        return (s = doExec()) < 0 ? s :
380 <            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
381 <            (wt = (ForkJoinWorkerThread)t).pool.
382 <            awaitJoin(wt.workQueue, this, 0L) :
383 <            externalAwaitDone();
392 >    int trySetException(Throwable ex) {
393 >        return trySetThrown(ex);
394      }
395  
396 <    // Exception table support
397 <
388 <    /**
389 <     * Hash table of exceptions thrown by tasks, to enable reporting
390 <     * by callers. Because exceptions are rare, we don't directly keep
391 <     * them with task objects, but instead use a weak ref table.  Note
392 <     * that cancellation exceptions don't appear in the table, but are
393 <     * instead recorded as status values.
394 <     *
395 <     * The exception table has a fixed capacity.
396 <     */
397 <    private static final ExceptionNode[] exceptionTable
398 <        = new ExceptionNode[32];
399 <
400 <    /** Lock protecting access to exceptionTable. */
401 <    private static final ReentrantLock exceptionTableLock
402 <        = new ReentrantLock();
403 <
404 <    /** Reference queue of stale exceptionally completed tasks. */
405 <    private static final ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue
406 <        = new ReferenceQueue<>();
407 <
408 <    /**
409 <     * Key-value nodes for exception table.  The chained hash table
410 <     * uses identity comparisons, full locking, and weak references
411 <     * for keys. The table has a fixed capacity because it only
412 <     * maintains task exceptions long enough for joiners to access
413 <     * them, so should never become very large for sustained
414 <     * periods. However, since we do not know when the last joiner
415 <     * completes, we must use weak references and expunge them. We do
416 <     * so on each operation (hence full locking). Also, some thread in
417 <     * any ForkJoinPool will call helpExpungeStaleExceptions when its
418 <     * pool becomes isQuiescent.
419 <     */
420 <    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
421 <        final Throwable ex;
422 <        ExceptionNode next;
423 <        final long thrower;  // use id not ref to avoid weak cycles
424 <        final int hashCode;  // store task hashCode before weak ref disappears
425 <        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next,
426 <                      ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue) {
427 <            super(task, exceptionTableRefQueue);
428 <            this.ex = ex;
429 <            this.next = next;
430 <            this.thrower = Thread.currentThread().getId();
431 <            this.hashCode = System.identityHashCode(task);
432 <        }
396 >    static boolean isExceptionalStatus(int s) {  // needed by subclasses
397 >        return (s & THROWN) != 0;
398      }
399  
400      /**
401 <     * Records exception and sets status.
401 >     * Unless done, calls exec and records status if completed, but
402 >     * doesn't wait for completion otherwise.
403       *
404 <     * @return status on exit
404 >     * @return status on exit from this method
405       */
406 <    final int recordExceptionalCompletion(Throwable ex) {
407 <        int s;
406 >    final int doExec() {
407 >        int s; boolean completed;
408          if ((s = status) >= 0) {
443            int h = System.identityHashCode(this);
444            final ReentrantLock lock = exceptionTableLock;
445            lock.lock();
409              try {
410 <                expungeStaleExceptions();
411 <                ExceptionNode[] t = exceptionTable;
412 <                int i = h & (t.length - 1);
413 <                for (ExceptionNode e = t[i]; ; e = e.next) {
451 <                    if (e == null) {
452 <                        t[i] = new ExceptionNode(this, ex, t[i],
453 <                                                 exceptionTableRefQueue);
454 <                        break;
455 <                    }
456 <                    if (e.get() == this) // already present
457 <                        break;
458 <                }
459 <            } finally {
460 <                lock.unlock();
410 >                completed = exec();
411 >            } catch (Throwable rex) {
412 >                s = trySetException(rex);
413 >                completed = false;
414              }
415 <            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
415 >            if (completed)
416 >                s = setDone();
417          }
418          return s;
419      }
420  
421      /**
422 <     * Records exception and possibly propagates.
422 >     * Helps and/or waits for completion. Overridable in subclasses.
423       *
424 <     * @return status on exit
425 <     */
426 <    private int setExceptionalCompletion(Throwable ex) {
473 <        int s = recordExceptionalCompletion(ex);
474 <        if ((s & THROWN) != 0)
475 <            internalPropagateException(ex);
476 <        return s;
477 <    }
478 <
479 <    /**
480 <     * Hook for exception propagation support for tasks with completers.
424 >     * @param interruptible true if wait can be cancelled by interrupt
425 >     * @param ran true if task known to be invoked
426 >     * @return status on exit, or INTRPT if interruptible and interrupted
427       */
428 <    void internalPropagateException(Throwable ex) {
428 >    int awaitJoin(boolean interruptible, boolean ran) {
429 >        Thread t; ForkJoinWorkerThread wt;
430 >        ForkJoinPool.WorkQueue q = null;
431 >        ForkJoinPool p = null;
432 >        boolean unforked = false;
433 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
434 >            p = (wt = (ForkJoinWorkerThread)t).pool;
435 >            q = wt.workQueue;
436 >            if (!ran && q != null && q.tryRemove(this))
437 >                unforked = true;
438 >        }
439 >        else if (!ran && (q = ForkJoinPool.commonQueue()) != null &&
440 >                 q.externalTryUnpush(this))
441 >            unforked = true;
442 >        int s;
443 >        if (unforked && (s = doExec()) < 0)
444 >            return s;
445 >        if (p != null) {
446 >            if ((s = p.helpJoin(this, q)) < 0)
447 >                return s;
448 >            if (s != ADJUST) // uncompensated
449 >                p = null;
450 >        }
451 >        return awaitDone(interruptible, 0L, p);
452      }
453  
454      /**
# Line 489 | Line 458 | public abstract class ForkJoinTask<V> im
458       * shutdown, so guard against this case.
459       */
460      static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
461 <        if (t != null && t.status >= 0) {
461 >        if (t != null) {
462              try {
463                  t.cancel(false);
464              } catch (Throwable ignore) {
# Line 498 | Line 467 | public abstract class ForkJoinTask<V> im
467      }
468  
469      /**
501     * Removes exception node and clears status.
502     */
503    private void clearExceptionalCompletion() {
504        int h = System.identityHashCode(this);
505        final ReentrantLock lock = exceptionTableLock;
506        lock.lock();
507        try {
508            ExceptionNode[] t = exceptionTable;
509            int i = h & (t.length - 1);
510            ExceptionNode e = t[i];
511            ExceptionNode pred = null;
512            while (e != null) {
513                ExceptionNode next = e.next;
514                if (e.get() == this) {
515                    if (pred == null)
516                        t[i] = next;
517                    else
518                        pred.next = next;
519                    break;
520                }
521                pred = e;
522                e = next;
523            }
524            expungeStaleExceptions();
525            status = 0;
526        } finally {
527            lock.unlock();
528        }
529    }
530
531    /**
470       * Returns a rethrowable exception for this task, if available.
471       * To provide accurate stack traces, if the exception was not
472       * thrown by the current thread, we try to create a new exception
# Line 543 | Line 481 | public abstract class ForkJoinTask<V> im
481       * @return the exception, or null if none
482       */
483      private Throwable getThrowableException() {
484 <        int h = System.identityHashCode(this);
485 <        ExceptionNode e;
486 <        final ReentrantLock lock = exceptionTableLock;
487 <        lock.lock();
550 <        try {
551 <            expungeStaleExceptions();
552 <            ExceptionNode[] t = exceptionTable;
553 <            e = t[h & (t.length - 1)];
554 <            while (e != null && e.get() != this)
555 <                e = e.next;
556 <        } finally {
557 <            lock.unlock();
558 <        }
559 <        Throwable ex;
560 <        if (e == null || (ex = e.ex) == null)
561 <            return null;
562 <        if (e.thrower != Thread.currentThread().getId()) {
484 >        Throwable ex; Aux a;
485 >        if ((a = aux) == null)
486 >            ex = null;
487 >        else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
488              try {
489 <                Constructor<?> noArgCtor = null;
565 <                // public ctors only
489 >                Constructor<?> noArgCtor = null, oneArgCtor = null;
490                  for (Constructor<?> c : ex.getClass().getConstructors()) {
491                      Class<?>[] ps = c.getParameterTypes();
492                      if (ps.length == 0)
493                          noArgCtor = c;
494 <                    else if (ps.length == 1 && ps[0] == Throwable.class)
495 <                        return (Throwable)c.newInstance(ex);
494 >                    else if (ps.length == 1 && ps[0] == Throwable.class) {
495 >                        oneArgCtor = c;
496 >                        break;
497 >                    }
498                  }
499 <                if (noArgCtor != null) {
500 <                    Throwable wx = (Throwable)noArgCtor.newInstance();
501 <                    wx.initCause(ex);
502 <                    return wx;
499 >                if (oneArgCtor != null)
500 >                    ex = (Throwable)oneArgCtor.newInstance(ex);
501 >                else if (noArgCtor != null) {
502 >                    Throwable rx = (Throwable)noArgCtor.newInstance();
503 >                    rx.initCause(ex);
504 >                    ex = rx;
505                  }
506              } catch (Exception ignore) {
507              }
# Line 582 | Line 510 | public abstract class ForkJoinTask<V> im
510      }
511  
512      /**
513 <     * Polls stale refs and removes them. Call only while holding lock.
514 <     */
587 <    private static void expungeStaleExceptions() {
588 <        for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
589 <            if (x instanceof ExceptionNode) {
590 <                ExceptionNode[] t = exceptionTable;
591 <                int i = ((ExceptionNode)x).hashCode & (t.length - 1);
592 <                ExceptionNode e = t[i];
593 <                ExceptionNode pred = null;
594 <                while (e != null) {
595 <                    ExceptionNode next = e.next;
596 <                    if (e == x) {
597 <                        if (pred == null)
598 <                            t[i] = next;
599 <                        else
600 <                            pred.next = next;
601 <                        break;
602 <                    }
603 <                    pred = e;
604 <                    e = next;
605 <                }
606 <            }
607 <        }
608 <    }
609 <
610 <    /**
611 <     * If lock is available, polls stale refs and removes them.
612 <     * Called from ForkJoinPool when pools become quiescent.
513 >     * Throws exception associated with the given status, or
514 >     * CancellationException if none recorded.
515       */
516 <    static final void helpExpungeStaleExceptions() {
517 <        final ReentrantLock lock = exceptionTableLock;
518 <        if (lock.tryLock()) {
617 <            try {
618 <                expungeStaleExceptions();
619 <            } finally {
620 <                lock.unlock();
621 <            }
622 <        }
516 >    private void reportException(int s) {
517 >        ForkJoinTask.<RuntimeException>uncheckedThrow(
518 >            (s & THROWN) != 0 ? getThrowableException() : null);
519      }
520  
521      /**
522 <     * A version of "sneaky throw" to relay exceptions.
522 >     * A version of "sneaky throw" to relay exceptions in other
523 >     * contexts.
524       */
525      static void rethrow(Throwable ex) {
526          ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
# Line 632 | Line 529 | public abstract class ForkJoinTask<V> im
529      /**
530       * The sneaky part of sneaky throw, relying on generics
531       * limitations to evade compiler complaints about rethrowing
532 <     * unchecked exceptions.
532 >     * unchecked exceptions. If argument null, throws
533 >     * CancellationException.
534       */
535      @SuppressWarnings("unchecked") static <T extends Throwable>
536      void uncheckedThrow(Throwable t) throws T {
537 <        if (t != null)
538 <            throw (T)t; // rely on vacuous cast
539 <        else
642 <            throw new Error("Unknown Exception");
643 <    }
644 <
645 <    /**
646 <     * Throws exception, if any, associated with the given status.
647 <     */
648 <    private void reportException(int s) {
649 <        rethrow((s & THROWN) != 0 ? getThrowableException() :
650 <                new CancellationException());
537 >        if (t == null)
538 >            t = new CancellationException();
539 >        throw (T)t; // rely on vacuous cast
540      }
541  
542      // public methods
# Line 668 | Line 557 | public abstract class ForkJoinTask<V> im
557       * @return {@code this}, to simplify usage
558       */
559      public final ForkJoinTask<V> fork() {
560 <        Thread t;
560 >        Thread t; ForkJoinWorkerThread w;
561          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
562 <            ((ForkJoinWorkerThread)t).workQueue.push(this);
562 >            (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
563          else
564              ForkJoinPool.common.externalPush(this);
565          return this;
# Line 689 | Line 578 | public abstract class ForkJoinTask<V> im
578       */
579      public final V join() {
580          int s;
581 <        if (((s = doJoin()) & ABNORMAL) != 0)
581 >        if ((s = status) >= 0)
582 >            s = awaitJoin(false, false);
583 >        if ((s & ABNORMAL) != 0)
584              reportException(s);
585          return getRawResult();
586      }
# Line 704 | Line 595 | public abstract class ForkJoinTask<V> im
595       */
596      public final V invoke() {
597          int s;
598 <        if (((s = doInvoke()) & ABNORMAL) != 0)
598 >        if ((s = doExec()) >= 0)
599 >            s = awaitJoin(false, true);
600 >        if ((s & ABNORMAL) != 0)
601              reportException(s);
602          return getRawResult();
603      }
# Line 728 | Line 621 | public abstract class ForkJoinTask<V> im
621       */
622      public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
623          int s1, s2;
624 +        if (t1 == null || t2 == null)
625 +            throw new NullPointerException();
626          t2.fork();
627 <        if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
627 >        if ((s1 = t1.doExec()) >= 0)
628 >            s1 = t1.awaitJoin(false, true);
629 >        if ((s1 & ABNORMAL) != 0) {
630 >            t2.cancel(false);
631              t1.reportException(s1);
632 <        if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
633 <            t2.reportException(s2);
632 >        }
633 >        else {
634 >            if ((s2 = t2.status) >= 0)
635 >                s2 = t2.awaitJoin(false, false);
636 >            if ((s2 & ABNORMAL) != 0)
637 >                t2.reportException(s2);
638 >        }
639      }
640  
641      /**
# Line 753 | Line 656 | public abstract class ForkJoinTask<V> im
656      public static void invokeAll(ForkJoinTask<?>... tasks) {
657          Throwable ex = null;
658          int last = tasks.length - 1;
659 <        for (int i = last; i >= 0; --i) {
660 <            ForkJoinTask<?> t = tasks[i];
661 <            if (t == null) {
662 <                if (ex == null)
663 <                    ex = new NullPointerException();
659 >        for (int i = last, s; i >= 0; --i) {
660 >            ForkJoinTask<?> t;
661 >            if ((t = tasks[i]) == null) {
662 >                ex = new NullPointerException();
663 >                break;
664              }
665 <            else if (i != 0)
666 <                t.fork();
667 <            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
668 <                ex = t.getException();
766 <        }
767 <        for (int i = 1; i <= last; ++i) {
768 <            ForkJoinTask<?> t = tasks[i];
769 <            if (t != null) {
770 <                if (ex != null)
771 <                    t.cancel(false);
772 <                else if ((t.doJoin() & ABNORMAL) != 0)
665 >            if (i == 0) {
666 >                if ((s = t.doExec()) >= 0)
667 >                    s = t.awaitJoin(false, true);
668 >                if ((s & ABNORMAL) != 0)
669                      ex = t.getException();
670 +                break;
671              }
672 +            t.fork();
673          }
674 <        if (ex != null)
674 >        if (ex == null) {
675 >            for (int i = 1, s; i <= last; ++i) {
676 >                ForkJoinTask<?> t;
677 >                if ((t = tasks[i]) != null) {
678 >                    if ((s = t.status) >= 0)
679 >                        s = t.awaitJoin(false, false);
680 >                    if ((s & ABNORMAL) != 0) {
681 >                        ex = t.getException();
682 >                        break;
683 >                    }
684 >                }
685 >            }
686 >        }
687 >        if (ex != null) { // try to cancel others
688 >            for (int i = 0, s; i <= last; ++i) {
689 >                ForkJoinTask<?> t;
690 >                if ((t = tasks[i]) != null)
691 >                    t.cancel(false);
692 >            }
693              rethrow(ex);
694 +        }
695      }
696  
697      /**
# Line 804 | Line 721 | public abstract class ForkJoinTask<V> im
721          List<? extends ForkJoinTask<?>> ts =
722              (List<? extends ForkJoinTask<?>>) tasks;
723          Throwable ex = null;
724 <        int last = ts.size() - 1;
725 <        for (int i = last; i >= 0; --i) {
726 <            ForkJoinTask<?> t = ts.get(i);
727 <            if (t == null) {
728 <                if (ex == null)
729 <                    ex = new NullPointerException();
724 >        int last = ts.size() - 1;  // nearly same as array version
725 >        for (int i = last, s; i >= 0; --i) {
726 >            ForkJoinTask<?> t;
727 >            if ((t = ts.get(i)) == null) {
728 >                ex = new NullPointerException();
729 >                break;
730              }
731 <            else if (i != 0)
732 <                t.fork();
733 <            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
734 <                ex = t.getException();
818 <        }
819 <        for (int i = 1; i <= last; ++i) {
820 <            ForkJoinTask<?> t = ts.get(i);
821 <            if (t != null) {
822 <                if (ex != null)
823 <                    t.cancel(false);
824 <                else if ((t.doJoin() & ABNORMAL) != 0)
731 >            if (i == 0) {
732 >                if ((s = t.doExec()) >= 0)
733 >                    s = t.awaitJoin(false, true);
734 >                if ((s & ABNORMAL) != 0)
735                      ex = t.getException();
736 +                break;
737              }
738 +            t.fork();
739          }
740 <        if (ex != null)
740 >        if (ex == null) {
741 >            for (int i = 1, s; i <= last; ++i) {
742 >                ForkJoinTask<?> t;
743 >                if ((t = ts.get(i)) != null) {
744 >                    if ((s = t.status) >= 0)
745 >                        s = t.awaitJoin(false, false);
746 >                    if ((s & ABNORMAL) != 0) {
747 >                        ex = t.getException();
748 >                        break;
749 >                    }
750 >                }
751 >            }
752 >        }
753 >        if (ex != null) {
754 >            for (int i = 0, s; i <= last; ++i) {
755 >                ForkJoinTask<?> t;
756 >                if ((t = ts.get(i)) != null)
757 >                    t.cancel(false);
758 >            }
759              rethrow(ex);
760 +        }
761          return tasks;
762      }
763  
# Line 858 | Line 789 | public abstract class ForkJoinTask<V> im
789       * @return {@code true} if this task is now cancelled
790       */
791      public boolean cancel(boolean mayInterruptIfRunning) {
792 <        int s = abnormalCompletion(DONE | ABNORMAL);
862 <        return (s & (ABNORMAL | THROWN)) == ABNORMAL;
792 >        return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
793      }
794  
795      public final boolean isDone() {
# Line 919 | Line 849 | public abstract class ForkJoinTask<V> im
849       * thrown will be a {@code RuntimeException} with cause {@code ex}.
850       */
851      public void completeExceptionally(Throwable ex) {
852 <        setExceptionalCompletion((ex instanceof RuntimeException) ||
853 <                                 (ex instanceof Error) ? ex :
854 <                                 new RuntimeException(ex));
852 >        trySetException((ex instanceof RuntimeException) ||
853 >                        (ex instanceof Error) ? ex :
854 >                        new RuntimeException(ex));
855      }
856  
857      /**
# Line 941 | Line 871 | public abstract class ForkJoinTask<V> im
871          try {
872              setRawResult(value);
873          } catch (Throwable rex) {
874 <            setExceptionalCompletion(rex);
874 >            trySetException(rex);
875              return;
876          }
877          setDone();
# Line 971 | Line 901 | public abstract class ForkJoinTask<V> im
901       * member of a ForkJoinPool and was interrupted while waiting
902       */
903      public final V get() throws InterruptedException, ExecutionException {
904 <        int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
905 <            doJoin() : externalInterruptibleAwaitDone();
906 <        if ((s & THROWN) != 0)
904 >        int s;
905 >        if (Thread.interrupted())
906 >            s = INTRPT;
907 >        else if ((s = status) >= 0)
908 >            s = awaitJoin(true, false);
909 >        if (s == INTRPT)
910 >            throw new InterruptedException();
911 >        else if ((s & THROWN) != 0)
912              throw new ExecutionException(getThrowableException());
913          else if ((s & ABNORMAL) != 0)
914              throw new CancellationException();
# Line 997 | Line 932 | public abstract class ForkJoinTask<V> im
932       */
933      public final V get(long timeout, TimeUnit unit)
934          throws InterruptedException, ExecutionException, TimeoutException {
1000        int s;
935          long nanos = unit.toNanos(timeout);
936 +        int s;
937          if (Thread.interrupted())
938 <            throw new InterruptedException();
939 <        if ((s = status) >= 0 && nanos > 0L) {
940 <            long d = System.nanoTime() + nanos;
938 >            s = INTRPT;
939 >        else if ((s = status) >= 0 && nanos > 0L) {
940 >            long d = nanos + System.nanoTime();
941              long deadline = (d == 0L) ? 1L : d; // avoid 0
942 <            Thread t = Thread.currentThread();
943 <            if (t instanceof ForkJoinWorkerThread) {
944 <                ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
945 <                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
946 <            }
947 <            else if ((s = ((this instanceof CountedCompleter) ?
1013 <                           ForkJoinPool.common.externalHelpComplete(
1014 <                               (CountedCompleter<?>)this, 0) :
1015 <                           ForkJoinPool.common.tryExternalUnpush(this) ?
1016 <                           doExec() : 0)) >= 0) {
1017 <                long ns, ms; // measure in nanosecs, but wait in millisecs
1018 <                while ((s = status) >= 0 &&
1019 <                       (ns = deadline - System.nanoTime()) > 0L) {
1020 <                    if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1021 <                        (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
1022 <                        synchronized (this) {
1023 <                            if (status >= 0)
1024 <                                wait(ms); // OK to throw InterruptedException
1025 <                            else
1026 <                                notifyAll();
1027 <                        }
1028 <                    }
1029 <                }
1030 <            }
942 >            ForkJoinPool p = null; Thread t;
943 >            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
944 >                (p = ((ForkJoinWorkerThread)t).pool) != null &&
945 >                p.preCompensate() == 0)
946 >                p = null;
947 >            s = awaitDone(true, deadline, p);
948          }
949 <        if (s >= 0)
949 >
950 >        if (s == INTRPT)
951 >            throw new InterruptedException();
952 >        else if (s >= 0)
953              throw new TimeoutException();
954          else if ((s & THROWN) != 0)
955              throw new ExecutionException(getThrowableException());
# Line 1046 | Line 966 | public abstract class ForkJoinTask<V> im
966       * known to have aborted.
967       */
968      public final void quietlyJoin() {
969 <        doJoin();
969 >        if (status >= 0)
970 >            awaitJoin(false, false);
971      }
972  
973      /**
# Line 1055 | Line 976 | public abstract class ForkJoinTask<V> im
976       * exception.
977       */
978      public final void quietlyInvoke() {
979 <        doInvoke();
979 >        if (doExec() >= 0)
980 >            awaitJoin(false, true);
981      }
982  
983      /**
# Line 1066 | Line 988 | public abstract class ForkJoinTask<V> im
988       * all are processed.
989       */
990      public static void helpQuiesce() {
991 <        Thread t;
992 <        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
993 <            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
994 <            wt.pool.helpQuiescePool(wt.workQueue);
1073 <        }
991 >        Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
992 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
993 >            (p = (w = (ForkJoinWorkerThread)t).pool) != null)
994 >            p.helpQuiescePool(w.workQueue);
995          else
996              ForkJoinPool.quiesceCommonPool();
997      }
# Line 1092 | Line 1013 | public abstract class ForkJoinTask<V> im
1013       * setRawResult(null)}.
1014       */
1015      public void reinitialize() {
1016 <        if ((status & THROWN) != 0)
1017 <            clearExceptionalCompletion();
1097 <        else
1098 <            status = 0;
1016 >        aux = null;
1017 >        status = 0;
1018      }
1019  
1020      /**
# Line 1108 | Line 1027 | public abstract class ForkJoinTask<V> im
1027       * @return the pool, or {@code null} if none
1028       */
1029      public static ForkJoinPool getPool() {
1030 <        Thread t = Thread.currentThread();
1031 <        return (t instanceof ForkJoinWorkerThread) ?
1032 <            ((ForkJoinWorkerThread) t).pool : null;
1030 >        Thread t;
1031 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1032 >                ((ForkJoinWorkerThread) t).pool : null);
1033      }
1034  
1035      /**
# Line 1136 | Line 1055 | public abstract class ForkJoinTask<V> im
1055       * @return {@code true} if unforked
1056       */
1057      public boolean tryUnfork() {
1058 <        Thread t;
1058 >        Thread t; ForkJoinPool.WorkQueue q;
1059          return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1060 <                ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
1061 <                ForkJoinPool.common.tryExternalUnpush(this));
1060 >                (q = ((ForkJoinWorkerThread)t).workQueue) != null &&
1061 >                q.tryUnpush(this) :
1062 >                (q = ForkJoinPool.commonQueue()) != null &&
1063 >                q.externalTryUnpush(this));
1064      }
1065  
1066      /**
# Line 1155 | Line 1076 | public abstract class ForkJoinTask<V> im
1076          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1077              q = ((ForkJoinWorkerThread)t).workQueue;
1078          else
1079 <            q = ForkJoinPool.commonSubmitterQueue();
1079 >            q = ForkJoinPool.commonQueue();
1080          return (q == null) ? 0 : q.queueSize();
1081      }
1082  
# Line 1230 | Line 1151 | public abstract class ForkJoinTask<V> im
1151          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1152              q = ((ForkJoinWorkerThread)t).workQueue;
1153          else
1154 <            q = ForkJoinPool.commonSubmitterQueue();
1154 >            q = ForkJoinPool.commonQueue();
1155          return (q == null) ? null : q.peek();
1156      }
1157  
# Line 1245 | Line 1166 | public abstract class ForkJoinTask<V> im
1166       */
1167      protected static ForkJoinTask<?> pollNextLocalTask() {
1168          Thread t;
1169 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1170 <            ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
1250 <            null;
1169 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1170 >                ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
1171      }
1172  
1173      /**
# Line 1264 | Line 1184 | public abstract class ForkJoinTask<V> im
1184       * @return a task, or {@code null} if none are available
1185       */
1186      protected static ForkJoinTask<?> pollTask() {
1187 <        Thread t; ForkJoinWorkerThread wt;
1188 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1189 <            (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
1190 <            null;
1187 >        Thread t; ForkJoinWorkerThread w;
1188 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1189 >                (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
1190 >                null);
1191      }
1192  
1193      /**
# Line 1283 | Line 1203 | public abstract class ForkJoinTask<V> im
1203       */
1204      protected static ForkJoinTask<?> pollSubmission() {
1205          Thread t;
1206 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1207 <            ((ForkJoinWorkerThread)t).pool.pollSubmission() : null;
1206 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1207 >                ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
1208      }
1209  
1210      // tag operations
# Line 1308 | Line 1228 | public abstract class ForkJoinTask<V> im
1228       */
1229      public final short setForkJoinTaskTag(short newValue) {
1230          for (int s;;) {
1231 <            if (STATUS.weakCompareAndSet(this, s = status,
1312 <                                         (s & ~SMASK) | (newValue & SMASK)))
1231 >            if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
1232                  return (short)s;
1233          }
1234      }
# Line 1332 | Line 1251 | public abstract class ForkJoinTask<V> im
1251          for (int s;;) {
1252              if ((short)(s = status) != expect)
1253                  return false;
1254 <            if (STATUS.weakCompareAndSet(this, s,
1336 <                                         (s & ~SMASK) | (update & SMASK)))
1254 >            if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
1255                  return true;
1256          }
1257      }
# Line 1398 | Line 1316 | public abstract class ForkJoinTask<V> im
1316          public final Void getRawResult() { return null; }
1317          public final void setRawResult(Void v) { }
1318          public final boolean exec() { runnable.run(); return true; }
1319 <        void internalPropagateException(Throwable ex) {
1320 <            rethrow(ex); // rethrow outside exec() catches.
1319 >        int trySetException(Throwable ex) {
1320 >            int s;
1321 >            if (isExceptionalStatus(s = trySetThrown(ex)))
1322 >                rethrow(ex); // rethrow outside exec() catches.
1323 >            return s;
1324          }
1325          private static final long serialVersionUID = 5232453952276885070L;
1326      }
# Line 1490 | Line 1411 | public abstract class ForkJoinTask<V> im
1411       */
1412      private void writeObject(java.io.ObjectOutputStream s)
1413          throws java.io.IOException {
1414 +        Aux a;
1415          s.defaultWriteObject();
1416 <        s.writeObject(getException());
1416 >        s.writeObject((a = aux) == null ? null : a.ex);
1417      }
1418  
1419      /**
# Line 1506 | Line 1428 | public abstract class ForkJoinTask<V> im
1428          s.defaultReadObject();
1429          Object ex = s.readObject();
1430          if (ex != null)
1431 <            setExceptionalCompletion((Throwable)ex);
1431 >            trySetThrown((Throwable)ex);
1432      }
1433  
1512    // VarHandle mechanics
1513    private static final VarHandle STATUS;
1434      static {
1435          try {
1436              MethodHandles.Lookup l = MethodHandles.lookup();
1437              STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class);
1438 +            AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class);
1439          } catch (ReflectiveOperationException e) {
1440              throw new ExceptionInInitializerError(e);
1441          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines