ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.41 by dl, Tue Aug 17 18:30:33 2010 UTC vs.
Revision 1.50 by dl, Fri Sep 17 14:24:56 2010 UTC

# Line 110 | Line 110 | public class ForkJoinWorkerThread extend
110       * idea).  (4) We bound the number of attempts to find work (see
111       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112       * necessary replacing it with a spare (see
113 <     * ForkJoinPool.tryAwaitJoin).
113 >     * ForkJoinPool.awaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 164 | Line 164 | public class ForkJoinWorkerThread extend
164      private static final int MAX_HELP_DEPTH = 8;
165  
166      /**
167     * The wakeup interval (in nanoseconds) for the oldest worker
168     * suspended as spare.  On each wakeup not signalled by a
169     * resumption, it may ask the pool to reduce the number of spares.
170     */
171    private static final long TRIM_RATE_NANOS =
172        5L * 1000L * 1000L * 1000L; // 5sec
173
174    /**
167       * Capacity of work-stealing queue array upon initialization.
168       * Must be a power of two. Initial size must be at least 4, but is
169       * padded to minimize cache effects.
# Line 180 | Line 172 | public class ForkJoinWorkerThread extend
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 226 | Line 218 | public class ForkJoinWorkerThread extend
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220       * maintained separately and modified only in conjunction with
221 <     * CASes of the pool's runState (which are currently sadly manually
222 <     * inlined for performance.)
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 237 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of steals, transferred and reset in pool callbacks pool
234 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 274 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 <     * Number of times this thread suspended as spare
276 >     * Number of times this thread suspended as spare. Accessed only
277 >     * by pool.
278       */
279      int spareCount;
280  
281      /**
282 <     * Encoded index and count of next spare waiter. Used only
282 >     * Encoded index and count of next spare waiter. Accessed only
283       * by ForkJoinPool for managing spares.
284       */
285      volatile int nextSpare;
286  
287      /**
288       * The task currently being joined, set only when actively trying
289 <     * to helpStealer. Written only by current thread, but read by
290 <     * others.
289 >     * to help other stealers in helpJoinTask. Written only by this
290 >     * thread, but read by others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Not volatile because always read/written in
297 <     * presence of related volatiles in those cases where it matters.
296 >     * submission queue).  Written only by this thread, but read by
297 >     * others.
298       */
299 <    private ForkJoinTask<?> currentSteal;
299 >    private volatile ForkJoinTask<?> currentSteal;
300  
301      /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 319 | Line 313 | public class ForkJoinWorkerThread extend
313      }
314  
315      /**
316 <     * Performs additional initialization and starts this thread
316 >     * Performs additional initialization and starts this thread.
317       */
318      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
319          this.poolIndex = poolIndex;
# Line 355 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke @code{super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 387 | Line 381 | public class ForkJoinWorkerThread extend
381              if (active) {
382                  int a; // inline p.tryDecrementActiveCount
383                  active = false;
384 <                do {} while(!UNSAFE.compareAndSwapInt
385 <                            (p, poolRunStateOffset, a = p.runState, a - 1));
384 >                do {} while (!UNSAFE.compareAndSwapInt
385 >                             (p, poolRunStateOffset, a = p.runState, a - 1));
386              }
387              cancelTasks();
388              setTerminated();
# Line 422 | Line 416 | public class ForkJoinWorkerThread extend
416      // helpers for run()
417  
418      /**
419 <     * Find and execute tasks and check status while running
419 >     * Finds and executes tasks, and checks status while running.
420       */
421      private void mainLoop() {
422 <        int misses = 0; // track consecutive times failed to find work; max 2
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425 <            p.preStep(this, misses);
425 >            p.preStep(this, ran);
426              if (runState != 0)
427                  break;
428 <            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
435 <                      (misses < 2 ? misses + 1 : 2));
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
432      /**
433 <     * Try to steal a task and execute it
433 >     * Tries to steal a task and execute it.
434       *
435       * @return true if ran a task
436       */
437      private boolean tryExecSteal() {
438          ForkJoinTask<?> t;
439 <        if ((t  = scan()) != null) {
439 >        if ((t = scan()) != null) {
440              t.quietlyExec();
441 <            currentSteal = null;
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442              if (sp != base)
443                  execLocalTasks();
444              return true;
# Line 454 | Line 447 | public class ForkJoinWorkerThread extend
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and run it;
450 >     * If a submission exists, try to activate and run it.
451       *
452       * @return true if ran a task
453       */
454      private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460              ForkJoinTask<?> t; int a;
461 <            if (active || // ugly/hacky: inline p.tryIncrementActiveCount
461 >            if (active || // inline p.tryIncrementActiveCount
462                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
463                                                     a = p.runState, a + 1))) {
464                  if ((t = p.pollSubmission()) != null) {
465 <                    currentSteal = t;
465 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
466                      t.quietlyExec();
467 <                    currentSteal = null;
467 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
468                      if (sp != base)
469                          execLocalTasks();
470                      return true;
# Line 484 | Line 480 | public class ForkJoinWorkerThread extend
480       */
481      private void execLocalTasks() {
482          while (runState == 0) {
483 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
483 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
484              if (t != null)
485                  t.quietlyExec();
486              else if (sp == base)
# Line 494 | Line 490 | public class ForkJoinWorkerThread extend
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 524 | Line 520 | public class ForkJoinWorkerThread extend
520       * range. This method is used only during resets and backouts.
521       */
522      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
523 <                                              ForkJoinTask<?> t) {
523 >                                        ForkJoinTask<?> t) {
524          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
525      }
526  
# Line 569 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 592 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread.
591 >     * Called only by this thread.
592       */
593      private ForkJoinTask<?> popTask() {
594          ForkJoinTask<?>[] q = queue;
# Line 616 | Line 612 | public class ForkJoinWorkerThread extend
612  
613      /**
614       * Specialized version of popTask to pop only if topmost element
615 <     * is the given task. Called only by current thread while
620 <     * active.
615 >     * is the given task. Called only by this thread while active.
616       *
617       * @param t the task. Caller must ensure non-null.
618       */
# Line 627 | Line 622 | public class ForkJoinWorkerThread extend
622          if ((s = sp) != base && q != null &&
623              UNSAFE.compareAndSwapObject
624              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
625 <            sp = s;
625 >            sp = s; // putOrderedInt may encourage more timely write
626              // UNSAFE.putOrderedInt(this, spOffset, s);
627              return true;
628          }
# Line 635 | Line 630 | public class ForkJoinWorkerThread extend
630      }
631  
632      /**
633 <     * Returns next task or null if empty or contended
633 >     * Returns next task, or null if empty or contended.
634       */
635      final ForkJoinTask<?> peekTask() {
636          ForkJoinTask<?>[] q = queue;
# Line 677 | Line 672 | public class ForkJoinWorkerThread extend
672       * Computes next value for random victim probe in scan().  Scans
673       * don't require a very high quality generator, but also not a
674       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
675 <     * Note: This is manually inlined in scan()
675 >     * Note: This is manually inlined in scan().
676       */
677      private static final int xorShift(int r) {
678          r ^= r << 13;
# Line 716 | Line 711 | public class ForkJoinWorkerThread extend
711              for (;;) {
712                  ForkJoinWorkerThread v = ws[k & mask];
713                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
714 <                if (v != null && v.base != v.sp) {
715 <                    ForkJoinTask<?>[] q; int b, a;
716 <                    if ((canSteal ||      // Ugly/hacky: inline
717 <                         (canSteal = active =  // p.tryIncrementActiveCount
718 <                          UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
719 <                                                   a = p.runState, a + 1))) &&
720 <                        (q = v.queue) != null && (b = v.base) != v.sp) {
721 <                        int i = (q.length - 1) & b;
722 <                        long u = (i << qShift) + qBase; // raw offset
723 <                        ForkJoinTask<?> t = q[i];
724 <                        if (v.base == b && t != null &&
714 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
715 >                if (v != null && (b = v.base) != v.sp &&
716 >                    (q = v.queue) != null) {
717 >                    int i = (q.length - 1) & b;
718 >                    long u = (i << qShift) + qBase; // raw offset
719 >                    int pid = poolIndex;
720 >                    if ((t = q[i]) != null) {
721 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
722 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
723 >                                                     a = p.runState, a + 1))
724 >                            canSteal = active = true;
725 >                        if (canSteal && v.base == b++ &&
726                              UNSAFE.compareAndSwapObject(q, u, t, null)) {
727 <                            int pid = poolIndex;
732 <                            currentSteal = t;
727 >                            v.base = b;
728                              v.stealHint = pid;
729 <                            v.base = b + 1;
729 >                            UNSAFE.putOrderedObject(this,
730 >                                                    currentStealOffset, t);
731                              seed = r;
732                              ++stealCount;
733                              return t;
# Line 755 | Line 751 | public class ForkJoinWorkerThread extend
751  
752      // status check methods used mainly by ForkJoinPool
753      final boolean isRunning()     { return runState == 0; }
758    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
754      final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
755      final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
756      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
757  
758 +    final boolean isTerminating() {
759 +        if ((runState & TERMINATING) != 0)
760 +            return true;
761 +        if (pool.isAtLeastTerminating()) { // propagate pool state
762 +            shutdown();
763 +            return true;
764 +        }
765 +        return false;
766 +    }
767 +
768      /**
769       * Sets state to TERMINATING. Does NOT unpark or interrupt
770 <     * to wake up if currently blocked.
770 >     * to wake up if currently blocked. Callers must do so if desired.
771       */
772      final void shutdown() {
773          for (;;) {
# Line 782 | Line 787 | public class ForkJoinWorkerThread extend
787      }
788  
789      /**
790 <     * Sets state to TERMINATED. Called only by onTermination()
790 >     * Sets state to TERMINATED. Called only by onTermination().
791       */
792      private void setTerminated() {
793          int s;
# Line 793 | Line 798 | public class ForkJoinWorkerThread extend
798  
799      /**
800       * If suspended, tries to set status to unsuspended.
801 +     * Does NOT wake up if blocked.
802       *
803       * @return true if successful
804       */
# Line 824 | Line 830 | public class ForkJoinWorkerThread extend
830          }
831          ForkJoinPool p = pool;
832          p.pushSpare(this);
827        lastEventCount = 0;         // reset upon resume
833          while ((runState & SUSPENDED) != 0) {
834              if (p.tryAccumulateStealCount(this)) {
830                boolean untimed = nextSpare != 0;
831                long startTime = untimed? 0 : System.nanoTime();
835                  interrupted();          // clear/ignore interrupts
836                  if ((runState & SUSPENDED) == 0)
837                      break;
838 <                if (untimed)     // untimed
836 <                    LockSupport.park(this);
837 <                else {
838 <                    LockSupport.parkNanos(this, TRIM_RATE_NANOS);
839 <                    if ((runState & SUSPENDED) == 0)
840 <                        break;
841 <                    if (System.nanoTime() - startTime >= TRIM_RATE_NANOS)
842 <                        p.tryShutdownSpare();
843 <                }
838 >                LockSupport.park(this);
839              }
840          }
841      }
# Line 913 | Line 908 | public class ForkJoinWorkerThread extend
908              if (active ||
909                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
910                                                     a = p.runState, a + 1)))
911 <                return locallyFifo? locallyDeqTask() : popTask();
911 >                return locallyFifo ? locallyDeqTask() : popTask();
912          }
913          return null;
914      }
# Line 927 | Line 922 | public class ForkJoinWorkerThread extend
922          ForkJoinTask<?> t = pollLocalTask();
923          if (t == null) {
924              t = scan();
925 <            currentSteal = null; // cannot retain/track/help
925 >            // cannot retain/track/help steal
926 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
927          }
928          return t;
929      }
# Line 982 | Line 978 | public class ForkJoinWorkerThread extend
978      }
979  
980      /**
981 <     * Tries to locate and help perform tasks for a stealer of the
982 <     * given task, or in turn one of its stealers.  Traces
983 <     * currentSteal->currentJoin links looking for a thread working on
984 <     * a descendant of the given task and with a non-empty queue to
985 <     * steal back and execute tasks from.
981 >     * Unless terminating, tries to locate and help perform tasks for
982 >     * a stealer of the given task, or in turn one of its stealers.
983 >     * Traces currentSteal->currentJoin links looking for a thread
984 >     * working on a descendant of the given task and with a non-empty
985 >     * queue to steal back and execute tasks from.
986       *
987 <     * The implementation is very branchy to cope with the potential
987 >     * The implementation is very branchy to cope with potential
988       * inconsistencies or loops encountering chains that are stale,
989       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
990       * of these cases are dealt with by just returning back to the
# Line 998 | Line 994 | public class ForkJoinWorkerThread extend
994       * @param joinMe the task to join
995       */
996      final void helpJoinTask(ForkJoinTask<?> joinMe) {
997 <        ForkJoinWorkerThread[] ws = pool.workers;
998 <        int n; // need at least 2 workers
999 <        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
1000 <            ForkJoinTask<?> task = joinMe;        // base of chain
1001 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
1002 <            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1003 <                // Try to find v, the stealer of task, by first using hint
1004 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1005 <                if (v == null || v.currentSteal != task) {
1006 <                    for (int j = 0; ; ++j) {      // search array
1007 <                        if (j < n) {
1008 <                            if ((v = ws[j]) != null) {
1009 <                                if (task.status < 0)
1010 <                                    return;       // stale or done
1011 <                                if (v.currentSteal == task) {
1012 <                                    thread.stealHint = j;
1013 <                                    break;        // save hint for next time
1014 <                                }
997 >        ForkJoinWorkerThread[] ws;
998 >        int n;
999 >        if (joinMe.status < 0)                // already done
1000 >            return;
1001 >        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
1002 >            joinMe.cancelIgnoringExceptions();
1003 >            return;
1004 >        }
1005 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1006 >            return;                           // need at least 2 workers
1007 >
1008 >        ForkJoinTask<?> task = joinMe;        // base of chain
1009 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
1010 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1011 >            // Try to find v, the stealer of task, by first using hint
1012 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1013 >            if (v == null || v.currentSteal != task) {
1014 >                for (int j = 0; ; ++j) {      // search array
1015 >                    if (j < n) {
1016 >                        ForkJoinTask<?> vs;
1017 >                        if ((v = ws[j]) != null &&
1018 >                            (vs = v.currentSteal) != null) {
1019 >                            if (joinMe.status < 0 || task.status < 0)
1020 >                                return;       // stale or done
1021 >                            if (vs == task) {
1022 >                                thread.stealHint = j;
1023 >                                break;        // save hint for next time
1024                              }
1025                          }
1021                        else
1022                            return;               // no stealer
1026                      }
1027 +                    else
1028 +                        return;               // no stealer
1029                  }
1030 <                // Try to help v, using specialized form of deqTask
1031 <                int b;
1032 <                ForkJoinTask<?>[] q;
1033 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1034 <                    int i = (q.length - 1) & b;
1035 <                    long u = (i << qShift) + qBase;
1036 <                    ForkJoinTask<?> t = q[i];
1037 <                    if (task.status < 0)
1038 <                        return;                   // stale or done
1039 <                    if (v.base == b) {
1040 <                        if (t == null)
1041 <                            return;               // producer stalled
1042 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1043 <                            if (joinMe.status < 0) {
1044 <                                UNSAFE.putObjectVolatile(q, u, t);
1045 <                                return;           // back out on cancel
1046 <                            }
1047 <                            int pid = poolIndex;
1048 <                            ForkJoinTask<?> prevSteal = currentSteal;
1049 <                            currentSteal = t;
1045 <                            v.stealHint = pid;
1046 <                            v.base = b + 1;
1047 <                            t.quietlyExec();
1048 <                            currentSteal = prevSteal;
1049 <                        }
1030 >            }
1031 >            for (;;) { // Try to help v, using specialized form of deqTask
1032 >                if (joinMe.status < 0)
1033 >                    return;
1034 >                int b = v.base;
1035 >                ForkJoinTask<?>[] q = v.queue;
1036 >                if (b == v.sp || q == null)
1037 >                    break;
1038 >                int i = (q.length - 1) & b;
1039 >                long u = (i << qShift) + qBase;
1040 >                ForkJoinTask<?> t = q[i];
1041 >                int pid = poolIndex;
1042 >                ForkJoinTask<?> ps = currentSteal;
1043 >                if (task.status < 0)
1044 >                    return;                   // stale or done
1045 >                if (t != null && v.base == b++ &&
1046 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1047 >                    if (joinMe.status < 0) {
1048 >                        UNSAFE.putObjectVolatile(q, u, t);
1049 >                        return;               // back out on cancel
1050                      }
1051 <                    if (joinMe.status < 0)
1052 <                        return;
1051 >                    v.base = b;
1052 >                    v.stealHint = pid;
1053 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1054 >                    t.quietlyExec();
1055 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1056                  }
1054                // Try to descend to find v's stealer
1055                ForkJoinTask<?> next = v.currentJoin;
1056                if (task.status < 0 || next == null || next == task ||
1057                    joinMe.status < 0)
1058                    return;
1059                task = next;
1060                thread = v;
1057              }
1058 +            // Try to descend to find v's stealer
1059 +            ForkJoinTask<?> next = v.currentJoin;
1060 +            if (task.status < 0 || next == null || next == task ||
1061 +                joinMe.status < 0)
1062 +                return;
1063 +            task = next;
1064 +            thread = v;
1065          }
1066      }
1067  
1068      /**
1069 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1070       * Returns an estimate of the number of tasks, offset by a
1071       * function of number of idle workers.
1072       *
# Line 1117 | Line 1121 | public class ForkJoinWorkerThread extend
1121       * Runs tasks until {@code pool.isQuiescent()}.
1122       */
1123      final void helpQuiescePool() {
1124 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1125          for (;;) {
1126              ForkJoinTask<?> t = pollLocalTask();
1127 <            if (t != null || (t = scan()) != null) {
1127 >            if (t != null || (t = scan()) != null)
1128                  t.quietlyExec();
1124                currentSteal = null;
1125            }
1129              else {
1130                  ForkJoinPool p = pool;
1131                  int a; // to inline CASes
# Line 1131 | Line 1134 | public class ForkJoinWorkerThread extend
1134                          (p, poolRunStateOffset, a = p.runState, a - 1))
1135                          continue;   // retry later
1136                      active = false; // inactivate
1137 +                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1138                  }
1139                  if (p.isQuiescent()) {
1140                      active = true; // re-activate
1141 <                    do {} while(!UNSAFE.compareAndSwapInt
1142 <                                (p, poolRunStateOffset, a = p.runState, a+1));
1141 >                    do {} while (!UNSAFE.compareAndSwapInt
1142 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1143                      return;
1144                  }
1145              }
# Line 1165 | Line 1169 | public class ForkJoinWorkerThread extend
1169          if ((s & (s-1)) != 0)
1170              throw new Error("data type scale not a power of two");
1171          qShift = 31 - Integer.numberOfLeadingZeros(s);
1172 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1173      }
1174  
1175      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines