ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.395 by dl, Sat Feb 6 17:33:47 2021 UTC vs.
Revision 1.396 by dl, Sun Feb 7 15:01:38 2021 UTC

# Line 1692 | Line 1692 | public class ForkJoinPool extends Abstra
1692      // Utilities used by ForkJoinTask
1693  
1694      /**
1695     * Returns true if all workers are busy, possibly creating one if allowed
1696     */
1697    final boolean isSaturated() {
1698        int par = mode & SMASK, maxTotal = bounds >>> SWIDTH;
1699        for (long c;;) {
1700            if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
1701                return false;
1702            if ((short)(c >>> TC_SHIFT) >= maxTotal || par == 0)
1703                return true; // cannot create
1704            long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1705            if (compareAndSetCtl(c, nc))
1706                return !createWorker();
1707        }
1708    }
1709
1710    /**
1695       * Returns true if can start terminating if enabled, or already terminated
1696       */
1697      final boolean canStop() {
# Line 1803 | Line 1787 | public class ForkJoinPool extends Abstra
1787       *
1788       * @param task the task
1789       * @param w caller's WorkQueue
1790 +     * @param canHelp if false, compensate only
1791       * @return task status on exit, or UNCOMPENSATE for compensated blocking
1792       */
1793 <    final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
1793 >    final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
1794          int s = 0;
1795          if (task != null && w != null) {
1796              int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
# Line 1820 | Line 1805 | public class ForkJoinPool extends Abstra
1805                      else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
1806                          break;                    // block
1807                  }
1808 <                else {                            // scan for subtasks
1808 >                else if (canHelp) {               // scan for subtasks
1809                      WorkQueue[] qs = queues;
1810                      int n = (qs == null) ? 0 : qs.length, m = n - 1;
1811                      for (int i = n; i > 0; i -= 2, r += 2) {
# Line 2179 | Line 2164 | public class ForkJoinPool extends Abstra
2164      }
2165  
2166      /**
2167 +     * Returns queue for an external thread, if one exists
2168 +     */
2169 +    final WorkQueue externalQueue() {
2170 +        WorkQueue[] qs;
2171 +        int r = ThreadLocalRandom.getProbe(), n;
2172 +        return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
2173 +            qs[(n - 1) & (r << 1)] : null;
2174 +    }
2175 +
2176 +    /**
2177       * If the given executor is a ForkJoinPool, poll and execute
2178       * AsynchronousCompletionTasks from worker's queue until none are
2179       * available or blocker is released.
# Line 2189 | Line 2184 | public class ForkJoinPool extends Abstra
2184              if ((wt = (ForkJoinWorkerThread)t).pool == e)
2185                  w = wt.workQueue;
2186          }
2187 <        else if (e == common)
2188 <            w = commonQueue();
2187 >        else if (e instanceof ForkJoinPool)
2188 >            w = ((ForkJoinPool)e).externalQueue();
2189          if (w != null)
2190              w.helpAsyncBlocker(blocker);
2191      }
# Line 2681 | Line 2676 | public class ForkJoinPool extends Abstra
2676                  externalSubmit(f);
2677              }
2678              for (int i = futures.size() - 1; i >= 0; --i)
2679 <                ((ForkJoinTask<?>)futures.get(i)).tryJoinForPoolInvoke(this);
2679 >                ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
2680              return futures;
2681          } catch (Throwable t) {
2682              for (Future<T> e : futures)
# Line 2711 | Line 2706 | public class ForkJoinPool extends Abstra
2706                      if (timedOut)
2707                          ForkJoinTask.cancelIgnoringExceptions(f);
2708                      else {
2709 <                        try {
2715 <                            ((ForkJoinTask<T>)f).getForPoolInvoke(this, ns);
2716 <                        } catch (CancellationException | TimeoutException |
2717 <                                 ExecutionException ok) {
2718 <                        }
2709 >                        ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
2710                          if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
2711                              timedOut = true;
2712                      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines