root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.55 by dl, Sun Apr 18 13:59:57 2010 UTC vs.
Revision 1.56 by dl, Thu May 27 16:46:48 2010 UTC

# Line 140 | Line 140 | public class ForkJoinPool extends Abstra
140       * (workerLock) but the array is otherwise concurrently readable,
141       * and accessed directly by workers. To simplify index-based
142       * operations, the array size is always a power of two, and all
143 <     * readers must tolerate null slots. Currently, all but the first
144 <     * worker thread creation is on-demand, triggered by task
145 <     * submissions, replacement of terminated workers, and/or
146 <     * compensation for blocked workers. However, all other support
147 <     * code is set up to work with other policies.
143 >     * readers must tolerate null slots. Currently, all worker thread
144 >     * creation is on-demand, triggered by task submissions,
145 >     * replacement of terminated workers, and/or compensation for
146 >     * blocked workers. However, all other support code is set up to
147 >     * work with other policies.
148       *
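The slot policy in point 1 above (first free null slot, else double the power-of-two array) can be sketched in isolation. This is a simplified, hypothetical analogue of the slot-recording code later in this diff, not the pool's actual implementation:

```java
import java.util.Arrays;

// Hypothetical sketch: store w in the first null slot of a power-of-two
// sized array, doubling the array when no slot is free. Readers of such
// an array must tolerate null slots, as the comment above notes.
public class WorkerArray {
    static Object[] record(Object[] ws, Object w) {
        int k = 0, n = ws.length;
        while (k < n && ws[k] != null)    // scan for first free slot
            ++k;
        if (k == n)                       // no free slot: double capacity
            ws = Arrays.copyOf(ws, n << 1);
        ws[k] = w;
        return ws;
    }

    public static void main(String[] args) {
        Object[] ws = new Object[2];
        ws = record(ws, "a");
        ws = record(ws, "b");
        ws = record(ws, "c");             // forces doubling to length 4
        System.out.println(ws.length);
    }
}
```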
149       * 2. Bookkeeping for dynamically adding and removing workers. We
150       * maintain a given level of parallelism (or, if
# Line 157 | Line 157 | public class ForkJoinPool extends Abstra
157       * the total number.  These two values are packed into one field,
158       * "workerCounts" because we need accurate snapshots when deciding
159       * to create, resume or suspend.  To support these decisions,
160 <     * updates must be prospective (not retrospective).  For example,
161 <     * the running count is decremented before blocking by a thread
162 <     * about to block, but incremented by the thread about to unblock
163 <     * it. (In a few cases, these prospective updates may need to be
164 <     * rolled back, for example when deciding to create a new worker
165 <     * but the thread factory fails or returns null. In these cases,
166 <     * we are no worse off wrt other decisions than we would be
167 <     * otherwise.)  Updates to the workerCounts field sometimes
168 <     * transiently encounter a fair amount of contention when join
169 <     * dependencies are such that many threads block or unblock at
170 <     * about the same time. We alleviate this by sometimes bundling
171 <     * updates (for example blocking one thread on join and resuming a
172 <     * spare cancel each other out), and in most other cases
173 <     * performing an alternative action (like releasing waiters and
174 <     * finding spares; see below) as a more productive form of
175 <     * backoff.
160 >     * updates to spare counts must be prospective (not
161 >     * retrospective).  For example, the running count is decremented
162 >     * before blocking by a thread about to block as a spare, but
163 >     * incremented by the thread about to unblock it. Updates upon
164 >     * resumption of threads blocking in awaitJoin or awaitBlocker
165 >     * cannot usually be prospective, so the running count is in
166 >     * general an upper bound on the number of productively running
167 >     * threads. Updates to the workerCounts field sometimes transiently
168 >     * encounter a fair amount of contention when join dependencies
169 >     * are such that many threads block or unblock at about the same
170 >     * time. We alleviate this by sometimes bundling updates (for
171 >     * example blocking one thread on join and resuming a spare cancel
172 >     * each other out), and in most other cases performing an
173 >     * alternative action like releasing waiters or locating spares.
174       *
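The packing of the two counts described in point 2 can be illustrated standalone. This is a hedged sketch: the constant names mirror those used in this file, but the 16-bit split is an assumption, since the value of TOTAL_COUNT_SHIFT is not shown in this diff:

```java
// Hypothetical sketch of packing a running count and a total count into
// one int, so both are read in a single volatile load and updated in a
// single CAS (assuming a 16/16 split).
public class PackedCounts {
    static final int TOTAL_COUNT_SHIFT  = 16;
    static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
    static final int ONE_RUNNING        = 1;
    static final int ONE_TOTAL          = 1 << TOTAL_COUNT_SHIFT;

    static int runningOf(int wc) { return wc & RUNNING_COUNT_MASK; }
    static int totalOf(int wc)   { return wc >>> TOTAL_COUNT_SHIFT; }

    public static void main(String[] args) {
        int wc = 0;
        wc += ONE_RUNNING | ONE_TOTAL;  // a new worker starts out running
        wc -= ONE_RUNNING;              // that worker blocks: running drops
        System.out.println(runningOf(wc) + " running, "
                           + totalOf(wc) + " total");
    }
}
```

A single-field snapshot is what makes the prospective create/resume/suspend decisions described above possible without locking.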
175       * 3. Maintaining global run state. The run state of the pool
176       * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
# Line 221 | Line 219 | public class ForkJoinPool extends Abstra
219       * that only releases idle workers until it detects interference
220       * by other threads trying to release, and lets them take
221       * over. The net effect is a tree-like diffusion of signals, where
222 <     * released threads and possibly others) help with unparks.  To
222 >     * released threads (and possibly others) help with unparks.  To
223       * further reduce contention effects a bit, failed CASes to
224       * increment field eventCount are tolerated without retries.
225       * Conceptually they are merged into the same event, which is OK
# Line 238 | Line 236 | public class ForkJoinPool extends Abstra
236       * "extra" spare threads from normal "core" threads: On each call
237       * to preStep (the only point at which we can do this) a worker
238       * checks to see if there are now too many running workers, and if
239 <     * so, suspends itself.  Methods preJoin and doBlock look for
240 <     * suspended threads to resume before considering creating a new
241 <     * replacement. We don't need a special data structure to maintain
242 <     * spares; simply scanning the workers array looking for
239 >     * so, suspends itself.  Methods awaitJoin and awaitBlocker look
240 >     * for suspended threads to resume before considering creating a
241 >     * new replacement. We don't need a special data structure to
242 >     * maintain spares; simply scanning the workers array looking for
243       * worker.isSuspended() is fine because the calling thread is
244       * otherwise not doing anything useful anyway; we are at least as
245       * happy if after locating a spare, the caller doesn't actually
# Line 260 | Line 258 | public class ForkJoinPool extends Abstra
258       *
259       * 6. Deciding when to create new workers. The main dynamic
260       * control in this class is deciding when to create extra threads,
261 <     * in methods preJoin and doBlock. We always need to create one
262 <     * when the number of running threads becomes zero. But because
263 <     * blocked joins are typically dependent, we don't necessarily
264 <     * need or want one-to-one replacement. Using a one-to-one
265 <     * compensation rule often leads to enough useless overhead
266 <     * creating, suspending, resuming, and/or killing threads to
267 <     * signficantly degrade throughput.  We use a rule reflecting the
268 <     * idea that, the more spare threads you already have, the more
269 <     * evidence you need to create another one; where "evidence" is
270 <     * expressed as the current deficit -- target minus running
261 >     * in methods awaitJoin and awaitBlocker. We always
262 >     * need to create one when the number of running threads becomes
263 >     * zero. But because blocked joins are typically dependent, we
264 >     * don't necessarily need or want one-to-one replacement. Using a
265 >     * one-to-one compensation rule often leads to enough useless
266 >     * overhead creating, suspending, resuming, and/or killing threads
267 >     * to significantly degrade throughput.  We use a rule reflecting
268 >     * the idea that, the more spare threads you already have, the
269 >     * more evidence you need to create another one. The "evidence"
270 >     * here takes two forms: (1) Using a creation threshold expressed
271 >     * in terms of the current deficit -- target minus running
272       * threads. To reduce flickering and drift around target values,
273       * the relation is quadratic: adding a spare if (dc*dc)>=(sc*pc)
274       * (where dc is deficit, sc is number of spare threads and pc is
275 <     * target parallelism.)  This effectively reduces churn at the
276 <     * price of systematically undershooting target parallelism when
277 <     * many threads are blocked.  However, biasing toward undeshooting
278 <     * partially compensates for the above mechanics to suspend extra
279 <     * threads, that normally lead to overshoot because we can only
280 <     * suspend workers in-between top-level actions. It also better
281 <     * copes with the fact that some of the methods in this class tend
282 <     * to never become compiled (but are interpreted), so some
283 <     * components of the entire set of controls might execute many
284 <     * times faster than others. And similarly for cases where the
285 <     * apparent lack of work is just due to GC stalls and other
275 >     * target parallelism.)  (2) Using a form of adaptive
276 >     * spinning, requiring a number of threshold checks proportional
277 >     * to the number of spare threads.  This effectively reduces churn
278 >     * at the price of systematically undershooting target parallelism
279 >     * when many threads are blocked.  However, biasing toward
280 >     * undershooting partially compensates for the above mechanics to
281 >     * suspend extra threads, which normally lead to overshoot because
282 >     * we can only suspend workers in-between top-level actions. It
283 >     * also better copes with the fact that some of the methods in
284 >     * this class tend to never become compiled (but are interpreted),
285 >     * so some components of the entire set of controls might execute
286 >     * many times faster than others. And similarly for cases where
287 >     * the apparent lack of work is just due to GC stalls and other
288       * transient system activity.
289       *
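The quadratic creation threshold in point 6 can be exercised on its own. A minimal sketch, assuming the relation dc*dc >= sc*pc stated above; shouldAddSpare is a hypothetical helper, not pool code:

```java
// Hypothetical sketch of the quadratic spare-creation rule: the more
// spares (sc = total - target) already exist, the larger the deficit
// (dc = target - running) must be before another thread is justified.
public class SpareRule {
    static boolean shouldAddSpare(int pc, int rc, int tc) {
        int dc = pc - rc;   // deficit: target parallelism minus running
        int sc = tc - pc;   // spares: total workers minus target
        return dc > 0 && (long) dc * dc >= (long) sc * pc;
    }

    public static void main(String[] args) {
        // With no spares, any positive deficit justifies a new thread.
        System.out.println(shouldAddSpare(8, 7, 8));
        // With many spares, a small deficit does not (1*1 < 8*8).
        System.out.println(shouldAddSpare(8, 7, 16));
    }
}
```

The quadratic growth of required evidence is what biases the pool toward undershooting rather than churning threads.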
290       * 7. Maintaining other configuration parameters and monitoring
# Line 485 | Line 486 | public class ForkJoinPool extends Abstra
486      private static final int ONE_TOTAL          = 1 << TOTAL_COUNT_SHIFT;
487  
488      /*
489 <     * Fields parallelism. maxPoolSize, locallyFifo,
490 <     * maintainsParallelism, and ueh are non-volatile, but external
491 <     * reads/writes use workerCount fences to ensure visability.
489 >     * Fields parallelism, maxPoolSize, and maintainsParallelism are
490 >     * non-volatile, but external reads/writes use workerCount fences
491 >     * to ensure visibility.
492       */
493  
494      /**
# Line 504 | Line 505 | public class ForkJoinPool extends Abstra
505       * True if using local fifo, not default lifo, for local polling.
506       * Replicated by ForkJoinWorkerThreads
507       */
508 <    private boolean locallyFifo;
508 >    private volatile boolean locallyFifo;
509  
510      /**
511       * Controls whether to add spares to maintain parallelism
# Line 515 | Line 516 | public class ForkJoinPool extends Abstra
516       * The uncaught exception handler used when any worker
517       * abruptly terminates
518       */
519 <    private Thread.UncaughtExceptionHandler ueh;
519 >    private volatile Thread.UncaughtExceptionHandler ueh;
520  
521      /**
522       * Pool number, just for assigning useful names to worker threads
# Line 526 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Adds delta to running count.  Used mainly by ForkJoinTask.
529     *
530     * @param delta the number to add
530       */
531      final void updateRunningCount(int delta) {
532          int wc;
# Line 537 | Line 536 | public class ForkJoinPool extends Abstra
536      }
537  
538      /**
539 +     * Decrements running count unless already zero
540 +     */
541 +    final boolean tryDecrementRunningCount() {
542 +        int wc = workerCounts;
543 +        if ((wc & RUNNING_COUNT_MASK) == 0)
544 +            return false;
545 +        return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
546 +                                        wc, wc - ONE_RUNNING);
547 +    }
548 +
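The single-attempt CAS idiom in tryDecrementRunningCount above can be mirrored with AtomicInteger. A hedged analogue for illustration (the real method operates on the masked running bits of workerCounts, simplified away here):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogue of tryDecrementRunningCount: one CAS attempt that
// refuses when the count is already zero, and may spuriously fail under
// contention -- callers are expected to tolerate failure and retry or
// take an alternative action.
public class TryDecrement {
    static final AtomicInteger count = new AtomicInteger();

    static boolean tryDecrement() {
        int c = count.get();
        if (c == 0)
            return false;                     // already zero: refuse
        return count.compareAndSet(c, c - 1); // single attempt, no loop
    }

    public static void main(String[] args) {
        count.set(1);
        System.out.println(tryDecrement());   // succeeds: 1 -> 0
        System.out.println(tryDecrement());   // refuses: already zero
    }
}
```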
549 +    /**
550       * Write fence for user modifications of pool parameters
551       * (parallelism, etc.).  Note that it doesn't matter if CAS fails.
552       */
# Line 602 | Line 612 | public class ForkJoinPool extends Abstra
612          lock.lock();
613          try {
614              ForkJoinWorkerThread[] ws = workers;
615 <            int len = ws.length;
616 <            if (k < 0 || k >= len || ws[k] != null) {
617 <                for (k = 0; k < len && ws[k] != null; ++k)
615 >            int nws = ws.length;
616 >            if (k < 0 || k >= nws || ws[k] != null) {
617 >                for (k = 0; k < nws && ws[k] != null; ++k)
618                      ;
619 <                if (k == len)
620 <                    ws = Arrays.copyOf(ws, len << 1);
619 >                if (k == nws)
620 >                    ws = Arrays.copyOf(ws, nws << 1);
621              }
622              ws[k] = w;
623              workers = ws; // volatile array write ensures slot visibility
# Line 661 | Line 671 | public class ForkJoinPool extends Abstra
671       * Adjusts counts upon failure to create worker
672       */
673      private void onWorkerCreationFailure() {
674 <        int c;
675 <        do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
676 <                                               c = workerCounts,
677 <                                               c - (ONE_RUNNING|ONE_TOTAL)));
674 >        for (;;) {
675 >            int wc = workerCounts;
676 >            if ((wc >>> TOTAL_COUNT_SHIFT) > 0 &&
677 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
678 >                                         wc, wc - (ONE_RUNNING|ONE_TOTAL)))
679 >                break;
680 >        }
681          tryTerminate(false); // in case of failure during shutdown
682      }
683  
# Line 674 | Line 687 | public class ForkJoinPool extends Abstra
687       */
688      private void ensureEnoughTotalWorkers() {
689          int wc;
690 <        while (runState < TERMINATING &&
691 <               ((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism) {
690 >        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism &&
691 >               runState < TERMINATING) {
692              if ((UNSAFE.compareAndSwapInt(this, workerCountsOffset,
693                                            wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
694                   addWorker() == null))
# Line 698 | Line 711 | public class ForkJoinPool extends Abstra
711          }
712          forgetWorker(w);
713  
714 <        // decrement total count, and if was running, running count
715 <        int unit = w.isTrimmed()? ONE_TOTAL : (ONE_RUNNING|ONE_TOTAL);
716 <        int wc;
717 <        do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
718 <                                               wc = workerCounts, wc - unit));
714 >        // Decrement total count, and if was running, running count
715 >        // Spin (waiting for other updates) if either would be negative
716 >        int nr = w.isTrimmed() ? 0 : ONE_RUNNING;
717 >        int unit = ONE_TOTAL + nr;
718 >        for (;;) {
719 >            int wc = workerCounts;
720 >            int rc = wc & RUNNING_COUNT_MASK;
721 >            if (rc - nr < 0 || (wc >>> TOTAL_COUNT_SHIFT) == 0)
722 >                Thread.yield(); // back off if waiting for other updates
723 >            else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
724 >                                              wc, wc - unit))
725 >                break;
726 >        }
727  
728          accumulateStealCount(w); // collect final count
729          if (!tryTerminate(false))
# Line 841 | Line 862 | public class ForkJoinPool extends Abstra
862  
863      /**
864       * Adjusts counts and creates or resumes compensating threads for
865 <     * a worker about to block on task joinMe, returning early if
866 <     * joinMe becomes ready. First tries resuming an existing spare
867 <     * (which usually also avoids any count adjustment), but must then
868 <     * decrement running count to determine whether a new thread is
869 <     * needed. See above for fuller explanation.
865 >     * a worker blocking on task joinMe.  First tries resuming an
866 >     * existing spare (which usually also avoids any count
867 >     * adjustment), but must then decrement running count to determine
868 >     * whether a new thread is needed. See above for fuller
869 >     * explanation. This code is sprawled out non-modularly mainly
870 >     * because adaptive spinning works best if the entire method is
871 >     * either interpreted or compiled vs having only some pieces of it
872 >     * compiled.
873 >     *
874 >     * @param joinMe the task to join
875 >     * @return task status on exit (to simplify usage by callers)
876       */
877 <    final void preJoin(ForkJoinTask<?> joinMe) {
878 <        boolean dec = false;       // true when running count decremented
879 <        for (;;) {
880 <            releaseWaiters();      // help other threads progress
877 >    final int awaitJoin(ForkJoinTask<?> joinMe) {
878 >        int pc = parallelism;
879 >        boolean adj = false;        // true when running count adjusted
880 >        int scans = 0;
881  
882 <            if (joinMe.status < 0) // surround spare search with done checks
856 <                return;
882 >        while (joinMe.status >= 0) {
883              ForkJoinWorkerThread spare = null;
884 <            for (ForkJoinWorkerThread w : workers) {
885 <                if (w != null && w.isSuspended()) {
886 <                    spare = w;
887 <                    break;
884 >            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
885 >                ForkJoinWorkerThread[] ws = workers;
886 >                int nws = ws.length;
887 >                for (int i = 0; i < nws; ++i) {
888 >                    ForkJoinWorkerThread w = ws[i];
889 >                    if (w != null && w.isSuspended()) {
890 >                        spare = w;
891 >                        break;
892 >                    }
893                  }
894 +                if (joinMe.status < 0)
895 +                    break;
896              }
897 <            if (joinMe.status < 0)
898 <                return;
899 <
900 <            if (spare != null && spare.tryUnsuspend()) {
901 <                if (dec || joinMe.requestSignal() < 0) {
897 >            int wc = workerCounts;
898 >            int rc = wc & RUNNING_COUNT_MASK;
899 >            int dc = pc - rc;
900 >            if (dc > 0 && spare != null && spare.tryUnsuspend()) {
901 >                if (adj) {
902                      int c;
903 <                    do {} while (!UNSAFE.compareAndSwapInt(this,
904 <                                                           workerCountsOffset,
905 <                                                           c = workerCounts,
906 <                                                           c + ONE_RUNNING));
907 <                } // else no net count change
903 >                    do {} while (!UNSAFE.compareAndSwapInt
904 >                                 (this, workerCountsOffset,
905 >                                  c = workerCounts, c + ONE_RUNNING));
906 >                }
907 >                adj = true;
908                  LockSupport.unpark(spare);
876                return;
877            }
878
879            int wc = workerCounts; // decrement running count
880            if (!dec && (wc & RUNNING_COUNT_MASK) != 0 &&
881                (dec = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
882                                                wc, wc -= ONE_RUNNING)) &&
883                joinMe.requestSignal() < 0) { // cannot block
884                int c;                        // back out
885                do {} while (!UNSAFE.compareAndSwapInt(this,
886                                                       workerCountsOffset,
887                                                       c = workerCounts,
888                                                       c + ONE_RUNNING));
889                return;
909              }
910 <
911 <            if (dec) {
910 >            else if (adj) {
911 >                if (dc <= 0)
912 >                    break;
913                  int tc = wc >>> TOTAL_COUNT_SHIFT;
914 <                int pc = parallelism;
915 <                int dc = pc - (wc & RUNNING_COUNT_MASK); // deficit count
916 <                if ((dc < pc && (dc <= 0 || (dc * dc < (tc - pc) * pc) ||
917 <                                 !maintainsParallelism)) ||
918 <                    tc >= maxPoolSize) // cannot add
919 <                    return;
920 <                if (spare == null &&
921 <                    UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
922 <                                             wc + (ONE_RUNNING|ONE_TOTAL))) {
923 <                    addWorker();
904 <                    return;
914 >                if (scans > tc) {
915 >                    int ts = (tc - pc) * pc;
916 >                    if (rc != 0 &&  (dc * dc < ts || !maintainsParallelism))
917 >                        break;
918 >                    if (scans > ts && tc < maxPoolSize &&
919 >                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
920 >                                                 wc+(ONE_RUNNING|ONE_TOTAL))){
921 >                        addWorker();
922 >                        break;
923 >                    }
924                  }
925              }
926 +            else if (rc != 0)
927 +                adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
928 +                                                wc, wc - ONE_RUNNING);
929 +            if ((scans++ & 1) == 0)
930 +                releaseWaiters();   // help others progress
931 +            else
932 +                Thread.yield();     // avoid starving productive threads
933 +        }
934 +
935 +        if (adj) {
936 +            joinMe.internalAwaitDone();
937 +            int c;
938 +            do {} while (!UNSAFE.compareAndSwapInt
939 +                         (this, workerCountsOffset,
940 +                          c = workerCounts, c + ONE_RUNNING));
941          }
942 +        return joinMe.status;
943      }
944  
945      /**
946 <     * Same idea as preJoin but with too many differing details to
912 <     * integrate: There are no task-based signal counts, and only one
913 <     * way to do the actual blocking. So for simplicity it is directly
914 <     * incorporated into this method.
946 >     * Same idea as awaitJoin
947       */
948 <    final void doBlock(ManagedBlocker blocker, boolean maintainPar)
948 >    final void awaitBlocker(ManagedBlocker blocker, boolean maintainPar)
949          throws InterruptedException {
950 <        maintainPar &= maintainsParallelism; // override
951 <        boolean dec = false;
952 <        boolean done = false;
950 >        maintainPar &= maintainsParallelism;
951 >        int pc = parallelism;
952 >        boolean adj = false;        // true when running count adjusted
953 >        int scans = 0;
954 >        boolean done;
955 >
956          for (;;) {
922            releaseWaiters();
957              if (done = blocker.isReleasable())
958                  break;
959              ForkJoinWorkerThread spare = null;
960 <            for (ForkJoinWorkerThread w : workers) {
961 <                if (w != null && w.isSuspended()) {
962 <                    spare = w;
963 <                    break;
960 >            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
961 >                ForkJoinWorkerThread[] ws = workers;
962 >                int nws = ws.length;
963 >                for (int i = 0; i < nws; ++i) {
964 >                    ForkJoinWorkerThread w = ws[i];
965 >                    if (w != null && w.isSuspended()) {
966 >                        spare = w;
967 >                        break;
968 >                    }
969                  }
970 +                if (done = blocker.isReleasable())
971 +                    break;
972              }
973 <            if (done = blocker.isReleasable())
974 <                break;
975 <            if (spare != null && spare.tryUnsuspend()) {
976 <                if (dec) {
973 >            int wc = workerCounts;
974 >            int rc = wc & RUNNING_COUNT_MASK;
975 >            int dc = pc - rc;
976 >            if (dc > 0 && spare != null && spare.tryUnsuspend()) {
977 >                if (adj) {
978                      int c;
979 <                    do {} while (!UNSAFE.compareAndSwapInt(this,
980 <                                                           workerCountsOffset,
981 <                                                           c = workerCounts,
940 <                                                           c + ONE_RUNNING));
979 >                    do {} while (!UNSAFE.compareAndSwapInt
980 >                                 (this, workerCountsOffset,
981 >                                  c = workerCounts, c + ONE_RUNNING));
982                  }
983 +                adj = true;
984                  LockSupport.unpark(spare);
943                break;
985              }
986 <            int wc = workerCounts;
987 <            if (!dec && (wc & RUNNING_COUNT_MASK) != 0)
947 <                dec = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
948 <                                               wc, wc -= ONE_RUNNING);
949 <            if (dec) {
950 <                int tc = wc >>> TOTAL_COUNT_SHIFT;
951 <                int pc = parallelism;
952 <                int dc = pc - (wc & RUNNING_COUNT_MASK);
953 <                if ((dc < pc && (dc <= 0 || (dc * dc < (tc - pc) * pc) ||
954 <                                 !maintainPar)) ||
955 <                    tc >= maxPoolSize)
956 <                    break;
957 <                if (spare == null &&
958 <                    UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
959 <                                             wc + (ONE_RUNNING|ONE_TOTAL))){
960 <                    addWorker();
986 >            else if (adj) {
987 >                if (dc <= 0)
988                      break;
989 +                int tc = wc >>> TOTAL_COUNT_SHIFT;
990 +                if (scans > tc) {
991 +                    int ts = (tc - pc) * pc;
992 +                    if (rc != 0 &&  (dc * dc < ts || !maintainPar))
993 +                        break;
994 +                    if (scans > ts && tc < maxPoolSize &&
995 +                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
996 +                                                 wc+(ONE_RUNNING|ONE_TOTAL))){
997 +                        addWorker();
998 +                        break;
999 +                    }
1000                  }
1001              }
1002 +            else if (rc != 0)
1003 +                adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
1004 +                                                wc, wc - ONE_RUNNING);
1005 +            if ((++scans & 1) == 0)
1006 +                releaseWaiters();   // help others progress
1007 +            else
1008 +                Thread.yield();     // avoid starving productive threads
1009          }
1010  
1011          try {
1012              if (!done)
1013                  do {} while (!blocker.isReleasable() && !blocker.block());
1014          } finally {
1015 <            if (dec) {
1015 >            if (adj) {
1016                  int c;
1017 <                do {} while (!UNSAFE.compareAndSwapInt(this,
1018 <                                                       workerCountsOffset,
1019 <                                                       c = workerCounts,
975 <                                                       c + ONE_RUNNING));
1017 >                do {} while (!UNSAFE.compareAndSwapInt
1018 >                             (this, workerCountsOffset,
1019 >                              c = workerCounts, c + ONE_RUNNING));
1020              }
1021          }
1022      }
1023  
1024      /**
1025       * Unless there are not enough other running threads, adjusts
1026 <     * counts for a a worker in performing helpJoin that cannot find
1027 <     * any work, so that this worker can now block.
1026 >     * counts and blocks a worker performing helpJoin that cannot find
1027 >     * any work.
1028       *
1029 <     * @return true if worker may block
1029 >     * @return true if joinMe now done
1030       */
1031 <    final boolean preBlockHelpingJoin(ForkJoinTask<?> joinMe) {
1032 <        while (joinMe.status >= 0) {
1033 <            releaseWaiters(); // help other threads progress
1034 <
1035 <            // if a spare exists, resume it to maintain parallelism level
1036 <            if ((workerCounts & RUNNING_COUNT_MASK) <= parallelism) {
1037 <                ForkJoinWorkerThread spare = null;
1038 <                for (ForkJoinWorkerThread w : workers) {
1031 >    final boolean tryAwaitBusyJoin(ForkJoinTask<?> joinMe) {
1032 >        int pc = parallelism;
1033 >        outer:for (;;) {
1034 >            releaseWaiters();
1035 >            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
1036 >                ForkJoinWorkerThread[] ws = workers;
1037 >                int nws = ws.length;
1038 >                for (int i = 0; i < nws; ++i) {
1039 >                    ForkJoinWorkerThread w = ws[i];
1040                      if (w != null && w.isSuspended()) {
1041 <                        spare = w;
1042 <                        break;
1043 <                    }
1044 <                }
1045 <                if (joinMe.status < 0)
1046 <                    break;
1047 <                if (spare != null) {
1003 <                    if (spare.tryUnsuspend()) {
1004 <                        boolean canBlock = true;
1005 <                        if (joinMe.requestSignal() < 0) {
1006 <                            canBlock = false; // already done
1007 <                            int c;
1008 <                            do {} while (!UNSAFE.compareAndSwapInt
1009 <                                         (this, workerCountsOffset,
1010 <                                          c = workerCounts, c + ONE_RUNNING));
1041 >                        if (joinMe.status < 0)
1042 >                            return true;
1043 >                        if ((workerCounts & RUNNING_COUNT_MASK) > pc)
1044 >                            break;
1045 >                        if (w.tryUnsuspend()) {
1046 >                            LockSupport.unpark(w);
1047 >                            break outer;
1048                          }
1049 <                        LockSupport.unpark(spare);
1013 <                        return canBlock;
1049 >                        continue outer;
1050                      }
1015                    continue; // recheck -- another spare may exist
1051                  }
1052              }
1053 <
1054 <            int wc = workerCounts; // reread to shorten CAS window
1055 <            int rc = wc & RUNNING_COUNT_MASK;
1056 <            if (rc <= 2) // keep this and at most one other thread alive
1057 <                break;
1058 <
1053 >            if (joinMe.status < 0)
1054 >                return true;
1055 >            int wc = workerCounts;
1056 >            if ((wc & RUNNING_COUNT_MASK) <= 2 ||
1057 >                (wc >>> TOTAL_COUNT_SHIFT) < pc)
1058 >                return false;  // keep this thread alive
1059              if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1060 <                                         wc, wc - ONE_RUNNING)) {
1026 <                if (joinMe.requestSignal() >= 0)
1027 <                    return true;
1028 <                int c;                        // back out
1029 <                do {} while (!UNSAFE.compareAndSwapInt
1030 <                             (this, workerCountsOffset,
1031 <                              c = workerCounts, c + ONE_RUNNING));
1060 >                                         wc, wc - ONE_RUNNING))
1061                  break;
1033            }
1062          }
1063 <        return false;
1063 >
1064 >        joinMe.internalAwaitDone();
1065 >        int c;
1066 >        do {} while (!UNSAFE.compareAndSwapInt
1067 >                     (this, workerCountsOffset,
1068 >                      c = workerCounts, c + ONE_RUNNING));
1069 >        return true;
1070      }
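The hunk above replaces the old requestSignal/back-out dance with a simpler shape: prospectively CAS the running count down, block via internalAwaitDone, then CAS the count back up in a retry loop. A minimal sketch of that prospective-counting idiom, using AtomicInteger instead of Unsafe (the class and method names below are illustrative, not from ForkJoinPool):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch: give up a "running" slot before blocking, restore it after. */
class RunningCount {
    private final AtomicInteger running;

    RunningCount(int initial) { running = new AtomicInteger(initial); }

    /** Decrements the count, runs the blocking action, then restores the count. */
    void blockPreservingCount(Runnable blockingAction) {
        int c;
        // Prospective decrement: announce the block before it happens.
        do { c = running.get(); } while (!running.compareAndSet(c, c - 1));
        try {
            blockingAction.run();            // stand-in for joinMe.internalAwaitDone()
        } finally {
            // Restore the slot with the same CAS retry loop used in the diff.
            do { c = running.get(); } while (!running.compareAndSet(c, c + 1));
        }
    }

    int value() { return running.get(); }
}
```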
1071  
1072      /**
# Line 1065 | Line 1099 | public class ForkJoinPool extends Abstra
1099       * Actions on transition to TERMINATING
1100       */
1101      private void startTerminating() {
1102 <        // Clear out and cancel submissions, ignoring exceptions
1102 >        for (int i = 0; i < 2; ++i) { // twice to mop up newly created workers
1103 >            cancelSubmissions();
1104 >            shutdownWorkers();
1105 >            cancelWorkerTasks();
1106 >            advanceEventCount();
1107 >            releaseWaiters();
1108 >            interruptWorkers();
1109 >        }
1110 >    }
1111 >
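The new startTerminating body runs its cleanup sequence twice, "to mop up newly created workers" that may race in during the first pass. A hedged sketch of that two-pass drain pattern in isolation (Terminator and its queue are invented for illustration; the real code sweeps submissions, worker tasks, and waiters):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative two-pass shutdown sweep over a concurrently growing queue. */
class Terminator {
    private final Queue<Runnable> submissions = new ConcurrentLinkedQueue<Runnable>();

    void submit(Runnable r) { submissions.add(r); }

    /** Drains twice so entries enqueued during the first pass are still caught. */
    int startTerminating() {
        int cancelled = 0;
        for (int i = 0; i < 2; ++i) {       // twice, to mop up late arrivals
            Runnable task;
            while ((task = submissions.poll()) != null)
                ++cancelled;                // stand-in for task.cancel(false)
        }
        return cancelled;
    }
}
```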
1112 >    /**
1113 >     * Clear out and cancel submissions, ignoring exceptions
1114 >     */
1115 >    private void cancelSubmissions() {
1116          ForkJoinTask<?> task;
1117          while ((task = submissionQueue.poll()) != null) {
1118              try {
# Line 1073 | Line 1120 | public class ForkJoinPool extends Abstra
1120              } catch (Throwable ignore) {
1121              }
1122          }
1123 <        // Propagate run level
1124 <        for (ForkJoinWorkerThread w : workers) {
1123 >    }
1124 >
1125 >    /**
1126 >     * Sets all worker run states to at least shutdown,
1127 >     * also resuming suspended workers
1128 >     */
1129 >    private void shutdownWorkers() {
1130 >        ForkJoinWorkerThread[] ws = workers;
1131 >        int nws = ws.length;
1132 >        for (int i = 0; i < nws; ++i) {
1133 >            ForkJoinWorkerThread w = ws[i];
1134              if (w != null)
1135 <                w.shutdown();    // also resumes suspended workers
1135 >                w.shutdown();
1136          }
1137 <        // Ensure no straggling local tasks
1138 <        for (ForkJoinWorkerThread w : workers) {
1137 >    }
1138 >
1139 >    /**
1140 >     * Clears out and cancels all locally queued tasks
1141 >     */
1142 >    private void cancelWorkerTasks() {
1143 >        ForkJoinWorkerThread[] ws = workers;
1144 >        int nws = ws.length;
1145 >        for (int i = 0; i < nws; ++i) {
1146 >            ForkJoinWorkerThread w = ws[i];
1147              if (w != null)
1148                  w.cancelTasks();
1149          }
1150 <        // Wake up idle workers
1151 <        advanceEventCount();
1152 <        releaseWaiters();
1153 <        // Unstick pending joins
1154 <        for (ForkJoinWorkerThread w : workers) {
1150 >    }
1151 >
1152 >    /**
1153 >     * Unsticks all workers blocked on joins etc
1154 >     */
1155 >    private void interruptWorkers() {
1156 >        ForkJoinWorkerThread[] ws = workers;
1157 >        int nws = ws.length;
1158 >        for (int i = 0; i < nws; ++i) {
1159 >            ForkJoinWorkerThread w = ws[i];
1160              if (w != null && !w.isTerminated()) {
1161                  try {
1162                      w.interrupt();
# Line 1213 | Line 1282 | public class ForkJoinPool extends Abstra
1282          this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1283          this.workerLock = new ReentrantLock();
1284          this.terminationLatch = new CountDownLatch(1);
1216        // Start first worker; remaining workers added upon first submission
1217        workerCounts = ONE_RUNNING | ONE_TOTAL;
1218        addWorker();
1285      }
1286  
1287      /**
# Line 1245 | Line 1311 | public class ForkJoinPool extends Abstra
1311          submissionQueue.offer(task);
1312          advanceEventCount();
1313          releaseWaiters();
1314 <        if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
1249 <            ensureEnoughTotalWorkers();
1314 >        ensureEnoughTotalWorkers();
1315      }
1316  
1317      /**
# Line 1404 | Line 1469 | public class ForkJoinPool extends Abstra
1469      public Thread.UncaughtExceptionHandler
1470          setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
1471          checkPermission();
1407        workerCountReadFence();
1472          Thread.UncaughtExceptionHandler old = ueh;
1473          if (h != old) {
1474              ueh = h;
1475 <            workerCountWriteFence();
1476 <            for (ForkJoinWorkerThread w : workers) {
1475 >            ForkJoinWorkerThread[] ws = workers;
1476 >            int nws = ws.length;
1477 >            for (int i = 0; i < nws; ++i) {
1478 >                ForkJoinWorkerThread w = ws[i];
1479                  if (w != null)
1480                      w.setUncaughtExceptionHandler(h);
1481              }
# Line 1438 | Line 1504 | public class ForkJoinPool extends Abstra
1504              this.parallelism = parallelism;
1505              workerCountWriteFence();
1506              // Release spares. If too many, some will die after re-suspend
1507 <            for (ForkJoinWorkerThread w : workers) {
1507 >            ForkJoinWorkerThread[] ws = workers;
1508 >            int nws = ws.length;
1509 >            for (int i = 0; i < nws; ++i) {
1510 >                ForkJoinWorkerThread w = ws[i];
1511                  if (w != null && w.tryUnsuspend()) {
1512 <                    updateRunningCount(1);
1512 >                    int c;
1513 >                    do {} while (!UNSAFE.compareAndSwapInt
1514 >                                 (this, workerCountsOffset,
1515 >                                  c = workerCounts, c + ONE_RUNNING));
1516                      LockSupport.unpark(w);
1517                  }
1518              }
# Line 1550 | Line 1622 | public class ForkJoinPool extends Abstra
1622          if (oldMode != async) {
1623              locallyFifo = async;
1624              workerCountWriteFence();
1625 <            for (ForkJoinWorkerThread w : workers) {
1625 >            ForkJoinWorkerThread[] ws = workers;
1626 >            int nws = ws.length;
1627 >            for (int i = 0; i < nws; ++i) {
1628 >                ForkJoinWorkerThread w = ws[i];
1629                  if (w != null)
1630                      w.setAsyncMode(async);
1631              }
# Line 1635 | Line 1710 | public class ForkJoinPool extends Abstra
1710       */
1711      public long getQueuedTaskCount() {
1712          long count = 0;
1713 <        for (ForkJoinWorkerThread w : workers) {
1713 >        ForkJoinWorkerThread[] ws = workers;
1714 >        int nws = ws.length;
1715 >        for (int i = 0; i < nws; ++i) {
1716 >            ForkJoinWorkerThread w = ws[i];
1717              if (w != null)
1718                  count += w.getQueueSize();
1719          }
# Line 1693 | Line 1771 | public class ForkJoinPool extends Abstra
1771       */
1772      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1773          int n = submissionQueue.drainTo(c);
1774 <        for (ForkJoinWorkerThread w : workers) {
1774 >        ForkJoinWorkerThread[] ws = workers;
1775 >        int nws = ws.length;
1776 >        for (int i = 0; i < nws; ++i) {
1777 >            ForkJoinWorkerThread w = ws[i];
1778              if (w != null)
1779                  n += w.drainTasksTo(c);
1780          }
# Line 1906 | Line 1987 | public class ForkJoinPool extends Abstra
1987          Thread t = Thread.currentThread();
1988          if (t instanceof ForkJoinWorkerThread)
1989              ((ForkJoinWorkerThread) t).pool.
1990 <                doBlock(blocker, maintainParallelism);
1990 >                awaitBlocker(blocker, maintainParallelism);
1991          else
1992              awaitBlocker(blocker);
1993      }
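The hunk above reroutes worker-thread callers from doBlock to the pool's awaitBlocker; either way, user code reaches this machinery by implementing ForkJoinPool.ManagedBlocker. A sketch of such a blocker wrapping a blocking-queue take (note this revision's two-argument managedBlock(blocker, maintainParallelism) was later simplified to a one-argument form in java.util.concurrent; QueueTaker here is illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

/** Illustrative ManagedBlocker: lets the pool compensate while a worker blocks on take(). */
class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    final BlockingQueue<E> queue;
    volatile E item;

    QueueTaker(BlockingQueue<E> queue) { this.queue = queue; }

    /** The potentially blocking call; returns true when no further blocking is needed. */
    public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();
        return true;
    }

    /** Cheap pre-check so the pool can skip compensation when an item is ready. */
    public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }
}
```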
