root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.58 by dl, Fri Jul 23 13:07:43 2010 UTC vs.
Revision 1.60 by dl, Sat Jul 24 20:28:18 2010 UTC

# Line 52 | Line 52 | import java.util.concurrent.CountDownLat
52   * convenient form for informal monitoring.
53   *
54   * <p> As is the case with other ExecutorServices, there are three
55 < * main task execution methods summarized in the follwoing
55 > * main task execution methods summarized in the following
56   * table. These are designed to be used by clients not already engaged
57   * in fork/join computations in the current pool.  The main forms of
58   * these methods accept instances of {@code ForkJoinTask}, but
# Line 60 | Line 60 | import java.util.concurrent.CountDownLat
60   * Runnable}- or {@code Callable}- based activities as well.  However,
61   * tasks that are already executing in a pool should normally
62   * <em>NOT</em> use these pool execution methods, but instead use the
63 < * within-computation forms listed in the table.
63 > * within-computation forms listed in the table.
64   *
65   * <table BORDER CELLPADDING=3 CELLSPACING=1>
66   *  <tr>
# Line 84 | Line 84 | import java.util.concurrent.CountDownLat
84   *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
85   *  </tr>
86   * </table>
87 < *
87 > *
88   * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
89   * used for all parallel task execution in a program or subsystem.
90   * Otherwise, use would not usually outweigh the construction and
# Line 138 | Line 138 | public class ForkJoinPool extends Abstra
138       * cache pollution effects.)
139       *
140       * Beyond work-stealing support and essential bookkeeping, the
141 <     * main responsibility of this framework is to arrange tactics for
142 <     * when one worker is waiting to join a task stolen (or always
143 <     * held by) another.  Becauae we are multiplexing many tasks on to
144 <     * a pool of workers, we can't just let them block (as in
145 <     * Thread.join).  We also cannot just reassign the joiner's
146 <     * run-time stack with another and replace it later, which would
147 <     * be a form of "continuation", that even if possible is not
148 <     * necessarily a good idea. Given that the creation costs of most
149 <     * threads on most systems mainly surrounds setting up runtime
150 <     * stacks, thread creation and switching is usually not much more
151 <     * expensive than stack creation and switching, and is more
152 <     * flexible). Instead we combine two tactics:
141 >     * main responsibility of this framework is to take actions when
142 >     * one worker is waiting to join a task stolen (or always held by)
143 >     * another.  Because we are multiplexing many tasks onto a pool
144 >     * of workers, we can't just let them block (as in Thread.join).
145 >     * We also cannot just reassign the joiner's run-time stack with
146 >     * another and replace it later, which would be a form of
147 >     * "continuation", that even if possible is not necessarily a good
148 >     * idea. Given that the creation costs of most threads on most
149 >     * systems mainly surround setting up runtime stacks, thread
150 >     * creation and switching is usually not much more expensive than
151 >     * stack creation and switching, and is more flexible. Instead we
152 >     * combine two tactics:
153       *
154 <     *   1. Arranging for the joiner to execute some task that it
154 >     *   Helping: Arranging for the joiner to execute some task that it
155       *      would be running if the steal had not occurred.  Method
156       *      ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
157       *      links to try to find such a task.
158       *
159 <     *   2. Unless there are already enough live threads, creating or
160 <     *      or re-activating a spare thread to compensate for the
161 <     *      (blocked) joiner until it unblocks.  Spares then suspend
162 <     *      at their next opportunity or eventually die if unused for
163 <     *      too long.  See below and the internal documentation
164 <     *      for tryAwaitJoin for more details about compensation
165 <     *      rules.
159 >     *   Compensating: Unless there are already enough live threads,
160 >     *      creating or re-activating a spare thread to compensate
161 >     *      for the (blocked) joiner until it unblocks.  Spares then
162 >     *      suspend at their next opportunity or eventually die if
163 >     *      unused for too long.  See below and the internal
164 >     *      documentation for tryAwaitJoin for more details about
165 >     *      compensation rules.
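The helping tactic described above can be illustrated with a toy model (not the real pool code; `helpJoin` and the single-deque setup here are illustrative assumptions): a joiner drains and runs other pending tasks instead of blocking, falling back to compensation only when nothing is left to help with.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy sketch of the "helping" tactic: while the joined task is not yet
// done, the would-be blocker runs other pending work. All names here
// (helpJoin, the shared deque) are hypothetical simplifications.
public class JoinTactics {
    /** Runs pending tasks until the join completes or work runs out. */
    static int helpJoin(boolean[] joined, Deque<Runnable> pending) {
        int helped = 0;
        while (!joined[0]) {
            Runnable r = pending.poll();
            if (r == null)
                break;          // nothing to help with; would compensate/block
            r.run();
            ++helped;
        }
        return helped;
    }

    /** Demo: two pending tasks; the second one completes the join. */
    static int demo() {
        final boolean[] joined = { false };
        Deque<Runnable> pending = new ArrayDeque<>();
        pending.add(() -> {});                   // unrelated stolen work
        pending.add(() -> joined[0] = true);     // finishing this unblocks the join
        return helpJoin(joined, pending);
    }

    public static void main(String[] args) {
        System.out.println("helped " + demo() + " tasks instead of blocking");
    }
}
```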
166       *
167       * Because determining the existence of conservatively safe
168       * helping targets, the availability of already-created spares,
# Line 171 | Line 171 | public class ForkJoinPool extends Abstra
171       * ForkJoinWorkerThread.joinTask) interleave these options until
172       * successful.  Creating a new spare always succeeds, but also
173       * increases application footprint, so we try to avoid it, within
174 <     * reason.
174 >     * reason.
175       *
176 <     * The ManagedBlocker extension API can't use option (1) so uses a
177 <     * special version of (2) in method awaitBlocker.
176 >     * The ManagedBlocker extension API can't use helping so uses a
177 >     * special version of compensation in method awaitBlocker.
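For reference, the ManagedBlocker extension point mentioned here is a public API. A minimal implementation, shown with the java.util.concurrent form of the class (the jsr166y signature is the same shape), wraps a CountDownLatch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// A ManagedBlocker that waits on a CountDownLatch. When run via
// ForkJoinPool.managedBlock(...) from a pool worker, the pool may
// compensate for the blocked thread before block() is invoked.
public class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;
    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    public boolean block() throws InterruptedException {
        latch.await();                    // actually block
        return true;                      // done; no further blocking needed
    }
    public boolean isReleasable() {
        return latch.getCount() == 0;     // skip blocking if already open
    }

    /** Demo: latch already open, so managedBlock returns immediately. */
    static boolean demo() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```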
178       *
179       * The main throughput advantages of work-stealing stem from
180       * decentralized control -- workers mostly steal tasks from each
# Line 497 | Line 497 | public class ForkJoinPool extends Abstra
497       * making decisions about creating and suspending spare
498       * threads. Updated only by CAS. Note that adding a new worker
499       * requires incrementing both counts, since workers start off in
500 <     * running state.  This field is also used for memory-fencing
501 <     * configuration parameters.
500 >     * running state.
501       */
502      private volatile int workerCounts;
503  
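The packed-counter idiom this field relies on (running count in the low bits, total count in the high bits, both updated by one CAS) can be sketched as follows; the particular shift and mask values are assumptions for illustration, not taken from the source:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a packed worker-count word: running count in the low 16
// bits, total count in the high 16 bits. One CAS updates both halves
// consistently. Constant values here are illustrative assumptions.
public class PackedCounts {
    static final int TOTAL_COUNT_SHIFT  = 16;
    static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
    static final int ONE_RUNNING        = 1;
    static final int ONE_TOTAL          = 1 << TOTAL_COUNT_SHIFT;

    final AtomicInteger workerCounts = new AtomicInteger();

    /** New workers start off running, so bump both counts atomically. */
    void addWorker() {
        int c;
        do { c = workerCounts.get(); }
        while (!workerCounts.compareAndSet(c, c + (ONE_RUNNING | ONE_TOTAL)));
    }

    int runningCount() { return workerCounts.get() & RUNNING_COUNT_MASK; }
    int totalCount()   { return workerCounts.get() >>> TOTAL_COUNT_SHIFT; }

    public static void main(String[] args) {
        PackedCounts p = new PackedCounts();
        p.addWorker();
        p.addWorker();
        System.out.println(p.runningCount() + "/" + p.totalCount());
    }
}
```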
# Line 539 | Line 538 | public class ForkJoinPool extends Abstra
538      final void incrementRunningCount() {
539          int c;
540          do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
541 <                                               c = workerCounts,
541 >                                               c = workerCounts,
542                                                 c + ONE_RUNNING));
543      }
544  
# Line 657 | Line 656 | public class ForkJoinPool extends Abstra
656          try {
657              w = factory.newThread(this);
658          } finally { // Adjust on either null or exceptional factory return
659 <            if (w == null) {
659 >            if (w == null)
660                  onWorkerCreationFailure();
662                return null;
663            }
661          }
662 <        w.start(recordWorker(w), ueh);
662 >        if (w != null)
663 >            w.start(recordWorker(w), ueh);
664          return w;
665      }
666  
# Line 672 | Line 670 | public class ForkJoinPool extends Abstra
670      private void onWorkerCreationFailure() {
671          for (;;) {
672              int wc = workerCounts;
673 <            if ((wc >>> TOTAL_COUNT_SHIFT) == 0)
674 <                Thread.yield(); // wait for other counts to settle
673 >            int rc = wc & RUNNING_COUNT_MASK;
674 >            int tc = wc >>> TOTAL_COUNT_SHIFT;
675 >            if (rc == 0 || wc == 0)
676 >                Thread.yield(); // must wait for other counts to settle
677              else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
678                                                wc - (ONE_RUNNING|ONE_TOTAL)))
679                  break;
# Line 682 | Line 682 | public class ForkJoinPool extends Abstra
682      }
683  
684      /**
685 <     * Creates and/or resumes enough workers to establish target
686 <     * parallelism, giving up if terminating or addWorker fails
687 <     *
688 <     * TODO: recast this to support lazier creation and automated
689 <     * parallelism maintenance
685 >     * Creates enough total workers to establish target parallelism,
686 >     * giving up if terminating or addWorker fails
687       */
688 <    private void ensureEnoughWorkers() {
689 <        for (;;) {
690 <            int pc = parallelism;
691 <            int wc = workerCounts;
692 <            int rc = wc & RUNNING_COUNT_MASK;
693 <            int tc = wc >>> TOTAL_COUNT_SHIFT;
694 <            if (tc < pc) {
698 <                if (runState == TERMINATING ||
699 <                    (UNSAFE.compareAndSwapInt
700 <                     (this, workerCountsOffset,
701 <                      wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
702 <                     addWorker() == null))
703 <                    break;
704 <            }
705 <            else if (tc > pc && rc < pc &&
706 <                     tc > (runState & ACTIVE_COUNT_MASK)) {
707 <                ForkJoinWorkerThread spare = null;
708 <                ForkJoinWorkerThread[] ws = workers;
709 <                int nws = ws.length;
710 <                for (int i = 0; i < nws; ++i) {
711 <                    ForkJoinWorkerThread w = ws[i];
712 <                    if (w != null && w.isSuspended()) {
713 <                        if ((workerCounts & RUNNING_COUNT_MASK) > pc ||
714 <                            runState == TERMINATING)
715 <                            return;
716 <                        if (w.tryResumeSpare())
717 <                            incrementRunningCount();
718 <                        break;
719 <                    }
720 <                }
721 <            }
722 <            else
688 >    private void ensureEnoughTotalWorkers() {
689 >        int wc;
690 >        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism &&
691 >               runState < TERMINATING) {
692 >            if ((UNSAFE.compareAndSwapInt(this, workerCountsOffset,
693 >                                          wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
694 >                 addWorker() == null))
695                  break;
696          }
697      }
# Line 746 | Line 718 | public class ForkJoinPool extends Abstra
718          for (;;) {
719              int wc = workerCounts;
720              int rc = wc & RUNNING_COUNT_MASK;
721 <            if (rc - nr < 0 || (wc >>> TOTAL_COUNT_SHIFT) == 0)
721 >            int tc = wc >>> TOTAL_COUNT_SHIFT;
722 >            if (rc - nr < 0 || tc == 0)
723                  Thread.yield(); // back off if waiting for other updates
724              else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
725                                                wc, wc - unit))
# Line 755 | Line 728 | public class ForkJoinPool extends Abstra
728  
729          accumulateStealCount(w); // collect final count
730          if (!tryTerminate(false))
731 <            ensureEnoughWorkers();
731 >            ensureEnoughTotalWorkers();
732      }
733  
734      // Waiting for and signalling events
# Line 771 | Line 744 | public class ForkJoinPool extends Abstra
744              int n = ws.length;
745              for (;;) {
746                  int i = ((int)(top & WAITER_ID_MASK)) - 1;
747 <                if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == eventCount)
747 >                int e = (int)(top >>> EVENT_COUNT_SHIFT);
748 >                if (i < 0 || e == eventCount)
749                      return;
750                  ForkJoinWorkerThread w;
751                  if (i < n && (w = ws[i]) != null &&
# Line 792 | Line 766 | public class ForkJoinPool extends Abstra
766       */
767      private void signalEvent() {
768          int c;
769 <        do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
769 >        do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
770                                                 c = eventCount, c+1));
771          releaseWaiters();
772      }
# Line 811 | Line 785 | public class ForkJoinPool extends Abstra
785              int n = ws.length;
786              for (;;) {
787                  int i = ((int)(top & WAITER_ID_MASK)) - 1;
788 <                if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == ec)
788 >                int e = (int)(top >>> EVENT_COUNT_SHIFT);
789 >                if (i < 0 || e == ec)
790                      return;
791                  ForkJoinWorkerThread w;
792                  if (i < n && (w = ws[i]) != null &&
# Line 828 | Line 803 | public class ForkJoinPool extends Abstra
803      }
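The decoding in these loops (waiter index plus one in the low bits so that zero means "empty", event count in the high bits) suggests a stack head packed into a single long. A sketch of that encoding, with assumed shift/mask values rather than the source's actual constants:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of an eventWaiters-style encoding: a Treiber-stack head packed
// into one long. Low bits hold (worker index + 1) so 0 means empty; the
// high bits record the event count the waiter last saw. The shift and
// mask values are illustrative assumptions.
public class WaiterStack {
    static final int  EVENT_COUNT_SHIFT = 32;
    static final long WAITER_ID_MASK    = (1L << EVENT_COUNT_SHIFT) - 1;

    final AtomicLong top = new AtomicLong();

    static long pack(int eventCount, int workerIndex) {
        return ((long) eventCount << EVENT_COUNT_SHIFT) | (workerIndex + 1L);
    }
    static int waiterIndex(long w) { return (int) (w & WAITER_ID_MASK) - 1; }
    static int eventCount(long w)  { return (int) (w >>> EVENT_COUNT_SHIFT); }

    /** Push: CAS the new packed head over the current one. */
    boolean tryPush(int eventCount, int workerIndex) {
        long h = top.get();
        return top.compareAndSet(h, pack(eventCount, workerIndex));
    }

    public static void main(String[] args) {
        long w = pack(7, 3);
        System.out.println(waiterIndex(w) + " saw event " + eventCount(w));
    }
}
```

An empty head decodes to index -1, matching the `i < 0` early-return in the release loops above.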
804  
805      /**
806 <     * If worker is inactive, blocks until terminating or event count
807 <     * advances from last value held by worker; in any case helps
833 <     * release others.
806 >     * Blocks worker until terminating or event count
807 >     * advances from last value held by worker
808       *
809       * @param w the calling worker thread
836     * @param retries the number of scans by caller failing to find work
837     * @return false if now too many threads running
810       */
811 <    private boolean eventSync(ForkJoinWorkerThread w, int retries) {
811 >    private void eventSync(ForkJoinWorkerThread w) {
812          int wec = w.lastEventCount;
813 <        if (retries > 1) { // can only block after 2nd miss
814 <            long nextTop = (((long)wec << EVENT_COUNT_SHIFT) |
815 <                            ((long)(w.poolIndex + 1)));
816 <            long top;
817 <            while ((runState < SHUTDOWN || !tryTerminate(false)) &&
818 <                   (((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 ||
819 <                    (int)(top >>> EVENT_COUNT_SHIFT) == wec) &&
820 <                   eventCount == wec) {
821 <                if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
822 <                                              w.nextWaiter = top, nextTop)) {
823 <                    accumulateStealCount(w); // transfer steals while idle
824 <                    Thread.interrupted();    // clear/ignore interrupt
825 <                    while (eventCount == wec)
826 <                        w.doPark();
855 <                    break;
856 <                }
813 >        long nextTop = (((long)wec << EVENT_COUNT_SHIFT) |
814 >                        ((long)(w.poolIndex + 1)));
815 >        long top;
816 >        while ((runState < SHUTDOWN || !tryTerminate(false)) &&
817 >               (((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 ||
818 >                (int)(top >>> EVENT_COUNT_SHIFT) == wec) &&
819 >               eventCount == wec) {
820 >            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
821 >                                          w.nextWaiter = top, nextTop)) {
822 >                accumulateStealCount(w); // transfer steals while idle
823 >                Thread.interrupted();    // clear/ignore interrupt
824 >                while (eventCount == wec)
825 >                    w.doPark();
826 >                break;
827              }
858            wec = eventCount;
828          }
829 <        releaseWaiters();
861 <        int wc = workerCounts;
862 <        if ((wc & RUNNING_COUNT_MASK) <= parallelism) {
863 <            w.lastEventCount = wec;
864 <            return true;
865 <        }
866 <        if (wec != w.lastEventCount) // back up if may re-wait
867 <            w.lastEventCount = wec - (wc >>> TOTAL_COUNT_SHIFT);
868 <        return false;
829 >        w.lastEventCount = eventCount;
830      }
831  
832      /**
# Line 891 | Line 852 | public class ForkJoinPool extends Abstra
852       */
853      final void preStep(ForkJoinWorkerThread w, int retries) {
854          boolean active = w.active;
855 <        boolean inactivate = active && retries != 0;
855 >        boolean inactivate = active && retries > 0;
856          for (;;) {
857              int rs, wc;
858              if (inactivate &&
# Line 899 | Line 860 | public class ForkJoinPool extends Abstra
860                                           rs = runState, rs - ONE_ACTIVE))
861                  inactivate = active = w.active = false;
862              if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= parallelism) {
863 <                if (active || eventSync(w, retries))
864 <                    break;
863 >                if (retries > 0) {
864 >                    if (retries > 1 && !active)
865 >                        eventSync(w);
866 >                    releaseWaiters();
867 >                }
868 >                break;
869              }
870 <            else if (!(inactivate |= active) &&  // must inactivate to suspend
870 >            if (!(inactivate |= active) &&  // must inactivate to suspend
871                  UNSAFE.compareAndSwapInt(this, workerCountsOffset,
872                                           wc, wc - ONE_RUNNING) &&
873                  !w.suspendAsSpare())             // false if trimmed
# Line 914 | Line 879 | public class ForkJoinPool extends Abstra
879       * Awaits join of the given task if enough threads, or can resume
880       * or create a spare. Fails (in which case the given task might
881       * not be done) upon contention or lack of decision about
882 <     * blocking. Returns void because caller must check
918 <     * task status on return anyway.
882 >     * blocking.
883       *
884       * We allow blocking if:
885       *
886 <     * 1. There would still be at least as many running threads as
886 >     * 1. There would still be at least as many running threads as
887       *    parallelism level if this thread blocks.
888       *
889       * 2. A spare is resumed to replace this worker. We tolerate
890 <     *    slop in the decision to replace if a spare is found without
891 <     *    first decrementing run count.  This may release too many,
892 <     *    but if so, the superfluous ones will re-suspend via
893 <     *    preStep().
894 <     *
895 <     * 3. After #spares repeated checks, there are no fewer than #spare
896 <     *    threads not running. We allow this slack to avoid hysteresis
933 <     *    and as a hedge against lag/uncertainty of running count
890 >     *    races in the decision to replace when a spare is found.
891 >     *    This may release too many, but if so, the superfluous ones
892 >     *    will re-suspend via preStep().
893 >     *
894 >     * 3. After #spares repeated retries, there are fewer than #spare
895 >     *    threads not running. We allow this slack to avoid hysteresis
896 >     *    and as a hedge against lag/uncertainty of running count
897       *    estimates when signalling or unblocking stalls.
898       *
899 <     * 4. All existing workers are busy (as rechecked via repeated
900 <     *    retries by caller) and a new spare is created.
901 <     *
902 <     * If none of the above hold, we try to escape out by
903 <     * re-incrementing count and returning to caller, which can retry
941 <     * later.
899 >     * 4. All existing workers are busy (as rechecked via #spares
900 >     *    repeated retries by caller) and a new spare is created.
901 >     *
902 >     * If none of the above hold, we escape out by re-incrementing
903 >     * count and returning to caller, which can retry later.
904       *
905       * @param joinMe the task to join
906 <     * @param retries if negative, then serve only as a precheck
945 <     *   that the thread can be replaced by a spare. Otherwise,
946 <     *   the number of repeated calls to this method returning busy
947 <     * @return true if the call must be retried because there
948 <     *   none of the blocking checks hold
906 >     * @param retries the number of calls to this method for this join
907       */
908 <    final boolean tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) {
951 <        if (joinMe.status < 0) // precheck to prime loop
952 <            return false;
908 >    final void tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) {
909          int pc = parallelism;
910          boolean running = true; // false when running count decremented
911 <        outer:for (;;) {
911 >        outer:while (joinMe.status >= 0) {
912              int wc = workerCounts;
913              int rc = wc & RUNNING_COUNT_MASK;
914              int tc = wc >>> TOTAL_COUNT_SHIFT;
915              if (running) { // replace with spare or decrement count
916 <                if (rc <= pc && tc > pc &&
916 >                if (rc <= pc && tc > pc &&
917                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
918 <                    ForkJoinWorkerThread[] ws = workers;
918 >                    ForkJoinWorkerThread[] ws = workers; // search for spare
919                      int nws = ws.length;
920 <                    for (int i = 0; i < nws; ++i) { // search for spare
920 >                    for (int i = 0; i < nws; ++i) {
921                          ForkJoinWorkerThread w = ws[i];
922 <                        if (w != null) {
922 >                        if (w != null && w.isSuspended()) {
923 >                            if ((workerCounts & RUNNING_COUNT_MASK) > pc)
924 >                                continue outer;
925                              if (joinMe.status < 0)
926 <                                return false;
927 <                            if (w.isSuspended()) {
928 <                                if ((workerCounts & RUNNING_COUNT_MASK)>=pc &&
929 <                                    w.tryResumeSpare()) {
972 <                                    running = false;
973 <                                    break outer;
974 <                                }
975 <                                continue outer; // rescan
926 >                                break outer;
927 >                            if (w.tryResumeSpare()) {
928 >                                running = false;
929 >                                break outer;
930                              }
931 +                            continue outer; // rescan on failure to resume
932                          }
933                      }
934                  }
935 <                if (retries < 0 || // < 0 means replacement check only
936 <                    rc == 0 || joinMe.status < 0 || workerCounts != wc ||
982 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
983 <                                              wc, wc - ONE_RUNNING))
984 <                    return false; // done or inconsistent or contended
985 <                running = false;
986 <                if (rc > pc)
935 >                if ((rc <= pc && (rc == 0 || --retries < 0)) || // no retry
936 >                    joinMe.status < 0)
937                      break;
938 +                if (workerCounts == wc &&
939 +                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
940 +                                             wc, wc - ONE_RUNNING))
941 +                    running = false;
942              }
943              else { // allow blocking if enough threads
944 <                if (rc >= pc || joinMe.status < 0)
944 >                int sc = tc - pc + 1;          // = spares, plus the one to add
945 >                if (sc > 0 && rc > 0 && rc >= pc - sc && rc > pc - retries)
946 >                    break;  
947 >                if (--retries > sc && tc < MAX_THREADS &&
948 >                    tc == (runState & ACTIVE_COUNT_MASK) &&
949 >                    workerCounts == wc &&
950 >                    UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
951 >                                             wc + (ONE_RUNNING|ONE_TOTAL))) {
952 >                    addWorker();
953                      break;
992                int sc = tc - pc + 1; // = spare threads, plus the one to add
993                if (retries > sc) {
994                    if (rc > 0 && rc >= pc - sc) // allow slack
995                        break;
996                    if (tc < MAX_THREADS &&
997                        tc == (runState & ACTIVE_COUNT_MASK) &&
998                        workerCounts == wc &&
999                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1000                                                 wc+(ONE_RUNNING|ONE_TOTAL))) {
1001                        addWorker();
1002                        break;
1003                    }
954                  }
955 <                if (workerCounts == wc &&        // back out to allow rescan
955 >                if (workerCounts == wc &&
956                      UNSAFE.compareAndSwapInt (this, workerCountsOffset,
957                                                wc, wc + ONE_RUNNING)) {
958 <                    releaseWaiters();            // help others progress
959 <                    return true;                 // let caller retry
958 >                    running = true;            // back out; allow retry
959 >                    break;
960                  }
961              }
962          }
963 <        // arrive here if can block
964 <        joinMe.internalAwaitDone();
965 <        int c;                      // to inline incrementRunningCount
966 <        do {} while (!UNSAFE.compareAndSwapInt
967 <                     (this, workerCountsOffset,
968 <                      c = workerCounts, c + ONE_RUNNING));
969 <        return false;
963 >        if (!running) { // can block
964 >            int c;                      // to inline incrementRunningCount
965 >            joinMe.internalAwaitDone();
966 >            do {} while (!UNSAFE.compareAndSwapInt
967 >                         (this, workerCountsOffset,
968 >                          c = workerCounts, c + ONE_RUNNING));
969 >        }
970      }
971  
972      /**
# Line 1026 | Line 976 | public class ForkJoinPool extends Abstra
976       */
977      final void awaitBlocker(ManagedBlocker blocker)
978          throws InterruptedException {
1029        boolean done;
1030        if (done = blocker.isReleasable())
1031            return;
979          int pc = parallelism;
980 +        boolean running = true;
981          int retries = 0;
982 <        boolean running = true; // false when running count decremented
983 <        outer:for (;;) {
982 >        boolean done;
983 >        outer:while (!(done = blocker.isReleasable())) {
984              int wc = workerCounts;
985              int rc = wc & RUNNING_COUNT_MASK;
986              int tc = wc >>> TOTAL_COUNT_SHIFT;
987 <            if (running) {
987 >            if (running) {
988                  if (rc <= pc && tc > pc &&
989                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
990                      ForkJoinWorkerThread[] ws = workers;
991                      int nws = ws.length;
992 <                    for (int i = 0; i < nws; ++i) {
992 >                    for (int i = 0; i < nws; ++i) {
993                          ForkJoinWorkerThread w = ws[i];
994 <                        if (w != null) {
994 >                        if (w != null && w.isSuspended()) {
995 >                            if ((workerCounts & RUNNING_COUNT_MASK) > pc)
996 >                                continue outer;
997                              if (done = blocker.isReleasable())
998 <                                return;
999 <                            if (w.isSuspended()) {
1000 <                                if ((workerCounts & RUNNING_COUNT_MASK)>=pc &&
1001 <                                    w.tryResumeSpare()) {
1052 <                                    running = false;
1053 <                                    break outer;
1054 <                                }
1055 <                                continue outer; // rescan
998 >                                break outer;
999 >                            if (w.tryResumeSpare()) {
1000 >                                running = false;
1001 >                                break outer;
1002                              }
1003 +                            continue outer;
1004                          }
1005                      }
1006 <                }
1060 <                if (done = blocker.isReleasable())
1061 <                    return;
1062 <                if (rc == 0 || workerCounts != wc ||
1063 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1064 <                                              wc, wc - ONE_RUNNING))
1065 <                    continue;
1066 <                running = false;
1067 <                if (rc > pc)
1068 <                    break;
1069 <            }
1070 <            else {
1071 <                if (rc >= pc || (done = blocker.isReleasable()))
1072 <                    break;
1073 <                int sc = tc - pc + 1;
1074 <                if (retries++ > sc) {
1075 <                    if (rc > 0 && rc >= pc - sc)
1006 >                    if (done = blocker.isReleasable())
1007                          break;
1008 <                    if (tc < MAX_THREADS &&
1009 <                        tc == (runState & ACTIVE_COUNT_MASK) &&
1010 <                        workerCounts == wc &&
1011 <                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1012 <                                                 wc+(ONE_RUNNING|ONE_TOTAL))) {
1013 <                        addWorker();
1008 >                }
1009 >                if (rc > 0 && workerCounts == wc &&
1010 >                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1011 >                                             wc, wc - ONE_RUNNING)) {
1012 >                    running = false;
1013 >                    if (rc > pc)
1014                          break;
1084                    }
1015                  }
1016 +            }
1017 +            else if (rc >= pc)
1018 +                break;
1019 +            else if (tc < MAX_THREADS &&
1020 +                     tc == (runState & ACTIVE_COUNT_MASK) &&
1021 +                     workerCounts == wc &&
1022 +                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1023 +                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
1024 +                addWorker();
1025 +                break;
1026 +            }
1027 +            else if (workerCounts == wc &&
1028 +                     UNSAFE.compareAndSwapInt (this, workerCountsOffset,
1029 +                                              wc, wc + ONE_RUNNING)) {
1030                  Thread.yield();
1031 +                ++retries;
1032 +                running = true;            // allow rescan
1033              }
1034          }
1035 <        
1035 >
1036          try {
1037              if (!done)
1038                  do {} while (!blocker.isReleasable() && !blocker.block());
# Line 1098 | Line 1044 | public class ForkJoinPool extends Abstra
1044                                c = workerCounts, c + ONE_RUNNING));
1045              }
1046          }
1047 <    }  
1047 >    }
1048  
1049      /**
1050       * Possibly initiates and/or completes termination.
# Line 1275 | Line 1221 | public class ForkJoinPool extends Abstra
1221       * use {@link java.lang.Runtime#availableProcessors}.
1222       * @param factory the factory for creating new threads. For default value,
1223       * use {@link #defaultForkJoinWorkerThreadFactory}.
1224 <     * @param handler the handler for internal worker threads that
1225 <     * terminate due to unrecoverable errors encountered while executing
1224 >     * @param handler the handler for internal worker threads that
1225 >     * terminate due to unrecoverable errors encountered while executing
1226       * tasks. For default value, use <code>null</code>.
1227 <     * @param asyncMode if true,
1227 >     * @param asyncMode if true,
1228       * establishes local first-in-first-out scheduling mode for forked
1229       * tasks that are never joined. This mode may be more appropriate
1230       * than default locally stack-based mode in applications in which
# Line 1292 | Line 1238 | public class ForkJoinPool extends Abstra
1238       *         because it does not hold {@link
1239       *         java.lang.RuntimePermission}{@code ("modifyThread")}
1240       */
1241 <    public ForkJoinPool(int parallelism,
1241 >    public ForkJoinPool(int parallelism,
1242                          ForkJoinWorkerThreadFactory factory,
1243                          Thread.UncaughtExceptionHandler handler,
1244                          boolean asyncMode) {
# Line 1339 | Line 1285 | public class ForkJoinPool extends Abstra
1285              throw new RejectedExecutionException();
1286          submissionQueue.offer(task);
1287          signalEvent();
1288 <        ensureEnoughWorkers();
1288 >        ensureEnoughTotalWorkers();
1289      }
1290  
1291      /**
1292       * Performs the given task, returning its result upon completion.
1293       * If the caller is already engaged in a fork/join computation in
1294 <     * the current pool, this method is equivalent in effect to
1294 >     * the current pool, this method is equivalent in effect to
1295       * {@link ForkJoinTask#invoke}.
1296       *
1297       * @param task the task
# Line 1362 | Line 1308 | public class ForkJoinPool extends Abstra
1308      /**
1309       * Arranges for (asynchronous) execution of the given task.
1310       * If the caller is already engaged in a fork/join computation in
1311 <     * the current pool, this method is equivalent in effect to
1311 >     * the current pool, this method is equivalent in effect to
1312       * {@link ForkJoinTask#fork}.
1313       *
1314       * @param task the task
# Line 1393 | Line 1339 | public class ForkJoinPool extends Abstra
1339      /**
1340       * Submits a ForkJoinTask for execution.
1341       * If the caller is already engaged in a fork/join computation in
1342 <     * the current pool, this method is equivalent in effect to
1342 >     * the current pool, this method is equivalent in effect to
1343       * {@link ForkJoinTask#fork}.
1344       *
1345       * @param task the task to submit
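Since ForkJoinTasks are Futures (per the table above), the task returned by submit() can be claimed later with join() or get(). A small sketch using the java.util.concurrent class and modern Java syntax for brevity:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// submit() returns the ForkJoinTask itself, which is also a Future.
public class SubmitDemo {
    static int demo() {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> f = pool.submit(() -> 6 * 7);  // Callable form
        int v = f.join();             // join() avoids checked exceptions
        pool.shutdown();
        return v;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```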

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old)
> Changed lines (new)