ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.40 by dl, Wed Aug 11 18:45:12 2010 UTC vs.
Revision 1.60 by dl, Tue Nov 23 00:10:39 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15 < * A thread managed by a {@link ForkJoinPool}.  This class is
16 < * subclassable solely for the sake of adding functionality -- there
17 < * are no overridable methods dealing with scheduling or execution.
18 < * However, you can override initialization and termination methods
19 < * surrounding the main task processing loop.  If you do create such a
20 < * subclass, you will also need to supply a custom {@link
21 < * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
22 < * ForkJoinPool}.
15 > * A thread managed by a {@link ForkJoinPool}, which executes
16 > * {@link ForkJoinTask}s.
17 > * This class is subclassable solely for the sake of adding
18 > * functionality -- there are no overridable methods dealing with
19 > * scheduling or execution.  However, you can override initialization
20 > * and termination methods surrounding the main task processing loop.
21 > * If you do create such a subclass, you will also need to supply a
22 > * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
23 > * in a {@code ForkJoinPool}.
24   *
25   * @since 1.7
26   * @author Doug Lea
# Line 110 | Line 110 | public class ForkJoinWorkerThread extend
110       * idea).  (4) We bound the number of attempts to find work (see
111       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112       * necessary replacing it with a spare (see
113 <     * ForkJoinPool.tryAwaitJoin).
113 >     * ForkJoinPool.awaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 164 | Line 164 | public class ForkJoinWorkerThread extend
164      private static final int MAX_HELP_DEPTH = 8;
165  
166      /**
167     * The wakeup interval (in nanoseconds) for the first worker
168     * suspended as spare.  On each wakeup not signalled by a
169     * resumption, it may ask the pool to reduce the number of spares.
170     */
171    private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L;
172
173    /**
167       * Capacity of work-stealing queue array upon initialization.
168       * Must be a power of two. Initial size must be at least 4, but is
169       * padded to minimize cache effects.
# Line 179 | Line 172 | public class ForkJoinWorkerThread extend
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 224 | Line 217 | public class ForkJoinWorkerThread extend
217       * Run state of this worker. In addition to the usual run levels,
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220 <     * maintained separately.
220 >     * maintained separately and modified only in conjunction with
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 234 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of steals, transferred and reset in pool callbacks pool
234 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 271 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 <     * Number of times this thread suspended as spare
276 >     * Number of times this thread suspended as spare. Accessed only
277 >     * by pool.
278       */
279      int spareCount;
280  
281      /**
282 <     * Encoded index and count of next spare waiter. Used only
282 >     * Encoded index and count of next spare waiter. Accessed only
283       * by ForkJoinPool for managing spares.
284       */
285      volatile int nextSpare;
286  
287      /**
288       * The task currently being joined, set only when actively trying
289 <     * to helpStealer. Written only by current thread, but read by
290 <     * others.
289 >     * to help other stealers in helpJoinTask. Written only by this
290 >     * thread, but read by others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Not volatile because always read/written in
297 <     * presence of related volatiles in those cases where it matters.
296 >     * submission queue).  Written only by this thread, but read by
297 >     * others.
298       */
299 <    private ForkJoinTask<?> currentSteal;
299 >    private volatile ForkJoinTask<?> currentSteal;
300  
301      /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 316 | Line 313 | public class ForkJoinWorkerThread extend
313      }
314  
315      /**
316 <     * Performs additional initialization and starts this thread
316 >     * Performs additional initialization and starts this thread.
317       */
318      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
319          this.poolIndex = poolIndex;
# Line 352 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke {@code super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 380 | Line 377 | public class ForkJoinWorkerThread extend
377       */
378      protected void onTermination(Throwable exception) {
379          try {
380 +            ForkJoinPool p = pool;
381 +            if (active) {
382 +                int a; // inline p.tryDecrementActiveCount
383 +                active = false;
384 +                do {} while (!UNSAFE.compareAndSwapInt
385 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
386 +            }
387              cancelTasks();
384            while (active)              // force inactive
385                active = !pool.tryDecrementActiveCount();
388              setTerminated();
389 <            pool.workerTerminated(this);
389 >            p.workerTerminated(this);
390          } catch (Throwable ex) {        // Shouldn't ever happen
391              if (exception == null)      // but if so, at least rethrown
392                  exception = ex;
# Line 397 | Line 399 | public class ForkJoinWorkerThread extend
399      /**
400       * This method is required to be public, but should never be
401       * called explicitly. It performs the main run loop to execute
402 <     * ForkJoinTasks.
402 >     * {@link ForkJoinTask}s.
403       */
404      public void run() {
405          Throwable exception = null;
# Line 414 | Line 416 | public class ForkJoinWorkerThread extend
416      // helpers for run()
417  
418      /**
419 <     * Find and execute tasks and check status while running
419 >     * Finds and executes tasks, and checks status while running.
420       */
421      private void mainLoop() {
422 <        int misses = 0; // track consecutive times failed to find work; max 2
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425 <            p.preStep(this, misses);
425 >            p.preStep(this, ran);
426              if (runState != 0)
427                  break;
428 <            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
427 <                      (misses < 2 ? misses + 1 : 2));
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
432      /**
433 <     * Try to steal a task and execute it
433 >     * Tries to steal a task and execute it.
434       *
435       * @return true if ran a task
436       */
437      private boolean tryExecSteal() {
438          ForkJoinTask<?> t;
439 <        if ((t  = scan()) != null) {
439 >        if ((t = scan()) != null) {
440              t.quietlyExec();
441 <            currentSteal = null;
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442              if (sp != base)
443                  execLocalTasks();
444              return true;
# Line 446 | Line 447 | public class ForkJoinWorkerThread extend
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and run it;
450 >     * If a submission exists, try to activate and run it.
451       *
452       * @return true if ran a task
453       */
454      private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460 <            ForkJoinTask<?> t;
461 <            if (active || (active = p.tryIncrementActiveCount())) {
460 >            ForkJoinTask<?> t; int a;
461 >            if (active || // inline p.tryIncrementActiveCount
462 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
463 >                                                   a = p.runState, a + 1))) {
464                  if ((t = p.pollSubmission()) != null) {
465 <                    currentSteal = t;
465 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
466                      t.quietlyExec();
467 <                    currentSteal = null;
467 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
468                      if (sp != base)
469                          execLocalTasks();
470                      return true;
# Line 474 | Line 480 | public class ForkJoinWorkerThread extend
480       */
481      private void execLocalTasks() {
482          while (runState == 0) {
483 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
483 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
484              if (t != null)
485                  t.quietlyExec();
486              else if (sp == base)
# Line 484 | Line 490 | public class ForkJoinWorkerThread extend
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 514 | Line 520 | public class ForkJoinWorkerThread extend
520       * range. This method is used only during resets and backouts.
521       */
522      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
523 <                                              ForkJoinTask<?> t) {
523 >                                        ForkJoinTask<?> t) {
524          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
525      }
526  
# Line 559 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 582 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread.
591 >     * Called only by this thread.
592       */
593      private ForkJoinTask<?> popTask() {
594          ForkJoinTask<?>[] q = queue;
# Line 595 | Line 601 | public class ForkJoinWorkerThread extend
601                  if (t == null)   // lost to stealer
602                      break;
603                  if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
604 +                    /*
605 +                     * Note: here and in related methods, as a
606 +                     * performance (not correctness) issue, we'd like
607 +                     * to encourage compiler not to arbitrarily
608 +                     * postpone setting sp after successful CAS.
609 +                     * Currently there is no intrinsic for arranging
610 +                     * this, but using Unsafe putOrderedInt may be a
611 +                     * preferable strategy on some compilers even
612 +                     * though its main effect is a pre-, not post-
613 +                     * fence. To simplify possible changes, the option
614 +                     * is left in comments next to the associated
615 +                     * assignments.
616 +                     */
617                      sp = s; // putOrderedInt may encourage more timely write
618                      // UNSAFE.putOrderedInt(this, spOffset, s);
619                      return t;
# Line 606 | Line 625 | public class ForkJoinWorkerThread extend
625  
626      /**
627       * Specialized version of popTask to pop only if topmost element
628 <     * is the given task. Called only by current thread while
610 <     * active.
628 >     * is the given task. Called only by this thread while active.
629       *
630       * @param t the task. Caller must ensure non-null.
631       */
# Line 617 | Line 635 | public class ForkJoinWorkerThread extend
635          if ((s = sp) != base && q != null &&
636              UNSAFE.compareAndSwapObject
637              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
638 <            sp = s;
638 >            sp = s; // putOrderedInt may encourage more timely write
639              // UNSAFE.putOrderedInt(this, spOffset, s);
640              return true;
641          }
# Line 625 | Line 643 | public class ForkJoinWorkerThread extend
643      }
644  
645      /**
646 <     * Returns next task or null if empty or contended
646 >     * Returns next task, or null if empty or contended.
647       */
648      final ForkJoinTask<?> peekTask() {
649          ForkJoinTask<?>[] q = queue;
# Line 667 | Line 685 | public class ForkJoinWorkerThread extend
685       * Computes next value for random victim probe in scan().  Scans
686       * don't require a very high quality generator, but also not a
687       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
688 <     * Note: This is manually inlined in scan()
688 >     * Note: This is manually inlined in scan().
689       */
690      private static final int xorShift(int r) {
691          r ^= r << 13;
# Line 706 | Line 724 | public class ForkJoinWorkerThread extend
724              for (;;) {
725                  ForkJoinWorkerThread v = ws[k & mask];
726                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
727 <                if (v != null && v.base != v.sp) {
728 <                    ForkJoinTask<?>[] q; int b;
729 <                    if ((canSteal ||       // ensure active status
730 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
731 <                        (q = v.queue) != null && (b = v.base) != v.sp) {
732 <                        int i = (q.length - 1) & b;
733 <                        long u = (i << qShift) + qBase; // raw offset
734 <                        ForkJoinTask<?> t = q[i];
735 <                        if (v.base == b && t != null &&
727 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
728 >                if (v != null && (b = v.base) != v.sp &&
729 >                    (q = v.queue) != null) {
730 >                    int i = (q.length - 1) & b;
731 >                    long u = (i << qShift) + qBase; // raw offset
732 >                    int pid = poolIndex;
733 >                    if ((t = q[i]) != null) {
734 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
735 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
736 >                                                     a = p.runState, a + 1))
737 >                            canSteal = active = true;
738 >                        if (canSteal && v.base == b++ &&
739                              UNSAFE.compareAndSwapObject(q, u, t, null)) {
740 <                            int pid = poolIndex;
720 <                            currentSteal = t;
740 >                            v.base = b;
741                              v.stealHint = pid;
742 <                            v.base = b + 1;
742 >                            UNSAFE.putOrderedObject(this,
743 >                                                    currentStealOffset, t);
744                              seed = r;
745                              ++stealCount;
746                              return t;
# Line 742 | Line 763 | public class ForkJoinWorkerThread extend
763      // Run State management
764  
765      // status check methods used mainly by ForkJoinPool
766 <    final boolean isRunning()     { return runState == 0; }
767 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
768 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
769 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
770 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
766 >    final boolean isRunning()    { return runState == 0; }
767 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
768 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
769 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
770 >
771 >    final boolean isTerminating() {
772 >        if ((runState & TERMINATING) != 0)
773 >            return true;
774 >        if (pool.isAtLeastTerminating()) { // propagate pool state
775 >            shutdown();
776 >            return true;
777 >        }
778 >        return false;
779 >    }
780  
781      /**
782 <     * Sets state to TERMINATING, also, unless "quiet", unparking if
783 <     * not already terminated
754 <     *
755 <     * @param quiet don't unpark (used for faster status updates on
756 <     * pool termination)
782 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
783 >     * to wake up if currently blocked. Callers must do so if desired.
784       */
785 <    final void shutdown(boolean quiet) {
785 >    final void shutdown() {
786          for (;;) {
787              int s = runState;
788              if ((s & (TERMINATING|TERMINATED)) != 0)
# Line 770 | Line 797 | public class ForkJoinWorkerThread extend
797                                                s | TERMINATING))
798                  break;
799          }
773        if (!quiet && (runState & TERMINATED) != 0)
774            LockSupport.unpark(this);
800      }
801  
802      /**
803 <     * Sets state to TERMINATED. Called only by onTermination()
803 >     * Sets state to TERMINATED. Called only by onTermination().
804       */
805      private void setTerminated() {
806          int s;
# Line 785 | Line 810 | public class ForkJoinWorkerThread extend
810      }
811  
812      /**
813 <     * If suspended, tries to set status to unsuspended and unparks.
813 >     * If suspended, tries to set status to unsuspended.
814 >     * Does NOT wake up if blocked.
815       *
816       * @return true if successful
817       */
# Line 802 | Line 828 | public class ForkJoinWorkerThread extend
828      /**
829       * Sets suspended status and blocks as spare until resumed
830       * or shutdown.
805     * @returns true if still running on exit
831       */
832 <    final boolean suspendAsSpare() {
808 <        lastEventCount = 0;         // reset upon resume
832 >    final void suspendAsSpare() {
833          for (;;) {                  // set suspended unless terminating
834              int s = runState;
835              if ((s & TERMINATING) != 0) { // must kill
836                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
837                                               s | (TRIMMED | TERMINATING)))
838 <                    return false;
838 >                    return;
839              }
840              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
841                                                s | SUSPENDED))
# Line 820 | Line 844 | public class ForkJoinWorkerThread extend
844          ForkJoinPool p = pool;
845          p.pushSpare(this);
846          while ((runState & SUSPENDED) != 0) {
847 <            if (!p.tryAccumulateStealCount(this))
848 <                continue;
825 <            interrupted();          // clear/ignore interrupts
826 <            if ((runState & SUSPENDED) == 0)
827 <                break;
828 <            if (nextSpare != 0)     // untimed
829 <                LockSupport.park(this);
830 <            else {
831 <                long startTime = System.nanoTime();
832 <                LockSupport.parkNanos(this, TRIM_RATE_NANOS);
847 >            if (p.tryAccumulateStealCount(this)) {
848 >                interrupted();          // clear/ignore interrupts
849                  if ((runState & SUSPENDED) == 0)
850                      break;
851 <                long now = System.nanoTime();
836 <                if (now - startTime >= TRIM_RATE_NANOS)
837 <                    pool.tryTrimSpare(now);
851 >                LockSupport.park(this);
852              }
853          }
840        return runState == 0;
854      }
855  
856      // Misc support methods for ForkJoinPool
# Line 857 | Line 870 | public class ForkJoinWorkerThread extend
870       */
871      final void cancelTasks() {
872          ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
873 <        if (cj != null) {
861 <            currentJoin = null;
873 >        if (cj != null && cj.status >= 0) {
874              cj.cancelIgnoringExceptions();
875              try {
876                  this.interrupt(); // awaken wait
# Line 866 | Line 878 | public class ForkJoinWorkerThread extend
878              }
879          }
880          ForkJoinTask<?> cs = currentSteal;
881 <        if (cs != null) {
870 <            currentSteal = null;
881 >        if (cs != null && cs.status >= 0)
882              cs.cancelIgnoringExceptions();
872        }
883          while (base != sp) {
884              ForkJoinTask<?> t = deqTask();
885              if (t != null)
# Line 902 | Line 912 | public class ForkJoinWorkerThread extend
912       * @return a task, if available
913       */
914      final ForkJoinTask<?> pollLocalTask() {
915 +        ForkJoinPool p = pool;
916          while (sp != base) {
917 <            if (active || (active = pool.tryIncrementActiveCount()))
918 <                return locallyFifo? locallyDeqTask() : popTask();
917 >            int a; // inline p.tryIncrementActiveCount
918 >            if (active ||
919 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
920 >                                                   a = p.runState, a + 1)))
921 >                return locallyFifo ? locallyDeqTask() : popTask();
922          }
923          return null;
924      }
# Line 918 | Line 932 | public class ForkJoinWorkerThread extend
932          ForkJoinTask<?> t = pollLocalTask();
933          if (t == null) {
934              t = scan();
935 <            currentSteal = null; // cannot retain/track/help
935 >            // cannot retain/track/help steal
936 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
937          }
938          return t;
939      }
# Line 927 | Line 942 | public class ForkJoinWorkerThread extend
942       * Possibly runs some tasks and/or blocks, until task is done.
943       *
944       * @param joinMe the task to join
945 +     * @param timed true if use timed wait
946 +     * @param nanos wait time if timed
947       */
948 <    final void joinTask(ForkJoinTask<?> joinMe) {
948 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
949          // currentJoin only written by this thread; only need ordered store
950          ForkJoinTask<?> prevJoin = currentJoin;
951          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
952 <        if (sp != base)
936 <            localHelpJoinTask(joinMe);
937 <        if (joinMe.status >= 0)
938 <            pool.awaitJoin(joinMe, this);
952 >        pool.awaitJoin(joinMe, this, timed, nanos);
953          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
954      }
955  
956      /**
943     * Run tasks in local queue until given task is done.
944     *
945     * @param joinMe the task to join
946     */
947    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
948        int s;
949        ForkJoinTask<?>[] q;
950        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
951            int i = (q.length - 1) & --s;
952            long u = (i << qShift) + qBase; // raw offset
953            ForkJoinTask<?> t = q[i];
954            if (t == null)  // lost to a stealer
955                break;
956            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
957                /*
958                 * This recheck (and similarly in helpJoinTask)
959                 * handles cases where joinMe is independently
960                 * cancelled or forced even though there is other work
961                 * available. Back out of the pop by putting t back
962                 * into slot before we commit by writing sp.
963                 */
964                if (joinMe.status < 0) {
965                    UNSAFE.putObjectVolatile(q, u, t);
966                    break;
967                }
968                sp = s;
969                // UNSAFE.putOrderedInt(this, spOffset, s);
970                t.quietlyExec();
971            }
972        }
973    }
974
975    /**
957       * Tries to locate and help perform tasks for a stealer of the
958       * given task, or in turn one of its stealers.  Traces
959       * currentSteal->currentJoin links looking for a thread working on
960       * a descendant of the given task and with a non-empty queue to
961       * steal back and execute tasks from.
962       *
963 <     * The implementation is very branchy to cope with the potential
963 >     * The implementation is very branchy to cope with potential
964       * inconsistencies or loops encountering chains that are stale,
965       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
966       * of these cases are dealt with by just returning back to the
# Line 987 | Line 968 | public class ForkJoinWorkerThread extend
968       * don't work out.
969       *
970       * @param joinMe the task to join
971 <     */
972 <    final void helpJoinTask(ForkJoinTask<?> joinMe) {
973 <        ForkJoinWorkerThread[] ws = pool.workers;
974 <        int n; // need at least 2 workers
975 <        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
971 >     * @param running if false, then must update pool count upon
972 >     *  running a task
973 >     * @return value of running on exit
974 >     */
975 >    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
976 >        /*
977 >         * Initial checks to (1) abort if terminating; (2) clean out
978 >         * old cancelled tasks from local queue; (3) if joinMe is next
979 >         * task, run it; (4) omit scan if local queue nonempty (since
980 >         * it may contain non-descendents of joinMe).
981 >         */
982 >        ForkJoinPool p = pool;
983 >        for (;;) {
984 >            ForkJoinTask<?>[] q;
985 >            int s;
986 >            if (joinMe.status < 0)
987 >                return running;
988 >            else if ((runState & TERMINATING) != 0) {
989 >                joinMe.cancelIgnoringExceptions();
990 >                return running;
991 >            }
992 >            else if ((s = sp) == base || (q = queue) == null)
993 >                break;                            // queue empty
994 >            else {
995 >                int i = (q.length - 1) & --s;
996 >                long u = (i << qShift) + qBase;   // raw offset
997 >                ForkJoinTask<?> t = q[i];
998 >                if (t == null)
999 >                    break;                        // lost to a stealer
1000 >                else if (t != joinMe && t.status >= 0)
1001 >                    return running;               // cannot safely help
1002 >                else if ((running ||
1003 >                          (running = p.tryIncrementRunningCount())) &&
1004 >                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
1005 >                    sp = s; // putOrderedInt may encourage more timely write
1006 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
1007 >                    t.quietlyExec();
1008 >                }
1009 >            }
1010 >        }
1011 >
1012 >        int n;                                    // worker array size
1013 >        ForkJoinWorkerThread[] ws = p.workers;
1014 >        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
1015              ForkJoinTask<?> task = joinMe;        // base of chain
1016              ForkJoinWorkerThread thread = this;   // thread with stolen task
1017 <            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1017 >
1018 >            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1019                  // Try to find v, the stealer of task, by first using hint
1020                  ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1021                  if (v == null || v.currentSteal != task) {
1022                      for (int j = 0; ; ++j) {      // search array
1023                          if (j < n) {
1024 <                            if ((v = ws[j]) != null) {
1025 <                                if (task.status < 0)
1026 <                                    return;       // stale or done
1027 <                                if (v.currentSteal == task) {
1024 >                            ForkJoinTask<?> vs;
1025 >                            if ((v = ws[j]) != null &&
1026 >                                (vs = v.currentSteal) != null) {
1027 >                                if (joinMe.status < 0)
1028 >                                    break outer;
1029 >                                if (vs == task) {
1030 >                                    if (task.status < 0)
1031 >                                        break outer; // stale
1032                                      thread.stealHint = j;
1033                                      break;        // save hint for next time
1034                                  }
1035                              }
1036                          }
1037                          else
1038 <                            return;               // no stealer
1038 >                            break outer;          // no stealer
1039                      }
1040                  }
1041 +
1042                  // Try to help v, using specialized form of deqTask
1043 <                int b;
1044 <                ForkJoinTask<?>[] q;
1045 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1043 >                for (;;) {
1044 >                    if (joinMe.status < 0)
1045 >                        break outer;
1046 >                    int b = v.base;
1047 >                    ForkJoinTask<?>[] q = v.queue;
1048 >                    if (b == v.sp || q == null)
1049 >                        break;                    // empty
1050                      int i = (q.length - 1) & b;
1051                      long u = (i << qShift) + qBase;
1052                      ForkJoinTask<?> t = q[i];
1053                      if (task.status < 0)
1054 <                        return;                   // stale or done
1055 <                    if (v.base == b) {
1056 <                        if (t == null)
1057 <                            return;               // producer stalled
1058 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1059 <                            if (joinMe.status < 0) {
1060 <                                UNSAFE.putObjectVolatile(q, u, t);
1061 <                                return;           // back out on cancel
1062 <                            }
1054 >                        break outer;              // stale
1055 >                    if (t != null &&
1056 >                        (running ||
1057 >                         (running = p.tryIncrementRunningCount())) &&
1058 >                        v.base == b++ &&
1059 >                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
1060 >                        if (t != joinMe && joinMe.status < 0) {
1061 >                            UNSAFE.putObjectVolatile(q, u, t);
1062 >                            break outer;          // joinMe cancelled; back out
1063 >                        }
1064 >                        v.base = b;
1065 >                        if (t.status >= 0) {
1066 >                            ForkJoinTask<?> ps = currentSteal;
1067                              int pid = poolIndex;
1034                            ForkJoinTask<?> prevSteal = currentSteal;
1035                            currentSteal = t;
1068                              v.stealHint = pid;
1069 <                            v.base = b + 1;
1069 >                            UNSAFE.putOrderedObject(this,
1070 >                                                    currentStealOffset, t);
1071                              t.quietlyExec();
1072 <                            currentSteal = prevSteal;
1072 >                            UNSAFE.putOrderedObject(this,
1073 >                                                    currentStealOffset, ps);
1074                          }
1075                      }
1076 <                    if (joinMe.status < 0)
1077 <                        return;
1076 >                    else if ((runState & TERMINATING) != 0) {
1077 >                        joinMe.cancelIgnoringExceptions();
1078 >                        break outer;
1079 >                    }
1080                  }
1081 +                
1082                  // Try to descend to find v's stealer
1083                  ForkJoinTask<?> next = v.currentJoin;
1084                  if (task.status < 0 || next == null || next == task ||
1085                      joinMe.status < 0)
1086 <                    return;
1086 >                    break;                 // done, stale, dead-end, or cyclic
1087                  task = next;
1088                  thread = v;
1089              }
1090          }
1091 +        return running;
1092      }
1093  
1094      /**
1095 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1096       * Returns an estimate of the number of tasks, offset by a
1097       * function of number of idle workers.
1098       *
# Line 1108 | Line 1147 | public class ForkJoinWorkerThread extend
1147       * Runs tasks until {@code pool.isQuiescent()}.
1148       */
1149      final void helpQuiescePool() {
1150 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1151          for (;;) {
1152              ForkJoinTask<?> t = pollLocalTask();
1153 <            if (t != null || (t = scan()) != null) {
1153 >            if (t != null || (t = scan()) != null)
1154                  t.quietlyExec();
1115                currentSteal = null;
1116            }
1155              else {
1156                  ForkJoinPool p = pool;
1157 +                int a; // to inline CASes
1158                  if (active) {
1159 <                    if (!p.tryDecrementActiveCount())
1159 >                    if (!UNSAFE.compareAndSwapInt
1160 >                        (p, poolRunStateOffset, a = p.runState, a - 1))
1161                          continue;   // retry later
1162                      active = false; // inactivate
1163 +                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1164                  }
1165                  if (p.isQuiescent()) {
1166                      active = true; // re-activate
1167 <                    do {} while (!p.tryIncrementActiveCount());
1167 >                    do {} while (!UNSAFE.compareAndSwapInt
1168 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1169                      return;
1170                  }
1171              }
# Line 1143 | Line 1185 | public class ForkJoinWorkerThread extend
1185          objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1186      private static final long qBase =
1187          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1188 +    private static final long poolRunStateOffset = // to inline CAS
1189 +        objectFieldOffset("runState", ForkJoinPool.class);
1190  
1191      private static final int qShift;
1192  
# Line 1151 | Line 1195 | public class ForkJoinWorkerThread extend
1195          if ((s & (s-1)) != 0)
1196              throw new Error("data type scale not a power of two");
1197          qShift = 31 - Integer.numberOfLeadingZeros(s);
1198 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1199      }
1200  
1201      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines