ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.59 by dl, Fri Jul 23 14:09:17 2010 UTC vs.
Revision 1.60 by dl, Sat Jul 24 20:28:18 2010 UTC

# Line 52 | Line 52 | import java.util.concurrent.CountDownLat
52   * convenient form for informal monitoring.
53   *
54   * <p> As is the case with other ExecutorServices, there are three
55 < * main task execution methods summarized in the follwoing
55 > * main task execution methods summarized in the following
56   * table. These are designed to be used by clients not already engaged
57   * in fork/join computations in the current pool.  The main forms of
58   * these methods accept instances of {@code ForkJoinTask}, but
# Line 138 | Line 138 | public class ForkJoinPool extends Abstra
138       * cache pollution effects.)
139       *
140       * Beyond work-stealing support and essential bookkeeping, the
141 <     * main responsibility of this framework is to arrange tactics for
142 <     * when one worker is waiting to join a task stolen (or always
143 <     * held by) another.  Becauae we are multiplexing many tasks on to
144 <     * a pool of workers, we can't just let them block (as in
145 <     * Thread.join).  We also cannot just reassign the joiner's
146 <     * run-time stack with another and replace it later, which would
147 <     * be a form of "continuation", that even if possible is not
148 <     * necessarily a good idea. Given that the creation costs of most
149 <     * threads on most systems mainly surrounds setting up runtime
150 <     * stacks, thread creation and switching is usually not much more
151 <     * expensive than stack creation and switching, and is more
152 <     * flexible). Instead we combine two tactics:
141 >     * main responsibility of this framework is to take actions when
142 >     * one worker is waiting to join a task stolen (or always held by)
143 >     * another.  Becauae we are multiplexing many tasks on to a pool
144 >     * of workers, we can't just let them block (as in Thread.join).
145 >     * We also cannot just reassign the joiner's run-time stack with
146 >     * another and replace it later, which would be a form of
147 >     * "continuation", that even if possible is not necessarily a good
148 >     * idea. Given that the creation costs of most threads on most
149 >     * systems mainly surrounds setting up runtime stacks, thread
150 >     * creation and switching is usually not much more expensive than
151 >     * stack creation and switching, and is more flexible). Instead we
152 >     * combine two tactics:
153       *
154 <     *   1. Arranging for the joiner to execute some task that it
154 >     *   Helping: Arranging for the joiner to execute some task that it
155       *      would be running if the steal had not occurred.  Method
156       *      ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
157       *      links to try to find such a task.
158       *
159 <     *   2. Unless there are already enough live threads, creating or
160 <     *      or re-activating a spare thread to compensate for the
161 <     *      (blocked) joiner until it unblocks.  Spares then suspend
162 <     *      at their next opportunity or eventually die if unused for
163 <     *      too long.  See below and the internal documentation
164 <     *      for tryAwaitJoin for more details about compensation
165 <     *      rules.
159 >     *   Compensating: Unless there are already enough live threads,
160 >     *      creating or or re-activating a spare thread to compensate
161 >     *      for the (blocked) joiner until it unblocks.  Spares then
162 >     *      suspend at their next opportunity or eventually die if
163 >     *      unused for too long.  See below and the internal
164 >     *      documentation for tryAwaitJoin for more details about
165 >     *      compensation rules.
166       *
167       * Because the determining existence of conservatively safe
168       * helping targets, the availability of already-created spares,
# Line 173 | Line 173 | public class ForkJoinPool extends Abstra
173       * increases application footprint, so we try to avoid it, within
174       * reason.
175       *
176 <     * The ManagedBlocker extension API can't use option (1) so uses a
177 <     * special version of (2) in method awaitBlocker.
176 >     * The ManagedBlocker extension API can't use helping so uses a
177 >     * special version of compensation in method awaitBlocker.
178       *
179       * The main throughput advantages of work-stealing stem from
180       * decentralized control -- workers mostly steal tasks from each
# Line 497 | Line 497 | public class ForkJoinPool extends Abstra
497       * making decisions about creating and suspending spare
498       * threads. Updated only by CAS. Note that adding a new worker
499       * requires incrementing both counts, since workers start off in
500 <     * running state.  This field is also used for memory-fencing
501 <     * configuration parameters.
500 >     * running state.
501       */
502      private volatile int workerCounts;
503  
# Line 657 | Line 656 | public class ForkJoinPool extends Abstra
656          try {
657              w = factory.newThread(this);
658          } finally { // Adjust on either null or exceptional factory return
659 <            if (w == null) {
659 >            if (w == null)
660                  onWorkerCreationFailure();
662                return null;
663            }
661          }
662 <        w.start(recordWorker(w), ueh);
662 >        if (w != null)
663 >            w.start(recordWorker(w), ueh);
664          return w;
665      }
666  
# Line 672 | Line 670 | public class ForkJoinPool extends Abstra
670      private void onWorkerCreationFailure() {
671          for (;;) {
672              int wc = workerCounts;
673 <            if ((wc >>> TOTAL_COUNT_SHIFT) == 0)
674 <                Thread.yield(); // wait for other counts to settle
673 >            int rc = wc & RUNNING_COUNT_MASK;
674 >            int tc = wc >>> TOTAL_COUNT_SHIFT;
675 >            if (rc == 0 || wc == 0)
676 >                Thread.yield(); // must wait for other counts to settle
677              else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
678                                                wc - (ONE_RUNNING|ONE_TOTAL)))
679                  break;
# Line 682 | Line 682 | public class ForkJoinPool extends Abstra
682      }
683  
684      /**
685 <     * Creates and/or resumes enough workers to establish target
686 <     * parallelism, giving up if terminating or addWorker fails
687 <     *
688 <     * TODO: recast this to support lazier creation and automated
689 <     * parallelism maintenance
685 >     * Creates enough total workers to establish target parallelism,
686 >     * giving up if terminating or addWorker fails
687       */
688 <    private void ensureEnoughWorkers() {
689 <        while ((runState & TERMINATING) == 0) {
690 <            int pc = parallelism;
691 <            int wc = workerCounts;
692 <            int rc = wc & RUNNING_COUNT_MASK;
693 <            int tc = wc >>> TOTAL_COUNT_SHIFT;
694 <            if (tc < pc) {
698 <                if (UNSAFE.compareAndSwapInt
699 <                    (this, workerCountsOffset,
700 <                     wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
701 <                    addWorker() == null)
702 <                    break;
703 <            }
704 <            else if (tc > pc && rc < pc &&
705 <                     tc > (runState & ACTIVE_COUNT_MASK)) {
706 <                ForkJoinWorkerThread spare = null;
707 <                ForkJoinWorkerThread[] ws = workers;
708 <                int nws = ws.length;
709 <                for (int i = 0; i < nws; ++i) {
710 <                    ForkJoinWorkerThread w = ws[i];
711 <                    if (w != null && w.isSuspended()) {
712 <                        if ((workerCounts & RUNNING_COUNT_MASK) > pc)
713 <                            return;
714 <                        if (w.tryResumeSpare())
715 <                            incrementRunningCount();
716 <                        break;
717 <                    }
718 <                }
719 <            }
720 <            else
688 >    private void ensureEnoughTotalWorkers() {
689 >        int wc;
690 >        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism &&
691 >               runState < TERMINATING) {
692 >            if ((UNSAFE.compareAndSwapInt(this, workerCountsOffset,
693 >                                          wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
694 >                 addWorker() == null))
695                  break;
696          }
697      }
# Line 744 | Line 718 | public class ForkJoinPool extends Abstra
718          for (;;) {
719              int wc = workerCounts;
720              int rc = wc & RUNNING_COUNT_MASK;
721 <            if (rc - nr < 0 || (wc >>> TOTAL_COUNT_SHIFT) == 0)
721 >            int tc = wc >>> TOTAL_COUNT_SHIFT;
722 >            if (rc - nr < 0 || tc == 0)
723                  Thread.yield(); // back off if waiting for other updates
724              else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
725                                                wc, wc - unit))
# Line 753 | Line 728 | public class ForkJoinPool extends Abstra
728  
729          accumulateStealCount(w); // collect final count
730          if (!tryTerminate(false))
731 <            ensureEnoughWorkers();
731 >            ensureEnoughTotalWorkers();
732      }
733  
734      // Waiting for and signalling events
# Line 769 | Line 744 | public class ForkJoinPool extends Abstra
744              int n = ws.length;
745              for (;;) {
746                  int i = ((int)(top & WAITER_ID_MASK)) - 1;
747 <                if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == eventCount)
747 >                int e = (int)(top >>> EVENT_COUNT_SHIFT);
748 >                if (i < 0 || e == eventCount)
749                      return;
750                  ForkJoinWorkerThread w;
751                  if (i < n && (w = ws[i]) != null &&
# Line 809 | Line 785 | public class ForkJoinPool extends Abstra
785              int n = ws.length;
786              for (;;) {
787                  int i = ((int)(top & WAITER_ID_MASK)) - 1;
788 <                if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == ec)
788 >                int e = (int)(top >>> EVENT_COUNT_SHIFT);
789 >                if (i < 0 || e == ec)
790                      return;
791                  ForkJoinWorkerThread w;
792                  if (i < n && (w = ws[i]) != null &&
# Line 826 | Line 803 | public class ForkJoinPool extends Abstra
803      }
804  
805      /**
806 <     * If worker is inactive, blocks until terminating or event count
807 <     * advances from last value held by worker; in any case helps
831 <     * release others.
806 >     * Blockss worker until terminating or event count
807 >     * advances from last value held by worker
808       *
809       * @param w the calling worker thread
834     * @param retries the number of scans by caller failing to find work
835     * @return false if now too many threads running
810       */
811 <    private boolean eventSync(ForkJoinWorkerThread w, int retries) {
811 >    private void eventSync(ForkJoinWorkerThread w) {
812          int wec = w.lastEventCount;
813 <        if (retries > 1) { // can only block after 2nd miss
814 <            long nextTop = (((long)wec << EVENT_COUNT_SHIFT) |
815 <                            ((long)(w.poolIndex + 1)));
816 <            long top;
817 <            while ((runState < SHUTDOWN || !tryTerminate(false)) &&
818 <                   (((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 ||
819 <                    (int)(top >>> EVENT_COUNT_SHIFT) == wec) &&
820 <                   eventCount == wec) {
821 <                if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
822 <                                              w.nextWaiter = top, nextTop)) {
823 <                    accumulateStealCount(w); // transfer steals while idle
824 <                    Thread.interrupted();    // clear/ignore interrupt
825 <                    while (eventCount == wec)
826 <                        w.doPark();
853 <                    break;
854 <                }
813 >        long nextTop = (((long)wec << EVENT_COUNT_SHIFT) |
814 >                        ((long)(w.poolIndex + 1)));
815 >        long top;
816 >        while ((runState < SHUTDOWN || !tryTerminate(false)) &&
817 >               (((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 ||
818 >                (int)(top >>> EVENT_COUNT_SHIFT) == wec) &&
819 >               eventCount == wec) {
820 >            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
821 >                                          w.nextWaiter = top, nextTop)) {
822 >                accumulateStealCount(w); // transfer steals while idle
823 >                Thread.interrupted();    // clear/ignore interrupt
824 >                while (eventCount == wec)
825 >                    w.doPark();
826 >                break;
827              }
856            wec = eventCount;
828          }
829 <        releaseWaiters();
859 <        int wc = workerCounts;
860 <        if ((wc & RUNNING_COUNT_MASK) <= parallelism) {
861 <            w.lastEventCount = wec;
862 <            return true;
863 <        }
864 <        if (wec != w.lastEventCount) // back up if may re-wait
865 <            w.lastEventCount = wec - (wc >>> TOTAL_COUNT_SHIFT);
866 <        return false;
829 >        w.lastEventCount = eventCount;
830      }
831  
832      /**
# Line 889 | Line 852 | public class ForkJoinPool extends Abstra
852       */
853      final void preStep(ForkJoinWorkerThread w, int retries) {
854          boolean active = w.active;
855 <        boolean inactivate = active && retries != 0;
855 >        boolean inactivate = active && retries > 0;
856          for (;;) {
857              int rs, wc;
858              if (inactivate &&
# Line 897 | Line 860 | public class ForkJoinPool extends Abstra
860                                           rs = runState, rs - ONE_ACTIVE))
861                  inactivate = active = w.active = false;
862              if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= parallelism) {
863 <                if (active || eventSync(w, retries))
864 <                    break;
863 >                if (retries > 0) {
864 >                    if (retries > 1 && !active)
865 >                        eventSync(w);
866 >                    releaseWaiters();
867 >                }
868 >                break;
869              }
870 <            else if (!(inactivate |= active) &&  // must inactivate to suspend
870 >            if (!(inactivate |= active) &&  // must inactivate to suspend
871                  UNSAFE.compareAndSwapInt(this, workerCountsOffset,
872                                           wc, wc - ONE_RUNNING) &&
873                  !w.suspendAsSpare())             // false if trimmed
# Line 912 | Line 879 | public class ForkJoinPool extends Abstra
879       * Awaits join of the given task if enough threads, or can resume
880       * or create a spare. Fails (in which case the given task might
881       * not be done) upon contention or lack of decision about
882 <     * blocking. Returns void because caller must check
916 <     * task status on return anyway.
882 >     * blocking.
883       *
884       * We allow blocking if:
885       *
# Line 921 | Line 887 | public class ForkJoinPool extends Abstra
887       *    parallelism level if this thread blocks.
888       *
889       * 2. A spare is resumed to replace this worker. We tolerate
890 <     *    slop in the decision to replace if a spare is found without
891 <     *    first decrementing run count.  This may release too many,
892 <     *    but if so, the superfluous ones will re-suspend via
927 <     *    preStep().
890 >     *    races in the decision to replace when a spare is found.
891 >     *    This may release too many, but if so, the superfluous ones
892 >     *    will re-suspend via preStep().
893       *
894 <     * 3. After #spares repeated checks, there are no fewer than #spare
894 >     * 3. After #spares repeated retries, there are fewer than #spare
895       *    threads not running. We allow this slack to avoid hysteresis
896       *    and as a hedge against lag/uncertainty of running count
897       *    estimates when signalling or unblocking stalls.
898       *
899 <     * 4. All existing workers are busy (as rechecked via repeated
900 <     *    retries by caller) and a new spare is created.
899 >     * 4. All existing workers are busy (as rechecked via #spares
900 >     *    repeated retries by caller) and a new spare is created.
901       *
902 <     * If none of the above hold, we try to escape out by
903 <     * re-incrementing count and returning to caller, which can retry
939 <     * later.
902 >     * If none of the above hold, we escape out by re-incrementing
903 >     * count and returning to caller, which can retry later.
904       *
905       * @param joinMe the task to join
906 <     * @param retries if negative, then serve only as a precheck
943 <     *   that the thread can be replaced by a spare. Otherwise,
944 <     *   the number of repeated calls to this method returning busy
945 <     * @return true if the call must be retried because there
946 <     *   none of the blocking checks hold
906 >     * @param retries the number of calls to this method for this join
907       */
908 <    final boolean tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) {
949 <        if (joinMe.status < 0) // precheck for cancellation
950 <            return false;
951 <        if ((runState & TERMINATING) != 0) { // shutting down
952 <            joinMe.cancelIgnoringExceptions();
953 <            return false;
954 <        }
955 <
908 >    final void tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) {
909          int pc = parallelism;
910          boolean running = true; // false when running count decremented
911 <        outer:for (;;) {
911 >        outer:while (joinMe.status >= 0) {
912              int wc = workerCounts;
913              int rc = wc & RUNNING_COUNT_MASK;
914              int tc = wc >>> TOTAL_COUNT_SHIFT;
915              if (running) { // replace with spare or decrement count
916                  if (rc <= pc && tc > pc &&
917                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
918 <                    ForkJoinWorkerThread[] ws = workers;
918 >                    ForkJoinWorkerThread[] ws = workers; // search for spare
919                      int nws = ws.length;
920 <                    for (int i = 0; i < nws; ++i) { // search for spare
920 >                    for (int i = 0; i < nws; ++i) {
921                          ForkJoinWorkerThread w = ws[i];
922 <                        if (w != null) {
922 >                        if (w != null && w.isSuspended()) {
923 >                            if ((workerCounts & RUNNING_COUNT_MASK) > pc)
924 >                                continue outer;
925                              if (joinMe.status < 0)
926 <                                return false;
927 <                            if (w.isSuspended()) {
928 <                                if ((workerCounts & RUNNING_COUNT_MASK)>=pc &&
929 <                                    w.tryResumeSpare()) {
975 <                                    running = false;
976 <                                    break outer;
977 <                                }
978 <                                continue outer; // rescan
926 >                                break outer;
927 >                            if (w.tryResumeSpare()) {
928 >                                running = false;
929 >                                break outer;
930                              }
931 +                            continue outer; // rescan on failure to resume
932                          }
933                      }
934                  }
935 <                if (retries < 0 || // < 0 means replacement check only
936 <                    rc == 0 || joinMe.status < 0 || workerCounts != wc ||
985 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
986 <                                              wc, wc - ONE_RUNNING))
987 <                    return false; // done or inconsistent or contended
988 <                running = false;
989 <                if (rc > pc)
935 >                if ((rc <= pc && (rc == 0 || --retries < 0)) || // no retry
936 >                    joinMe.status < 0)
937                      break;
938 +                if (workerCounts == wc &&
939 +                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
940 +                                             wc, wc - ONE_RUNNING))
941 +                    running = false;
942              }
943              else { // allow blocking if enough threads
944 <                if (rc >= pc || joinMe.status < 0)
944 >                int sc = tc - pc + 1;          // = spares, plus the one to add
945 >                if (sc > 0 && rc > 0 && rc >= pc - sc && rc > pc - retries)
946 >                    break;  
947 >                if (--retries > sc && tc < MAX_THREADS &&
948 >                    tc == (runState & ACTIVE_COUNT_MASK) &&
949 >                    workerCounts == wc &&
950 >                    UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
951 >                                             wc + (ONE_RUNNING|ONE_TOTAL))) {
952 >                    addWorker();
953                      break;
995                int sc = tc - pc + 1; // = spare threads, plus the one to add
996                if (retries > sc) {
997                    if (rc > 0 && rc >= pc - sc) // allow slack
998                        break;
999                    if (tc < MAX_THREADS &&
1000                        tc == (runState & ACTIVE_COUNT_MASK) &&
1001                        workerCounts == wc &&
1002                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1003                                                 wc+(ONE_RUNNING|ONE_TOTAL))) {
1004                        addWorker();
1005                        break;
1006                    }
954                  }
955 <                if (workerCounts == wc &&        // back out to allow rescan
955 >                if (workerCounts == wc &&
956                      UNSAFE.compareAndSwapInt (this, workerCountsOffset,
957                                                wc, wc + ONE_RUNNING)) {
958 <                    releaseWaiters();            // help others progress
959 <                    return true;                 // let caller retry
958 >                    running = true;            // back out; allow retry
959 >                    break;
960                  }
961              }
962          }
963 <        // arrive here if can block
964 <        joinMe.internalAwaitDone();
965 <        int c;                      // to inline incrementRunningCount
966 <        do {} while (!UNSAFE.compareAndSwapInt
967 <                     (this, workerCountsOffset,
968 <                      c = workerCounts, c + ONE_RUNNING));
969 <        return false;
963 >        if (!running) { // can block
964 >            int c;                      // to inline incrementRunningCount
965 >            joinMe.internalAwaitDone();
966 >            do {} while (!UNSAFE.compareAndSwapInt
967 >                         (this, workerCountsOffset,
968 >                          c = workerCounts, c + ONE_RUNNING));
969 >        }
970      }
971  
972      /**
# Line 1029 | Line 976 | public class ForkJoinPool extends Abstra
976       */
977      final void awaitBlocker(ManagedBlocker blocker)
978          throws InterruptedException {
1032        boolean done;
1033        if (done = blocker.isReleasable())
1034            return;
979          int pc = parallelism;
980 +        boolean running = true;
981          int retries = 0;
982 <        boolean running = true; // false when running count decremented
983 <        outer:for (;;) {
982 >        boolean done;
983 >        outer:while (!(done = blocker.isReleasable())) {
984              int wc = workerCounts;
985              int rc = wc & RUNNING_COUNT_MASK;
986              int tc = wc >>> TOTAL_COUNT_SHIFT;
987              if (running) {
988 <                if (rc <= pc && tc > pc &&
988 >                if (rc <= pc && tc > pc &&
989                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
990                      ForkJoinWorkerThread[] ws = workers;
991                      int nws = ws.length;
992                      for (int i = 0; i < nws; ++i) {
993                          ForkJoinWorkerThread w = ws[i];
994 <                        if (w != null) {
994 >                        if (w != null && w.isSuspended()) {
995 >                            if ((workerCounts & RUNNING_COUNT_MASK) > pc)
996 >                                continue outer;
997                              if (done = blocker.isReleasable())
998 <                                return;
999 <                            if (w.isSuspended()) {
1000 <                                if ((workerCounts & RUNNING_COUNT_MASK)>=pc &&
1001 <                                    w.tryResumeSpare()) {
1055 <                                    running = false;
1056 <                                    break outer;
1057 <                                }
1058 <                                continue outer; // rescan
998 >                                break outer;
999 >                            if (w.tryResumeSpare()) {
1000 >                                running = false;
1001 >                                break outer;
1002                              }
1003 +                            continue outer;
1004                          }
1005                      }
1006 <                }
1063 <                if (done = blocker.isReleasable())
1064 <                    return;
1065 <                if (rc == 0 || workerCounts != wc ||
1066 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1067 <                                              wc, wc - ONE_RUNNING))
1068 <                    continue;
1069 <                running = false;
1070 <                if (rc > pc)
1071 <                    break;
1072 <            }
1073 <            else {
1074 <                if (rc >= pc || (done = blocker.isReleasable()))
1075 <                    break;
1076 <                int sc = tc - pc + 1;
1077 <                if (retries++ > sc) {
1078 <                    if (rc > 0 && rc >= pc - sc)
1006 >                    if (done = blocker.isReleasable())
1007                          break;
1008 <                    if (tc < MAX_THREADS &&
1009 <                        tc == (runState & ACTIVE_COUNT_MASK) &&
1010 <                        workerCounts == wc &&
1011 <                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1012 <                                                 wc+(ONE_RUNNING|ONE_TOTAL))) {
1013 <                        addWorker();
1008 >                }
1009 >                if (rc > 0 && workerCounts == wc &&
1010 >                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1011 >                                             wc, wc - ONE_RUNNING)) {
1012 >                    running = false;
1013 >                    if (rc > pc)
1014                          break;
1087                    }
1015                  }
1016 +            }
1017 +            else if (rc >= pc)
1018 +                break;
1019 +            else if (tc < MAX_THREADS &&
1020 +                     tc == (runState & ACTIVE_COUNT_MASK) &&
1021 +                     workerCounts == wc &&
1022 +                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1023 +                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
1024 +                addWorker();
1025 +                break;
1026 +            }
1027 +            else if (workerCounts == wc &&
1028 +                     UNSAFE.compareAndSwapInt (this, workerCountsOffset,
1029 +                                              wc, wc + ONE_RUNNING)) {
1030                  Thread.yield();
1031 +                ++retries;
1032 +                running = true;            // allow rescan
1033              }
1034          }
1035  
# Line 1342 | Line 1285 | public class ForkJoinPool extends Abstra
1285              throw new RejectedExecutionException();
1286          submissionQueue.offer(task);
1287          signalEvent();
1288 <        ensureEnoughWorkers();
1288 >        ensureEnoughTotalWorkers();
1289      }
1290  
1291      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines