ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.125 by dl, Sat Jan 18 12:30:04 2020 UTC vs.
Revision 1.126 by dl, Mon Jan 20 15:51:54 2020 UTC

# Line 276 | Line 276 | public abstract class ForkJoinTask<V> im
276       * @param pool if nonull, pool to uncompensate when unblocking
277       * @return status on exit, or INTRPT if interrupted while waiting
278       */
279 <    final int awaitDone(boolean interruptible, long deadline,
280 <                        ForkJoinPool pool) {
281 <        int s; Aux node = null; boolean interrupted = false, queued = false;
282 <        for (;;) {
283 <            Aux a; long nanos;
284 <            if ((s = status) < 0)
285 <                break;
286 <            else if (node == null)
287 <                node = new Aux(Thread.currentThread(), null);
288 <            else if (!queued) {
289 <                if ((a = aux) != null && a.ex != null)
290 <                    Thread.onSpinWait(); // exception in progress
291 <                else if (queued = casAux(node.next = a, node))
292 <                    LockSupport.setCurrentBlocker(this);
293 <            }
294 <            else {
295 <                if (deadline == 0L)
296 <                    LockSupport.park();
297 <                else if ((nanos = deadline - System.nanoTime()) > 0L)
298 <                    LockSupport.parkNanos(nanos);
299 <                else {
300 <                    s = 0;               // timeout
279 >    private int awaitDone(boolean interruptible, long deadline,
280 >                          ForkJoinPool pool) {
281 >        int s;
282 >        try {
283 >            Aux node = null; boolean interrupted = false, queued = false;
284 >            for (;;) {
285 >                Aux a; long nanos;
286 >                if ((s = status) < 0)
287                      break;
288 +                else if (node == null)
289 +                    node = new Aux(Thread.currentThread(), null);
290 +                else if (!queued) {
291 +                    if ((a = aux) != null && a.ex != null)
292 +                        Thread.onSpinWait(); // exception in progress
293 +                    else if (queued = casAux(node.next = a, node))
294 +                        LockSupport.setCurrentBlocker(this);
295                  }
296 <                if ((interrupted |= Thread.interrupted()) && interruptible) {
297 <                    s = INTRPT;
298 <                    break;
296 >                else {
297 >                    if (deadline == 0L)
298 >                        LockSupport.park();
299 >                    else if ((nanos = deadline - System.nanoTime()) > 0L)
300 >                        LockSupport.parkNanos(nanos);
301 >                    else {
302 >                        s = 0;               // timeout
303 >                        break;
304 >                    }
305 >                    if ((interrupted |= Thread.interrupted()) && interruptible) {
306 >                        s = INTRPT;
307 >                        break;
308 >                    }
309                  }
310              }
311 <        }
312 <        if (pool != null)
313 <            pool.uncompensate();
314 <        if (s >= 0) {                     // try to unsplice after cancellation
315 <            outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
316 <                for (Aux trail = null;;) {
317 <                    Aux next = a.next;
318 <                    if (a == node) {
319 <                        if (trail != null)
320 <                            trail.casNext(trail, next);
321 <                        else if (casAux(a, next))
322 <                            break outer; // cannot be re-encountered
323 <                        break;           // restart
324 <                    } else {
325 <                        trail = a;
326 <                        if ((a = next) == null)
327 <                            break outer;
311 >            if (queued) {
312 >                LockSupport.setCurrentBlocker(null);
313 >                if (s >= 0) {            // try to unsplice after cancellation
314 >                    outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
315 >                        for (Aux trail = null;;) {
316 >                            Aux next = a.next;
317 >                            if (a == node) {
318 >                                if (trail != null)
319 >                                    trail.casNext(trail, next);
320 >                                else if (casAux(a, next))
321 >                                    break outer; // cannot be re-encountered
322 >                                break;           // restart
323 >                            } else {
324 >                                trail = a;
325 >                                if ((a = next) == null)
326 >                                    break outer;
327 >                            }
328 >                        }
329                      }
330                  }
331 +                else {
332 +                    signalWaiters();             // help clean or signal
333 +                    if (interrupted)
334 +                        Thread.currentThread().interrupt();
335 +                }
336              }
337 <        }
338 <        else if (interrupted)
339 <            Thread.currentThread().interrupt();
331 <        if (queued) {
332 <            LockSupport.setCurrentBlocker(null);
333 <            signalWaiters();             // help clean or signal
337 >        } finally { // for sake of OOME on node construction
338 >            if (pool != null)
339 >                pool.uncompensate();
340          }
341          return s;
342      }
# Line 417 | Line 423 | public abstract class ForkJoinTask<V> im
423      }
424  
425      /**
426 <     * Helps and/or waits for completion. Overridable in subclasses.
426 >     * Helps and/or waits for completion.
427       *
422     * @param interruptible true if wait can be cancelled by interrupt
428       * @param ran true if task known to be invoked
429 +     * @param interruptible true if wait can be cancelled by interrupt
430 +     * @param deadline if non-zero use timed waits and possibly timeout
431       * @return status on exit, or INTRPT if interruptible and interrupted
432       */
433 <    int awaitJoin(boolean interruptible, boolean ran) {
433 >    private int helpOrWait(boolean ran, boolean interruptible, long deadline) {
434 >        boolean cc = (this instanceof CountedCompleter);
435          Thread t; ForkJoinWorkerThread wt;
436 <        ForkJoinPool.WorkQueue q = null;
437 <        ForkJoinPool p = null;
438 <        boolean unforked = false;
431 <        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
436 >        ForkJoinPool.WorkQueue q; ForkJoinPool p; boolean owned; int s;
437 >        if (owned = ((t = Thread.currentThread())
438 >                     instanceof ForkJoinWorkerThread)) {
439              p = (wt = (ForkJoinWorkerThread)t).pool;
440              q = wt.workQueue;
434            if (!ran && q != null && q.tryRemove(this))
435                unforked = true;
441          }
442 <        else if (!ran && (q = ForkJoinPool.commonQueue()) != null &&
443 <                 q.externalTryUnpush(this))
444 <            unforked = true;
440 <        int s;
441 <        if (unforked && (s = doExec()) < 0)
442 <            return s;
443 <        if (p != null) {
444 <            if ((s = p.helpJoin(this, q)) < 0)
445 <                return s;
446 <            if (s != ADJUST) // uncompensated
447 <                p = null;
442 >        else {
443 >            p = ForkJoinPool.common;
444 >            q = ForkJoinPool.commonQueue();
445          }
446 <        return awaitDone(interruptible, 0L, p);
446 >        if (p == null || q == null)
447 >            s = 0;
448 >        else if (cc)
449 >            s = p.helpComplete(this, q, owned);
450 >        else if (ran || !q.tryRemove(this, owned) || (s = doExec()) >= 0)
451 >            s = owned? p.helpJoin(this, q) : 0;
452 >        return (s < 0) ? s : awaitDone(interruptible, deadline,
453 >                                       (s == ADJUST) ? p : null);
454      }
455  
456      /**
# Line 577 | Line 581 | public abstract class ForkJoinTask<V> im
581      public final V join() {
582          int s;
583          if ((s = status) >= 0)
584 <            s = awaitJoin(false, false);
584 >            s = helpOrWait(false, false, 0L);
585          if ((s & ABNORMAL) != 0)
586              reportException(s);
587          return getRawResult();
# Line 594 | Line 598 | public abstract class ForkJoinTask<V> im
598      public final V invoke() {
599          int s;
600          if ((s = doExec()) >= 0)
601 <            s = awaitJoin(false, true);
601 >            s = helpOrWait(false, true, 0L);
602          if ((s & ABNORMAL) != 0)
603              reportException(s);
604          return getRawResult();
# Line 623 | Line 627 | public abstract class ForkJoinTask<V> im
627              throw new NullPointerException();
628          t2.fork();
629          if ((s1 = t1.doExec()) >= 0)
630 <            s1 = t1.awaitJoin(false, true);
631 <        if ((s1 & ABNORMAL) != 0) {
628 <            t2.cancel(false);
630 >            s1 = t1.helpOrWait(true, false, 0L);
631 >        if ((s1 & ABNORMAL) != 0)
632              t1.reportException(s1);
633 <        }
634 <        else {
635 <            if ((s2 = t2.status) >= 0)
636 <                s2 = t2.awaitJoin(false, false);
634 <            if ((s2 & ABNORMAL) != 0)
635 <                t2.reportException(s2);
636 <        }
633 >        if ((s2 = t2.status) >= 0)
634 >            s2 = t2.helpOrWait(false, false, 0L);
635 >        if ((s2 & ABNORMAL) != 0)
636 >            t2.reportException(s2);
637      }
638  
639      /**
# Line 662 | Line 662 | public abstract class ForkJoinTask<V> im
662              }
663              if (i == 0) {
664                  if ((s = t.doExec()) >= 0)
665 <                    s = t.awaitJoin(false, true);
665 >                    s = t.helpOrWait(true, false, 0L);
666                  if ((s & ABNORMAL) != 0)
667                      ex = t.getException();
668                  break;
# Line 674 | Line 674 | public abstract class ForkJoinTask<V> im
674                  ForkJoinTask<?> t;
675                  if ((t = tasks[i]) != null) {
676                      if ((s = t.status) >= 0)
677 <                        s = t.awaitJoin(false, false);
677 >                        s = t.helpOrWait(false, false, 0L);
678                      if ((s & ABNORMAL) != 0) {
679                          ex = t.getException();
680                          break;
# Line 682 | Line 682 | public abstract class ForkJoinTask<V> im
682                  }
683              }
684          }
685 <        if (ex != null) { // try to cancel others
686 <            for (int i = 0, s; i <= last; ++i) {
687 <                ForkJoinTask<?> t;
688 <                if ((t = tasks[i]) != null)
689 <                    t.cancel(false);
690 <            }
685 >        if (ex != null)
686              rethrow(ex);
692        }
687      }
688  
689      /**
# Line 728 | Line 722 | public abstract class ForkJoinTask<V> im
722              }
723              if (i == 0) {
724                  if ((s = t.doExec()) >= 0)
725 <                    s = t.awaitJoin(false, true);
725 >                    s = t.helpOrWait(true, false, 0L);
726                  if ((s & ABNORMAL) != 0)
727                      ex = t.getException();
728                  break;
# Line 740 | Line 734 | public abstract class ForkJoinTask<V> im
734                  ForkJoinTask<?> t;
735                  if ((t = ts.get(i)) != null) {
736                      if ((s = t.status) >= 0)
737 <                        s = t.awaitJoin(false, false);
737 >                        s = t.helpOrWait(false, false, 0L);
738                      if ((s & ABNORMAL) != 0) {
739                          ex = t.getException();
740                          break;
# Line 748 | Line 742 | public abstract class ForkJoinTask<V> im
742                  }
743              }
744          }
745 <        if (ex != null) {
752 <            for (int i = 0, s; i <= last; ++i) {
753 <                ForkJoinTask<?> t;
754 <                if ((t = ts.get(i)) != null)
755 <                    t.cancel(false);
756 <            }
745 >        if (ex != null)
746              rethrow(ex);
758        }
747          return tasks;
748      }
749  
# Line 903 | Line 891 | public abstract class ForkJoinTask<V> im
891          if (Thread.interrupted())
892              s = INTRPT;
893          else if ((s = status) >= 0)
894 <            s = awaitJoin(true, false);
894 >            s = helpOrWait(false, true, 0L);
895          if (s == INTRPT)
896              throw new InterruptedException();
897          else if ((s & THROWN) != 0)
# Line 936 | Line 924 | public abstract class ForkJoinTask<V> im
924              s = INTRPT;
925          else if ((s = status) >= 0 && nanos > 0L) {
926              long d = nanos + System.nanoTime();
927 <            long deadline = (d == 0L) ? 1L : d; // avoid 0
940 <            Thread t = Thread.currentThread();
941 <            ForkJoinPool p = (t instanceof ForkJoinWorkerThread) ?
942 <                ((ForkJoinWorkerThread)t).pool : ForkJoinPool.common;
943 <            if (p != null && p.preCompensate() == 0)
944 <                p = null;
945 <            s = awaitDone(true, deadline, p);
927 >            s = helpOrWait(false, true, (d == 0L) ? 1L : d); // avoid 0
928          }
929  
930          if (s == INTRPT)
# Line 965 | Line 947 | public abstract class ForkJoinTask<V> im
947       */
948      public final void quietlyJoin() {
949          if (status >= 0)
950 <            awaitJoin(false, false);
950 >            helpOrWait(false, false, 0L);
951      }
952  
953      /**
# Line 975 | Line 957 | public abstract class ForkJoinTask<V> im
957       */
958      public final void quietlyInvoke() {
959          if (doExec() >= 0)
960 <            awaitJoin(false, true);
960 >            helpOrWait(true, false, 0L);
961      }
962  
963      /**
# Line 1053 | Line 1035 | public abstract class ForkJoinTask<V> im
1035       * @return {@code true} if unforked
1036       */
1037      public boolean tryUnfork() {
1038 <        Thread t; ForkJoinPool.WorkQueue q;
1039 <        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1040 <                (q = ((ForkJoinWorkerThread)t).workQueue) != null &&
1041 <                q.tryUnpush(this) :
1042 <                (q = ForkJoinPool.commonQueue()) != null &&
1043 <                q.externalTryUnpush(this));
1038 >        Thread t; boolean owned;
1039 >        ForkJoinPool.WorkQueue q = ((owned = (t = Thread.currentThread())
1040 >                                     instanceof ForkJoinWorkerThread) ?
1041 >                                    ((ForkJoinWorkerThread)t).workQueue :
1042 >                                    ForkJoinPool.commonQueue());
1043 >        return q != null && q.tryUnpush(this, owned);
1044      }
1045  
1046      /**
# Line 1315 | Line 1297 | public abstract class ForkJoinTask<V> im
1297          public final void setRawResult(Void v) { }
1298          public final boolean exec() { runnable.run(); return true; }
1299          int trySetException(Throwable ex) {
1300 <            int s;
1301 <            if (isExceptionalStatus(s = trySetThrown(ex)))
1302 <                rethrow(ex); // rethrow outside exec() catches.
1300 >            int s; // if runnable has a handler, invoke it
1301 >            if (isExceptionalStatus(s = trySetThrown(ex)) &&
1302 >                runnable instanceof java.lang.Thread.UncaughtExceptionHandler) {
1303 >                try {
1304 >                    ((java.lang.Thread.UncaughtExceptionHandler)runnable).
1305 >                        uncaughtException(Thread.currentThread(), ex);
1306 >                } catch (Throwable ignore) {
1307 >                }
1308 >            }
1309              return s;
1310          }
1311          private static final long serialVersionUID = 5232453952276885070L;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines