ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.389 by dl, Sat Dec 26 13:20:19 2020 UTC vs.
Revision 1.390 by dl, Sun Dec 27 17:51:00 2020 UTC

# Line 1557 | Line 1557 | public class ForkJoinPool extends Abstra
1557       * @param w caller's WorkQueue (may be null on failed initialization)
1558       */
1559      final void runWorker(WorkQueue w) {
1560 <        if (mode >= 0 && w != null) {           // skip on failed init
1560 >        if (w != null) {                        // skip on failed init
1561              w.config |= SRC;                    // mark as valid source
1562              int r = w.stackPred, src = 0;       // use seed from registerWorker
1563              do {
# Line 2653 | Line 2653 | public class ForkJoinPool extends Abstra
2653                  ForkJoinTask<T> f =
2654                      new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2655                  futures.add(f);
2656 <                if (isSaturated())
2656 >                if (!isSaturated())
2657 >                    externalSubmit(f);
2658 >                else if ((mode & SHUTDOWN) == 0)
2659                      f.doExec();
2660                  else
2661 <                    externalSubmit(f);
2661 >                    throw new RejectedExecutionException();
2662              }
2663              for (int i = futures.size() - 1; i >= 0; --i)
2664                  ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
# Line 2679 | Line 2681 | public class ForkJoinPool extends Abstra
2681                  ForkJoinTask<T> f =
2682                      new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2683                  futures.add(f);
2684 <                if (isSaturated())
2684 >                if (!isSaturated())
2685 >                    externalSubmit(f);
2686 >                else if ((mode & SHUTDOWN) == 0)
2687                      f.doExec();
2688                  else
2689 <                    externalSubmit(f);
2689 >                    throw new RejectedExecutionException();
2690              }
2691              long startTime = System.nanoTime(), ns = nanos;
2692              boolean timedOut = (ns < 0L);
# Line 2716 | Line 2720 | public class ForkJoinPool extends Abstra
2720          @SuppressWarnings("serial") // Conditionally serializable
2721          volatile E result;
2722          final AtomicInteger count;              // in case all throw
2723 <        InvokeAnyRoot(int n) { count = new AtomicInteger(n); }
2723 >        final ForkJoinPool pool;                // to check shutdown while collecting
2724 >        InvokeAnyRoot(int n, ForkJoinPool p) { pool = p; count = new AtomicInteger(n); }
2725          final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2726              Throwable ex = null;
2727 <            boolean failed = false;
2728 <            if (c != null) {                    // raciness OK
2729 <                if (isCancelled())
2727 >            boolean failed = (c == null || isCancelled() || (pool != null && pool.mode < 0));
2728 >            if (!failed && !isDone()) {
2729 >                try {
2730 >                    complete(c.call());
2731 >                } catch (Throwable tx) {
2732 >                    ex = tx;
2733                      failed = true;
2726                else if (!isDone()) {
2727                    try {
2728                        complete(c.call());
2729                    } catch (Throwable tx) {
2730                        ex = tx;
2731                        failed = true;
2732                    }
2734                  }
2735              }
2736 <            if (failed && count.getAndDecrement() <= 1)
2736 >            if ((pool != null && pool.mode < 0) || (failed && count.getAndDecrement() <= 1))
2737                  trySetThrown(ex != null ? ex : new CancellationException());
2738          }
2739          public final boolean exec()         { return false; } // never forked
# Line 2780 | Line 2781 | public class ForkJoinPool extends Abstra
2781          int n = tasks.size();
2782          if (n <= 0)
2783              throw new IllegalArgumentException();
2784 <        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n);
2784 >        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2785          ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2785        for (Callable<T> c : tasks) {
2786            if (c == null)
2787                throw new NullPointerException();
2788            InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2789            fs.add(f);
2790            if (isSaturated())
2791                f.doExec();
2792            else
2793                externalSubmit(f);
2794            if (root.isDone())
2795                break;
2796        }
2786          try {
2787 +            for (Callable<T> c : tasks) {
2788 +                if (c == null)
2789 +                    throw new NullPointerException();
2790 +                InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2791 +                fs.add(f);
2792 +                if (!isSaturated())
2793 +                    externalSubmit(f);
2794 +                else if ((mode & SHUTDOWN) == 0)
2795 +                    f.doExec();
2796 +                else
2797 +                    throw new RejectedExecutionException();
2798 +                if (root.isDone())
2799 +                    break;
2800 +            }
2801              return root.get();
2802          } finally {
2803              for (InvokeAnyTask<T> f : fs)
# Line 2810 | Line 2813 | public class ForkJoinPool extends Abstra
2813          int n = tasks.size();
2814          if (n <= 0)
2815              throw new IllegalArgumentException();
2816 <        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n);
2816 >        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2817          ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2815        for (Callable<T> c : tasks) {
2816            if (c == null)
2817                throw new NullPointerException();
2818            InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2819            fs.add(f);
2820            if (isSaturated())
2821                f.doExec();
2822            else
2823                externalSubmit(f);
2824            if (root.isDone())
2825                break;
2826        }
2818          try {
2819 +            for (Callable<T> c : tasks) {
2820 +                if (c == null)
2821 +                    throw new NullPointerException();
2822 +                InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2823 +                fs.add(f);
2824 +                if (!isSaturated())
2825 +                    externalSubmit(f);
2826 +                else if ((mode & SHUTDOWN) == 0)
2827 +                    f.doExec();
2828 +                else
2829 +                    throw new RejectedExecutionException();
2830 +                if (root.isDone())
2831 +                    break;
2832 +            }
2833              return root.get(nanos, TimeUnit.NANOSECONDS);
2834          } finally {
2835              for (InvokeAnyTask<T> f : fs)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines