ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.40 by dl, Wed Aug 11 18:45:12 2010 UTC vs.
Revision 1.54 by dl, Wed Nov 17 12:06:46 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 110 | Line 109 | public class ForkJoinWorkerThread extend
109       * idea).  (4) We bound the number of attempts to find work (see
110       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
111       * necessary replacing it with a spare (see
112 <     * ForkJoinPool.tryAwaitJoin).
112 >     * ForkJoinPool.awaitJoin).
113       *
114       * Efficient implementation of these algorithms currently relies
115       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 164 | Line 163 | public class ForkJoinWorkerThread extend
163      private static final int MAX_HELP_DEPTH = 8;
164  
165      /**
167     * The wakeup interval (in nanoseconds) for the first worker
168     * suspended as spare.  On each wakeup not signalled by a
169     * resumption, it may ask the pool to reduce the number of spares.
170     */
171    private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L;
172
173    /**
166       * Capacity of work-stealing queue array upon initialization.
167       * Must be a power of two. Initial size must be at least 4, but is
168       * padded to minimize cache effects.
# Line 179 | Line 171 | public class ForkJoinWorkerThread extend
171  
172      /**
173       * Maximum work-stealing queue array size.  Must be less than or
174 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
175 <     * is less than usual bounds, because we need leftshift by 3
176 <     * to be in int range).
174 >     * equal to 1 << (31 - width of array entry) to ensure lack of
175 >     * index wraparound. The value is set in the static block
176 >     * at the end of this file after obtaining width.
177       */
178 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
178 >    private static final int MAXIMUM_QUEUE_CAPACITY;
179  
180      /**
181       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 224 | Line 216 | public class ForkJoinWorkerThread extend
216       * Run state of this worker. In addition to the usual run levels,
217       * tracks if this worker is suspended as a spare, and if it was
218       * killed (trimmed) while suspended. However, "active" status is
219 <     * maintained separately.
219 >     * maintained separately and modified only in conjunction with
220 >     * CASes of the pool's runState (which are currently sadly
221 >     * manually inlined for performance.)  Accessed directly by pool
222 >     * to simplify checks for normal (zero) status.
223       */
224 <    private volatile int runState;
224 >    volatile int runState;
225  
226      private static final int TERMINATING = 0x01;
227      private static final int TERMINATED  = 0x02;
# Line 234 | Line 229 | public class ForkJoinWorkerThread extend
229      private static final int TRIMMED     = 0x08; // killed while suspended
230  
231      /**
232 <     * Number of steals, transferred and reset in pool callbacks pool
233 <     * when idle Accessed directly by pool.
232 >     * Number of steals. Directly accessed (and reset) by
233 >     * pool.tryAccumulateStealCount when idle.
234       */
235      int stealCount;
236  
# Line 271 | Line 266 | public class ForkJoinWorkerThread extend
266      int lastEventCount;
267  
268      /**
269 <     * Encoded index and event count of next event waiter. Used only
270 <     * by ForkJoinPool for managing event waiters.
269 >     * Encoded index and event count of next event waiter. Accessed
270 >     * only by ForkJoinPool for managing event waiters.
271       */
272      volatile long nextWaiter;
273  
274      /**
275 <     * Number of times this thread suspended as spare
275 >     * Number of times this thread suspended as spare. Accessed only
276 >     * by pool.
277       */
278      int spareCount;
279  
280      /**
281 <     * Encoded index and count of next spare waiter. Used only
281 >     * Encoded index and count of next spare waiter. Accessed only
282       * by ForkJoinPool for managing spares.
283       */
284      volatile int nextSpare;
285  
286      /**
287       * The task currently being joined, set only when actively trying
288 <     * to helpStealer. Written only by current thread, but read by
289 <     * others.
288 >     * to help other stealers in helpJoinTask. Written only by this
289 >     * thread, but read by others.
290       */
291      private volatile ForkJoinTask<?> currentJoin;
292  
293      /**
294       * The task most recently stolen from another worker (or
295 <     * submission queue).  Not volatile because always read/written in
296 <     * presence of related volatiles in those cases where it matters.
295 >     * submission queue).  Written only by this thread, but read by
296 >     * others.
297       */
298 <    private ForkJoinTask<?> currentSteal;
298 >    private volatile ForkJoinTask<?> currentSteal;
299  
300      /**
301       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 316 | Line 312 | public class ForkJoinWorkerThread extend
312      }
313  
314      /**
315 <     * Performs additional initialization and starts this thread
315 >     * Performs additional initialization and starts this thread.
316       */
317      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
318          this.poolIndex = poolIndex;
# Line 352 | Line 348 | public class ForkJoinWorkerThread extend
348      /**
349       * Initializes internal state after construction but before
350       * processing any tasks. If you override this method, you must
351 <     * invoke super.onStart() at the beginning of the method.
351 >     * invoke @code{super.onStart()} at the beginning of the method.
352       * Initialization requires care: Most fields must have legal
353       * default values, to ensure that attempted accesses from other
354       * threads work correctly even before this thread starts
# Line 380 | Line 376 | public class ForkJoinWorkerThread extend
376       */
377      protected void onTermination(Throwable exception) {
378          try {
379 +            ForkJoinPool p = pool;
380 +            if (active) {
381 +                int a; // inline p.tryDecrementActiveCount
382 +                active = false;
383 +                do {} while (!UNSAFE.compareAndSwapInt
384 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
385 +            }
386              cancelTasks();
384            while (active)              // force inactive
385                active = !pool.tryDecrementActiveCount();
387              setTerminated();
388 <            pool.workerTerminated(this);
388 >            p.workerTerminated(this);
389          } catch (Throwable ex) {        // Shouldn't ever happen
390              if (exception == null)      // but if so, at least rethrown
391                  exception = ex;
# Line 414 | Line 415 | public class ForkJoinWorkerThread extend
415      // helpers for run()
416  
417      /**
418 <     * Find and execute tasks and check status while running
418 >     * Finds and executes tasks, and checks status while running.
419       */
420      private void mainLoop() {
421 <        int misses = 0; // track consecutive times failed to find work; max 2
421 >        boolean ran = false; // true if ran a task on last step
422          ForkJoinPool p = pool;
423          for (;;) {
424 <            p.preStep(this, misses);
424 >            p.preStep(this, ran);
425              if (runState != 0)
426                  break;
427 <            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
427 <                      (misses < 2 ? misses + 1 : 2));
427 >            ran = tryExecSteal() || tryExecSubmission();
428          }
429      }
430  
431      /**
432 <     * Try to steal a task and execute it
432 >     * Tries to steal a task and execute it.
433       *
434       * @return true if ran a task
435       */
436      private boolean tryExecSteal() {
437          ForkJoinTask<?> t;
438 <        if ((t  = scan()) != null) {
438 >        if ((t = scan()) != null) {
439              t.quietlyExec();
440 <            currentSteal = null;
440 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
441              if (sp != base)
442                  execLocalTasks();
443              return true;
# Line 446 | Line 446 | public class ForkJoinWorkerThread extend
446      }
447  
448      /**
449 <     * If a submission exists, try to activate and run it;
449 >     * If a submission exists, try to activate and run it.
450       *
451       * @return true if ran a task
452       */
453      private boolean tryExecSubmission() {
454          ForkJoinPool p = pool;
455 +        // This loop is needed in case attempt to activate fails, in
456 +        // which case we only retry if there still appears to be a
457 +        // submission.
458          while (p.hasQueuedSubmissions()) {
459 <            ForkJoinTask<?> t;
460 <            if (active || (active = p.tryIncrementActiveCount())) {
459 >            ForkJoinTask<?> t; int a;
460 >            if (active || // inline p.tryIncrementActiveCount
461 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
462 >                                                   a = p.runState, a + 1))) {
463                  if ((t = p.pollSubmission()) != null) {
464 <                    currentSteal = t;
464 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
465                      t.quietlyExec();
466 <                    currentSteal = null;
466 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
467                      if (sp != base)
468                          execLocalTasks();
469                      return true;
# Line 474 | Line 479 | public class ForkJoinWorkerThread extend
479       */
480      private void execLocalTasks() {
481          while (runState == 0) {
482 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
482 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
483              if (t != null)
484                  t.quietlyExec();
485              else if (sp == base)
# Line 484 | Line 489 | public class ForkJoinWorkerThread extend
489  
490      /*
491       * Intrinsics-based atomic writes for queue slots. These are
492 <     * basically the same as methods in AtomicObjectArray, but
492 >     * basically the same as methods in AtomicReferenceArray, but
493       * specialized for (1) ForkJoinTask elements (2) requirement that
494       * nullness and bounds checks have already been performed by
495       * callers and (3) effective offsets are known not to overflow
496       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
497       * need corresponding version for reads: plain array reads are OK
498 <     * because they protected by other volatile reads and are
498 >     * because they are protected by other volatile reads and are
499       * confirmed by CASes.
500       *
501       * Most uses don't actually call these methods, but instead contain
# Line 514 | Line 519 | public class ForkJoinWorkerThread extend
519       * range. This method is used only during resets and backouts.
520       */
521      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
522 <                                              ForkJoinTask<?> t) {
522 >                                        ForkJoinTask<?> t) {
523          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
524      }
525  
# Line 559 | Line 564 | public class ForkJoinWorkerThread extend
564  
565      /**
566       * Tries to take a task from the base of own queue. Assumes active
567 <     * status.  Called only by current thread.
567 >     * status.  Called only by this thread.
568       *
569       * @return a task, or null if none
570       */
# Line 582 | Line 587 | public class ForkJoinWorkerThread extend
587  
588      /**
589       * Returns a popped task, or null if empty. Assumes active status.
590 <     * Called only by current thread.
590 >     * Called only by this thread.
591       */
592      private ForkJoinTask<?> popTask() {
593          ForkJoinTask<?>[] q = queue;
# Line 606 | Line 611 | public class ForkJoinWorkerThread extend
611  
612      /**
613       * Specialized version of popTask to pop only if topmost element
614 <     * is the given task. Called only by current thread while
610 <     * active.
614 >     * is the given task. Called only by this thread while active.
615       *
616       * @param t the task. Caller must ensure non-null.
617       */
# Line 617 | Line 621 | public class ForkJoinWorkerThread extend
621          if ((s = sp) != base && q != null &&
622              UNSAFE.compareAndSwapObject
623              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
624 <            sp = s;
624 >            sp = s; // putOrderedInt may encourage more timely write
625              // UNSAFE.putOrderedInt(this, spOffset, s);
626              return true;
627          }
# Line 625 | Line 629 | public class ForkJoinWorkerThread extend
629      }
630  
631      /**
632 <     * Returns next task or null if empty or contended
632 >     * Returns next task, or null if empty or contended.
633       */
634      final ForkJoinTask<?> peekTask() {
635          ForkJoinTask<?>[] q = queue;
# Line 667 | Line 671 | public class ForkJoinWorkerThread extend
671       * Computes next value for random victim probe in scan().  Scans
672       * don't require a very high quality generator, but also not a
673       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
674 <     * Note: This is manually inlined in scan()
674 >     * Note: This is manually inlined in scan().
675       */
676      private static final int xorShift(int r) {
677          r ^= r << 13;
# Line 706 | Line 710 | public class ForkJoinWorkerThread extend
710              for (;;) {
711                  ForkJoinWorkerThread v = ws[k & mask];
712                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
713 <                if (v != null && v.base != v.sp) {
714 <                    ForkJoinTask<?>[] q; int b;
715 <                    if ((canSteal ||       // ensure active status
716 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
717 <                        (q = v.queue) != null && (b = v.base) != v.sp) {
718 <                        int i = (q.length - 1) & b;
719 <                        long u = (i << qShift) + qBase; // raw offset
720 <                        ForkJoinTask<?> t = q[i];
721 <                        if (v.base == b && t != null &&
713 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
714 >                if (v != null && (b = v.base) != v.sp &&
715 >                    (q = v.queue) != null) {
716 >                    int i = (q.length - 1) & b;
717 >                    long u = (i << qShift) + qBase; // raw offset
718 >                    int pid = poolIndex;
719 >                    if ((t = q[i]) != null) {
720 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
721 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
722 >                                                     a = p.runState, a + 1))
723 >                            canSteal = active = true;
724 >                        if (canSteal && v.base == b++ &&
725                              UNSAFE.compareAndSwapObject(q, u, t, null)) {
726 <                            int pid = poolIndex;
720 <                            currentSteal = t;
726 >                            v.base = b;
727                              v.stealHint = pid;
728 <                            v.base = b + 1;
728 >                            UNSAFE.putOrderedObject(this,
729 >                                                    currentStealOffset, t);
730                              seed = r;
731                              ++stealCount;
732                              return t;
# Line 742 | Line 749 | public class ForkJoinWorkerThread extend
749      // Run State management
750  
751      // status check methods used mainly by ForkJoinPool
752 <    final boolean isRunning()     { return runState == 0; }
753 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
754 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
755 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
756 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
752 >    final boolean isRunning()    { return runState == 0; }
753 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
754 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
755 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
756 >
757 >    final boolean isTerminating() {
758 >        if ((runState & TERMINATING) != 0)
759 >            return true;
760 >        if (pool.isAtLeastTerminating()) { // propagate pool state
761 >            shutdown();
762 >            return true;
763 >        }
764 >        return false;
765 >    }
766  
767      /**
768 <     * Sets state to TERMINATING, also, unless "quiet", unparking if
769 <     * not already terminated
754 <     *
755 <     * @param quiet don't unpark (used for faster status updates on
756 <     * pool termination)
768 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
769 >     * to wake up if currently blocked. Callers must do so if desired.
770       */
771 <    final void shutdown(boolean quiet) {
771 >    final void shutdown() {
772          for (;;) {
773              int s = runState;
774              if ((s & (TERMINATING|TERMINATED)) != 0)
# Line 770 | Line 783 | public class ForkJoinWorkerThread extend
783                                                s | TERMINATING))
784                  break;
785          }
773        if (!quiet && (runState & TERMINATED) != 0)
774            LockSupport.unpark(this);
786      }
787  
788      /**
789 <     * Sets state to TERMINATED. Called only by onTermination()
789 >     * Sets state to TERMINATED. Called only by onTermination().
790       */
791      private void setTerminated() {
792          int s;
# Line 785 | Line 796 | public class ForkJoinWorkerThread extend
796      }
797  
798      /**
799 <     * If suspended, tries to set status to unsuspended and unparks.
799 >     * If suspended, tries to set status to unsuspended.
800 >     * Does NOT wake up if blocked.
801       *
802       * @return true if successful
803       */
# Line 802 | Line 814 | public class ForkJoinWorkerThread extend
814      /**
815       * Sets suspended status and blocks as spare until resumed
816       * or shutdown.
805     * @returns true if still running on exit
817       */
818 <    final boolean suspendAsSpare() {
808 <        lastEventCount = 0;         // reset upon resume
818 >    final void suspendAsSpare() {
819          for (;;) {                  // set suspended unless terminating
820              int s = runState;
821              if ((s & TERMINATING) != 0) { // must kill
822                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
823                                               s | (TRIMMED | TERMINATING)))
824 <                    return false;
824 >                    return;
825              }
826              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
827                                                s | SUSPENDED))
# Line 820 | Line 830 | public class ForkJoinWorkerThread extend
830          ForkJoinPool p = pool;
831          p.pushSpare(this);
832          while ((runState & SUSPENDED) != 0) {
833 <            if (!p.tryAccumulateStealCount(this))
834 <                continue;
825 <            interrupted();          // clear/ignore interrupts
826 <            if ((runState & SUSPENDED) == 0)
827 <                break;
828 <            if (nextSpare != 0)     // untimed
829 <                LockSupport.park(this);
830 <            else {
831 <                long startTime = System.nanoTime();
832 <                LockSupport.parkNanos(this, TRIM_RATE_NANOS);
833 >            if (p.tryAccumulateStealCount(this)) {
834 >                interrupted();          // clear/ignore interrupts
835                  if ((runState & SUSPENDED) == 0)
836                      break;
837 <                long now = System.nanoTime();
836 <                if (now - startTime >= TRIM_RATE_NANOS)
837 <                    pool.tryTrimSpare(now);
837 >                LockSupport.park(this);
838              }
839          }
840        return runState == 0;
840      }
841  
842      // Misc support methods for ForkJoinPool
# Line 902 | Line 901 | public class ForkJoinWorkerThread extend
901       * @return a task, if available
902       */
903      final ForkJoinTask<?> pollLocalTask() {
904 +        ForkJoinPool p = pool;
905          while (sp != base) {
906 <            if (active || (active = pool.tryIncrementActiveCount()))
907 <                return locallyFifo? locallyDeqTask() : popTask();
906 >            int a; // inline p.tryIncrementActiveCount
907 >            if (active ||
908 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
909 >                                                   a = p.runState, a + 1)))
910 >                return locallyFifo ? locallyDeqTask() : popTask();
911          }
912          return null;
913      }
# Line 918 | Line 921 | public class ForkJoinWorkerThread extend
921          ForkJoinTask<?> t = pollLocalTask();
922          if (t == null) {
923              t = scan();
924 <            currentSteal = null; // cannot retain/track/help
924 >            // cannot retain/track/help steal
925 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
926          }
927          return t;
928      }
# Line 927 | Line 931 | public class ForkJoinWorkerThread extend
931       * Possibly runs some tasks and/or blocks, until task is done.
932       *
933       * @param joinMe the task to join
934 +     * @param timed true if use timed wait
935 +     * @param nanos wait time if timed
936       */
937 <    final void joinTask(ForkJoinTask<?> joinMe) {
937 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
938          // currentJoin only written by this thread; only need ordered store
939          ForkJoinTask<?> prevJoin = currentJoin;
940          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
941 <        if (sp != base)
942 <            localHelpJoinTask(joinMe);
943 <        if (joinMe.status >= 0)
944 <            pool.awaitJoin(joinMe, this);
941 >        if (isTerminating())                // cancel if shutting down
942 >            joinMe.cancelIgnoringExceptions();
943 >        else
944 >            pool.awaitJoin(joinMe, this, timed, nanos);
945          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
946      }
947  
948      /**
949       * Run tasks in local queue until given task is done.
950 +     * Not currently used because it complicates semantics.
951       *
952       * @param joinMe the task to join
953       */
# Line 979 | Line 986 | public class ForkJoinWorkerThread extend
986       * a descendant of the given task and with a non-empty queue to
987       * steal back and execute tasks from.
988       *
989 <     * The implementation is very branchy to cope with the potential
989 >     * The implementation is very branchy to cope with potential
990       * inconsistencies or loops encountering chains that are stale,
991       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
992       * of these cases are dealt with by just returning back to the
# Line 989 | Line 996 | public class ForkJoinWorkerThread extend
996       * @param joinMe the task to join
997       */
998      final void helpJoinTask(ForkJoinTask<?> joinMe) {
999 <        ForkJoinWorkerThread[] ws = pool.workers;
1000 <        int n; // need at least 2 workers
1001 <        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
1002 <            ForkJoinTask<?> task = joinMe;        // base of chain
1003 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
1004 <            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1005 <                // Try to find v, the stealer of task, by first using hint
1006 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1007 <                if (v == null || v.currentSteal != task) {
1008 <                    for (int j = 0; ; ++j) {      // search array
1009 <                        if (j < n) {
1010 <                            if ((v = ws[j]) != null) {
1011 <                                if (task.status < 0)
1012 <                                    return;       // stale or done
1013 <                                if (v.currentSteal == task) {
1014 <                                    thread.stealHint = j;
1015 <                                    break;        // save hint for next time
1016 <                                }
999 >        ForkJoinWorkerThread[] ws;
1000 >        int n;
1001 >        if (joinMe.status < 0)                // already done
1002 >            return;
1003 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1004 >            return;                           // need at least 2 workers
1005 >
1006 >        ForkJoinTask<?> task = joinMe;        // base of chain
1007 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
1008 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1009 >            // Try to find v, the stealer of task, by first using hint
1010 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1011 >            if (v == null || v.currentSteal != task) {
1012 >                for (int j = 0; ; ++j) {      // search array
1013 >                    if (j < n) {
1014 >                        ForkJoinTask<?> vs;
1015 >                        if ((v = ws[j]) != null &&
1016 >                            (vs = v.currentSteal) != null) {
1017 >                            if (joinMe.status < 0 || task.status < 0)
1018 >                                return;       // stale or done
1019 >                            if (vs == task) {
1020 >                                thread.stealHint = j;
1021 >                                break;        // save hint for next time
1022                              }
1023                          }
1012                        else
1013                            return;               // no stealer
1024                      }
1025 +                    else
1026 +                        return;               // no stealer
1027                  }
1028 <                // Try to help v, using specialized form of deqTask
1029 <                int b;
1030 <                ForkJoinTask<?>[] q;
1031 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1032 <                    int i = (q.length - 1) & b;
1033 <                    long u = (i << qShift) + qBase;
1034 <                    ForkJoinTask<?> t = q[i];
1035 <                    if (task.status < 0)
1036 <                        return;                   // stale or done
1037 <                    if (v.base == b) {
1038 <                        if (t == null)
1039 <                            return;               // producer stalled
1040 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1041 <                            if (joinMe.status < 0) {
1042 <                                UNSAFE.putObjectVolatile(q, u, t);
1043 <                                return;           // back out on cancel
1044 <                            }
1045 <                            int pid = poolIndex;
1046 <                            ForkJoinTask<?> prevSteal = currentSteal;
1047 <                            currentSteal = t;
1036 <                            v.stealHint = pid;
1037 <                            v.base = b + 1;
1038 <                            t.quietlyExec();
1039 <                            currentSteal = prevSteal;
1040 <                        }
1028 >            }
1029 >            for (;;) { // Try to help v, using specialized form of deqTask
1030 >                if (joinMe.status < 0)
1031 >                    return;
1032 >                int b = v.base;
1033 >                ForkJoinTask<?>[] q = v.queue;
1034 >                if (b == v.sp || q == null)
1035 >                    break;
1036 >                int i = (q.length - 1) & b;
1037 >                long u = (i << qShift) + qBase;
1038 >                ForkJoinTask<?> t = q[i];
1039 >                int pid = poolIndex;
1040 >                ForkJoinTask<?> ps = currentSteal;
1041 >                if (task.status < 0)
1042 >                    return;                   // stale or done
1043 >                if (t != null && v.base == b++ &&
1044 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1045 >                    if (joinMe.status < 0) {
1046 >                        UNSAFE.putObjectVolatile(q, u, t);
1047 >                        return;               // back out on cancel
1048                      }
1049 <                    if (joinMe.status < 0)
1050 <                        return;
1049 >                    v.base = b;
1050 >                    v.stealHint = pid;
1051 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1052 >                    t.quietlyExec();
1053 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1054                  }
1045                // Try to descend to find v's stealer
1046                ForkJoinTask<?> next = v.currentJoin;
1047                if (task.status < 0 || next == null || next == task ||
1048                    joinMe.status < 0)
1049                    return;
1050                task = next;
1051                thread = v;
1055              }
1056 +            // Try to descend to find v's stealer
1057 +            ForkJoinTask<?> next = v.currentJoin;
1058 +            if (task.status < 0 || next == null || next == task ||
1059 +                joinMe.status < 0)
1060 +                return;
1061 +            task = next;
1062 +            thread = v;
1063          }
1064      }
1065  
1066      /**
1067 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1068       * Returns an estimate of the number of tasks, offset by a
1069       * function of number of idle workers.
1070       *
# Line 1108 | Line 1119 | public class ForkJoinWorkerThread extend
1119       * Runs tasks until {@code pool.isQuiescent()}.
1120       */
1121      final void helpQuiescePool() {
1122 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1123          for (;;) {
1124              ForkJoinTask<?> t = pollLocalTask();
1125 <            if (t != null || (t = scan()) != null) {
1125 >            if (t != null || (t = scan()) != null)
1126                  t.quietlyExec();
1115                currentSteal = null;
1116            }
1127              else {
1128                  ForkJoinPool p = pool;
1129 +                int a; // to inline CASes
1130                  if (active) {
1131 <                    if (!p.tryDecrementActiveCount())
1131 >                    if (!UNSAFE.compareAndSwapInt
1132 >                        (p, poolRunStateOffset, a = p.runState, a - 1))
1133                          continue;   // retry later
1134                      active = false; // inactivate
1135 +                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1136                  }
1137                  if (p.isQuiescent()) {
1138                      active = true; // re-activate
1139 <                    do {} while (!p.tryIncrementActiveCount());
1139 >                    do {} while (!UNSAFE.compareAndSwapInt
1140 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1141                      return;
1142                  }
1143              }
# Line 1143 | Line 1157 | public class ForkJoinWorkerThread extend
1157          objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1158      private static final long qBase =
1159          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1160 +    private static final long poolRunStateOffset = // to inline CAS
1161 +        objectFieldOffset("runState", ForkJoinPool.class);
1162  
1163      private static final int qShift;
1164  
# Line 1151 | Line 1167 | public class ForkJoinWorkerThread extend
1167          if ((s & (s-1)) != 0)
1168              throw new Error("data type scale not a power of two");
1169          qShift = 31 - Integer.numberOfLeadingZeros(s);
1170 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1171      }
1172  
1173      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines