ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.393 by dl, Wed Feb 3 12:41:58 2021 UTC vs.
Revision 1.394 by dl, Fri Feb 5 15:20:03 2021 UTC

# Line 1023 | Line 1023 | public class ForkJoinPool extends Abstra
1023                      a[k = (cap - 1) & (s - 1)] != task)
1024                      break;
1025                  if (tryLock()) {
1026 <                    if (top == s && array == a &&
1027 <                        (taken = casSlotToNull(a, k, task)))
1028 <                        top = s - 1;
1029 <                    source = 0; // release lock
1030 <                    break;
1026 >                    if (top == s && array == a) {
1027 >                        if (taken = casSlotToNull(a, k, task)) {
1028 >                            top = s - 1;
1029 >                            source = 0;
1030 >                            break;
1031 >                        }
1032 >                    }
1033 >                    source = 0; // release lock for retry
1034                  }
1035                  Thread.yield(); // trylock failure
1036              }
# Line 1171 | Line 1174 | public class ForkJoinPool extends Abstra
1174                                  top = s;
1175                              source = 0;
1176                          }
1177 +                        if (taken)
1178 +                            t.doExec();
1179 +                        else if (!owned)
1180 +                            Thread.yield(); // tryLock failure
1181                          break;
1182                      }
1183                      else if ((f = f.completer) == null)
1184                          break;
1185                  }
1186 <                if (!taken)
1180 <                    break;
1181 <                t.doExec();
1182 <                if (limit != 0 && --limit == 0)
1186 >                if (taken && limit != 0 && --limit == 0)
1187                      break;
1188              }
1189              return status;
# Line 1563 | Line 1567 | public class ForkJoinPool extends Abstra
1567       * @param w caller's WorkQueue (may be null on failed initialization)
1568       */
1569      final void runWorker(WorkQueue w) {
1570 <        if (w != null) {                        // skip on failed init
1570 >        if (mode >= 0 && w != null) {           // skip on failed init
1571              w.config |= SRC;                    // mark as valid source
1572              int r = w.stackPred, src = 0;       // use seed from registerWorker
1573              do {
# Line 2272 | Line 2276 | public class ForkJoinPool extends Abstra
2276                  return false;
2277              md = getAndBitwiseOrMode(STOP);
2278          }
2279 <        for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
2280 <            for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
2279 >        for (boolean rescan = true;;) { // repeat until no changes
2280 >            boolean changed = false;
2281 >            for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
2282 >                changed = true;
2283                  ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
2284 +            }
2285              WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2286              if ((qs = queues) != null && (n = qs.length) > 0) {
2287                  for (int j = 1; j < n; j += 2) { // unblock other workers
2288                      if ((q = qs[j]) != null && (thread = q.owner) != null &&
2289                          !thread.isInterrupted()) {
2290 +                        changed = true;
2291                          try {
2292                              thread.interrupt();
2293                          } catch (Throwable ignore) {
# Line 2297 | Line 2305 | public class ForkJoinPool extends Abstra
2305                      cond.signalAll();
2306                  lock.unlock();
2307              }
2308 +            if (changed)
2309 +                rescan = true;
2310 +            else if (rescan)
2311 +                rescan = false;
2312 +            else
2313 +                break;
2314          }
2315          return true;
2316      }
# Line 2728 | Line 2742 | public class ForkJoinPool extends Abstra
2742          }
2743          final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2744              Throwable ex = null;
2745 <            boolean failed = (c == null || isCancelled() ||
2746 <                              (pool != null && pool.mode < 0));
2747 <            if (!failed && !isDone()) {
2745 >            boolean failed;
2746 >            if (c == null || Thread.interrupted() ||
2747 >                (pool != null && pool.mode < 0))
2748 >                failed = true;
2749 >            else if (isDone())
2750 >                failed = false;
2751 >            else {
2752                  try {
2753                      complete(c.call());
2754 +                    failed = false;
2755                  } catch (Throwable tx) {
2756                      ex = tx;
2757                      failed = true;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines