ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.39 by dl, Sat Jul 24 20:28:18 2010 UTC vs.
Revision 1.48 by jsr166, Tue Sep 7 14:37:28 2010 UTC

# Line 110 | Line 110 | public class ForkJoinWorkerThread extend
110       * idea).  (4) We bound the number of attempts to find work (see
111       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112       * necessary replacing it with a spare (see
113 <     * ForkJoinPool.tryAwaitJoin).
113 >     * ForkJoinPool.awaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 155 | Line 155 | public class ForkJoinWorkerThread extend
155      private static final Random seedGenerator = new Random();
156  
157      /**
158     * The timeout value for suspending spares. Spare workers that
159     * remain unsignalled for more than this time may be trimmed
160     * (killed and removed from pool).  Since our goal is to avoid
161     * long-term thread buildup, the exact value of timeout does not
162     * matter too much so long as it avoids most false-alarm timeouts
163     * under GC stalls or momentarily high system load.
164     */
165    private static final long SPARE_KEEPALIVE_NANOS =
166        5L * 1000L * 1000L * 1000L; // 5 secs
167
168    /**
158       * The maximum stolen->joining link depth allowed in helpJoinTask.
159       * Depths for legitimate chains are unbounded, but we use a fixed
160       * constant to avoid (otherwise unchecked) cycles and bound
# Line 183 | Line 172 | public class ForkJoinWorkerThread extend
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 228 | Line 217 | public class ForkJoinWorkerThread extend
217       * Run state of this worker. In addition to the usual run levels,
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220 <     * maintained separately.
220 >     * maintained separately and modified only in conjunction with
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 238 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of LockSupport.park calls to block this thread for
234 <     * suspension or event waits. Used for internal instrumention;
243 <     * currently not exported but included because volatile write upon
244 <     * park also provides a workaround for a JVM bug.
245 <     */
246 <    volatile int parkCount;
247 <
248 <    /**
249 <     * Number of steals, transferred and reset in pool callbacks pool
250 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 283 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 +     * Number of times this thread suspended as spare. Accessed only
277 +     * by pool.
278 +     */
279 +    int spareCount;
280 +
281 +    /**
282 +     * Encoded index and count of next spare waiter. Accessed only
283 +     * by ForkJoinPool for managing spares.
284 +     */
285 +    volatile int nextSpare;
286 +
287 +    /**
288       * The task currently being joined, set only when actively trying
289 <     * to helpStealer. Written only by current thread, but read by
290 <     * others.
289 >     * to help other stealers in helpJoinTask. Written only by this
290 >     * thread, but read by others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Not volatile because always read/written in
297 <     * presence of related volatiles in those cases where it matters.
296 >     * submission queue).  Written only by this thread, but read by
297 >     * others.
298       */
299 <    private ForkJoinTask<?> currentSteal;
299 >    private volatile ForkJoinTask<?> currentSteal;
300  
301      /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 311 | Line 307 | public class ForkJoinWorkerThread extend
307      protected ForkJoinWorkerThread(ForkJoinPool pool) {
308          this.pool = pool;
309          this.locallyFifo = pool.locallyFifo;
310 +        setDaemon(true);
311          // To avoid exposing construction details to subclasses,
312          // remaining initialization is in start() and onStart()
313      }
314  
315      /**
316 <     * Performs additional initialization and starts this thread
316 >     * Performs additional initialization and starts this thread.
317       */
318      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
319          this.poolIndex = poolIndex;
323        setDaemon(true);
320          if (ueh != null)
321              setUncaughtExceptionHandler(ueh);
322          start();
# Line 353 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke @code{super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 381 | Line 377 | public class ForkJoinWorkerThread extend
377       */
378      protected void onTermination(Throwable exception) {
379          try {
380 +            ForkJoinPool p = pool;
381 +            if (active) {
382 +                int a; // inline p.tryDecrementActiveCount
383 +                active = false;
384 +                do {} while (!UNSAFE.compareAndSwapInt
385 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
386 +            }
387              cancelTasks();
388              setTerminated();
389 <            pool.workerTerminated(this);
389 >            p.workerTerminated(this);
390          } catch (Throwable ex) {        // Shouldn't ever happen
391              if (exception == null)      // but if so, at least rethrown
392                  exception = ex;
# Line 413 | Line 416 | public class ForkJoinWorkerThread extend
416      // helpers for run()
417  
418      /**
419 <     * Find and execute tasks and check status while running
419 >     * Finds and executes tasks, and checks status while running.
420       */
421      private void mainLoop() {
422 <        int emptyScans = 0; // consecutive times failed to find work
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425 <            p.preStep(this, emptyScans);
425 >            p.preStep(this, ran);
426              if (runState != 0)
427 <                return;
428 <            ForkJoinTask<?> t; // try to get and run stolen or submitted task
426 <            if ((t = scan()) != null || (t = pollSubmission()) != null) {
427 <                t.tryExec();
428 <                if (base != sp)
429 <                    runLocalTasks();
430 <                currentSteal = null;
431 <                emptyScans = 0;
432 <            }
433 <            else
434 <                ++emptyScans;
427 >                break;
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
432      /**
433 <     * Runs local tasks until queue is empty or shut down.  Call only
434 <     * while active.
433 >     * Tries to steal a task and execute it.
434 >     *
435 >     * @return true if ran a task
436       */
437 <    private void runLocalTasks() {
438 <        while (runState == 0) {
439 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
440 <            if (t != null)
441 <                t.tryExec();
442 <            else if (base == sp)
443 <                break;
437 >    private boolean tryExecSteal() {
438 >        ForkJoinTask<?> t;
439 >        if ((t = scan()) != null) {
440 >            t.quietlyExec();
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442 >            if (sp != base)
443 >                execLocalTasks();
444 >            return true;
445          }
446 +        return false;
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and take it
450 >     * If a submission exists, try to activate and run it.
451       *
452 <     * @return a task, if available
452 >     * @return true if ran a task
453       */
454 <    private ForkJoinTask<?> pollSubmission() {
454 >    private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460 <            if (active || (active = p.tryIncrementActiveCount())) {
461 <                ForkJoinTask<?> t = p.pollSubmission();
462 <                if (t != null) {
463 <                    currentSteal = t;
464 <                    return t;
460 >            ForkJoinTask<?> t; int a;
461 >            if (active || // inline p.tryIncrementActiveCount
462 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
463 >                                                   a = p.runState, a + 1))) {
464 >                if ((t = p.pollSubmission()) != null) {
465 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
466 >                    t.quietlyExec();
467 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
468 >                    if (sp != base)
469 >                        execLocalTasks();
470 >                    return true;
471                  }
466                return scan(); // if missed, rescan
472              }
473          }
474 <        return null;
474 >        return false;
475 >    }
476 >
477 >    /**
478 >     * Runs local tasks until queue is empty or shut down.  Call only
479 >     * while active.
480 >     */
481 >    private void execLocalTasks() {
482 >        while (runState == 0) {
483 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
484 >            if (t != null)
485 >                t.quietlyExec();
486 >            else if (sp == base)
487 >                break;
488 >        }
489      }
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 501 | Line 520 | public class ForkJoinWorkerThread extend
520       * range. This method is used only during resets and backouts.
521       */
522      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
523 <                                              ForkJoinTask<?> t) {
523 >                                        ForkJoinTask<?> t) {
524          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
525      }
526  
# Line 534 | Line 553 | public class ForkJoinWorkerThread extend
553          ForkJoinTask<?> t;
554          ForkJoinTask<?>[] q;
555          int b, i;
556 <        if ((b = base) != sp &&
556 >        if (sp != (b = base) &&
557              (q = queue) != null && // must read q after b
558              (t = q[i = (q.length - 1) & b]) != null && base == b &&
559              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
# Line 546 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 569 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread.
591 >     * Called only by this thread.
592       */
593 <    final ForkJoinTask<?> popTask() {
594 <        int s;
595 <        ForkJoinTask<?>[] q;
596 <        if (base != (s = sp) && (q = queue) != null) {
597 <            int i = (q.length - 1) & --s;
598 <            ForkJoinTask<?> t = q[i];
599 <            if (t != null && UNSAFE.compareAndSwapObject
600 <                (q, (i << qShift) + qBase, t, null)) {
601 <                sp = s;
602 <                return t;
593 >    private ForkJoinTask<?> popTask() {
594 >        ForkJoinTask<?>[] q = queue;
595 >        if (q != null) {
596 >            int s;
597 >            while ((s = sp) != base) {
598 >                int i = (q.length - 1) & --s;
599 >                long u = (i << qShift) + qBase; // raw offset
600 >                ForkJoinTask<?> t = q[i];
601 >                if (t == null)   // lost to stealer
602 >                    break;
603 >                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
604 >                    sp = s; // putOrderedInt may encourage more timely write
605 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
606 >                    return t;
607 >                }
608              }
609          }
610          return null;
# Line 588 | Line 612 | public class ForkJoinWorkerThread extend
612  
613      /**
614       * Specialized version of popTask to pop only if topmost element
615 <     * is the given task. Called only by current thread while
592 <     * active.
615 >     * is the given task. Called only by this thread while active.
616       *
617       * @param t the task. Caller must ensure non-null.
618       */
619      final boolean unpushTask(ForkJoinTask<?> t) {
620          int s;
621 <        ForkJoinTask<?>[] q;
622 <        if (base != (s = sp) && (q = queue) != null &&
621 >        ForkJoinTask<?>[] q = queue;
622 >        if ((s = sp) != base && q != null &&
623              UNSAFE.compareAndSwapObject
624              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
625 <            sp = s;
625 >            sp = s; // putOrderedInt may encourage more timely write
626 >            // UNSAFE.putOrderedInt(this, spOffset, s);
627              return true;
628          }
629          return false;
630      }
631  
632      /**
633 <     * Returns next task or null if empty or contended
633 >     * Returns next task, or null if empty or contended.
634       */
635      final ForkJoinTask<?> peekTask() {
636          ForkJoinTask<?>[] q = queue;
# Line 648 | Line 672 | public class ForkJoinWorkerThread extend
672       * Computes next value for random victim probe in scan().  Scans
673       * don't require a very high quality generator, but also not a
674       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
675 <     * Note: This is manually inlined in scan()
675 >     * Note: This is manually inlined in scan().
676       */
677      private static final int xorShift(int r) {
678          r ^= r << 13;
# Line 687 | Line 711 | public class ForkJoinWorkerThread extend
711              for (;;) {
712                  ForkJoinWorkerThread v = ws[k & mask];
713                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
714 <                if (v != null && v.base != v.sp) {
715 <                    if (canSteal ||       // ensure active status
716 <                        (canSteal = active = p.tryIncrementActiveCount())) {
717 <                        int b = v.base;   // inline specialized deqTask
718 <                        ForkJoinTask<?>[] q;
719 <                        if (b != v.sp && (q = v.queue) != null) {
720 <                            ForkJoinTask<?> t;
721 <                            int i = (q.length - 1) & b;
722 <                            long u = (i << qShift) + qBase; // raw offset
723 <                            if ((t = q[i]) != null && v.base == b &&
724 <                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
725 <                                currentSteal = t;
726 <                                v.stealHint = poolIndex;
727 <                                v.base = b + 1;
728 <                                seed = r;
729 <                                ++stealCount;
730 <                                return t;
731 <                            }
714 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
715 >                if (v != null && (b = v.base) != v.sp &&
716 >                    (q = v.queue) != null) {
717 >                    int i = (q.length - 1) & b;
718 >                    long u = (i << qShift) + qBase; // raw offset
719 >                    int pid = poolIndex;
720 >                    if ((t = q[i]) != null) {
721 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
722 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
723 >                                                     a = p.runState, a + 1))
724 >                            canSteal = active = true;
725 >                        if (canSteal && v.base == b++ &&
726 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
727 >                            v.base = b;
728 >                            v.stealHint = pid;
729 >                            UNSAFE.putOrderedObject(this,
730 >                                                    currentStealOffset, t);
731 >                            seed = r;
732 >                            ++stealCount;
733 >                            return t;
734                          }
735                      }
736                      j = -n;
# Line 724 | Line 750 | public class ForkJoinWorkerThread extend
750      // Run State management
751  
752      // status check methods used mainly by ForkJoinPool
753 +    final boolean isRunning()     { return runState == 0; }
754      final boolean isTerminating() { return (runState & TERMINATING) != 0; }
755      final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
756      final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
757      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
758  
759      /**
760 <     * Sets state to TERMINATING, also resuming if suspended.
760 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
761 >     * to wake up if currently blocked. Callers must do so if desired.
762       */
763      final void shutdown() {
764          for (;;) {
765              int s = runState;
766 +            if ((s & (TERMINATING|TERMINATED)) != 0)
767 +                break;
768              if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
769                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
770                                               (s & ~SUSPENDED) |
771 <                                             (TRIMMED|TERMINATING))) {
742 <                    LockSupport.unpark(this);
771 >                                             (TRIMMED|TERMINATING)))
772                      break;
744                }
773              }
774              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
775                                                s | TERMINATING))
# Line 750 | Line 778 | public class ForkJoinWorkerThread extend
778      }
779  
780      /**
781 <     * Sets state to TERMINATED. Called only by this thread.
781 >     * Sets state to TERMINATED. Called only by onTermination().
782       */
783      private void setTerminated() {
784          int s;
# Line 760 | Line 788 | public class ForkJoinWorkerThread extend
788      }
789  
790      /**
791 <     * Instrumented version of park used by ForkJoinPool.eventSync
792 <     */
765 <    final void doPark() {
766 <        ++parkCount;
767 <        LockSupport.park(this);
768 <    }
769 <
770 <    /**
771 <     * If suspended, tries to set status to unsuspended and unparks.
791 >     * If suspended, tries to set status to unsuspended.
792 >     * Does NOT wake up if blocked.
793       *
794       * @return true if successful
795       */
796 <    final boolean tryResumeSpare() {
797 <        int s = runState;
798 <        if ((s & SUSPENDED) != 0 &&
799 <            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
800 <                                     s & ~SUSPENDED)) {
801 <            LockSupport.unpark(this);
781 <            return true;
796 >    final boolean tryUnsuspend() {
797 >        int s;
798 >        while (((s = runState) & SUSPENDED) != 0) {
799 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
800 >                                         s & ~SUSPENDED))
801 >                return true;
802          }
803          return false;
804      }
805  
806      /**
807 <     * Sets suspended status and blocks as spare until resumed,
808 <     * shutdown, or timed out.
789 <     *
790 <     * @return false if trimmed
807 >     * Sets suspended status and blocks as spare until resumed
808 >     * or shutdown.
809       */
810 <    final boolean suspendAsSpare() {
811 <        for (;;) {               // set suspended unless terminating
810 >    final void suspendAsSpare() {
811 >        for (;;) {                  // set suspended unless terminating
812              int s = runState;
813              if ((s & TERMINATING) != 0) { // must kill
814                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
815                                               s | (TRIMMED | TERMINATING)))
816 <                    return false;
816 >                    return;
817              }
818              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
819                                                s | SUSPENDED))
820                  break;
821          }
822 <        int pc = pool.parallelism;
823 <        pool.accumulateStealCount(this);
806 <        boolean timed;
807 <        long nanos;
808 <        long startTime;
809 <        if (poolIndex < pc) { // untimed wait for core threads
810 <            timed = false;
811 <            nanos = 0L;
812 <            startTime = 0L;
813 <        }
814 <        else {                // timed wait for added threads
815 <            timed = true;
816 <            nanos = SPARE_KEEPALIVE_NANOS;
817 <            startTime = System.nanoTime();
818 <        }
819 <        lastEventCount = 0;      // reset upon resume
820 <        interrupted();           // clear/ignore interrupts
822 >        ForkJoinPool p = pool;
823 >        p.pushSpare(this);
824          while ((runState & SUSPENDED) != 0) {
825 <            ++parkCount;
826 <            if (!timed)
825 >            if (p.tryAccumulateStealCount(this)) {
826 >                interrupted();          // clear/ignore interrupts
827 >                if ((runState & SUSPENDED) == 0)
828 >                    break;
829                  LockSupport.park(this);
825            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
826                LockSupport.parkNanos(this, nanos);
827            else { // try to trim on timeout
828                int s = runState;
829                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
830                                             (s & ~SUSPENDED) |
831                                             (TRIMMED|TERMINATING)))
832                    return false;
830              }
831          }
835        return true;
832      }
833  
834      // Misc support methods for ForkJoinPool
# Line 842 | Line 838 | public class ForkJoinWorkerThread extend
838       * used by ForkJoinTask.
839       */
840      final int getQueueSize() {
841 <        return -base + sp;
841 >        int n; // external calls must read base first
842 >        return (n = -base + sp) <= 0 ? 0 : n;
843      }
844  
845      /**
# Line 850 | Line 847 | public class ForkJoinWorkerThread extend
847       * thread.
848       */
849      final void cancelTasks() {
850 <        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
850 >        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
851          if (cj != null) {
852              currentJoin = null;
853              cj.cancelIgnoringExceptions();
854 +            try {
855 +                this.interrupt(); // awaken wait
856 +            } catch (SecurityException ignore) {
857 +            }
858          }
859          ForkJoinTask<?> cs = currentSteal;
860          if (cs != null) {
# Line 892 | Line 893 | public class ForkJoinWorkerThread extend
893       * @return a task, if available
894       */
895      final ForkJoinTask<?> pollLocalTask() {
896 +        ForkJoinPool p = pool;
897          while (sp != base) {
898 <            if (active || (active = pool.tryIncrementActiveCount()))
899 <                return locallyFifo? locallyDeqTask() : popTask();
898 >            int a; // inline p.tryIncrementActiveCount
899 >            if (active ||
900 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
901 >                                                   a = p.runState, a + 1)))
902 >                return locallyFifo ? locallyDeqTask() : popTask();
903          }
904          return null;
905      }
# Line 908 | Line 913 | public class ForkJoinWorkerThread extend
913          ForkJoinTask<?> t = pollLocalTask();
914          if (t == null) {
915              t = scan();
916 <            currentSteal = null; // cannot retain/track
916 >            // cannot retain/track/help steal
917 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
918          }
919          return t;
920      }
921  
922      /**
923       * Possibly runs some tasks and/or blocks, until task is done.
918     * The main body is basically a big spinloop, alternating between
919     * calls to helpJoinTask and pool.tryAwaitJoin with increased
920     * patience parameters until either the task is done without
921     * waiting, or we have, if necessary, created or resumed a
922     * replacement for this thread while it blocks.
924       *
925       * @param joinMe the task to join
925     * @return task status on exit
926       */
927 <     final int joinTask(ForkJoinTask<?> joinMe) {
928 <        int stat;
927 >    final void joinTask(ForkJoinTask<?> joinMe) {
928 >        // currentJoin only written by this thread; only need ordered store
929          ForkJoinTask<?> prevJoin = currentJoin;
930        // Only written by this thread; only need ordered store
930          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
931 <        if ((stat = joinMe.status) >= 0 &&
932 <            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
933 <            for (int retries = 0; ; ++retries) {
934 <                helpJoinTask(joinMe, retries);
936 <                if ((stat = joinMe.status) < 0)
937 <                    break;
938 <                pool.tryAwaitJoin(joinMe, retries);
939 <                if ((stat = joinMe.status) < 0)
940 <                    break;
941 <                Thread.yield(); // tame unbounded loop
942 <            }
943 <        }
931 >        if (sp != base)
932 >            localHelpJoinTask(joinMe);
933 >        if (joinMe.status >= 0)
934 >            pool.awaitJoin(joinMe, this);
935          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
945        return stat;
936      }
937  
938      /**
939       * Run tasks in local queue until given task is done.
940       *
941       * @param joinMe the task to join
952     * @return task status on exit
942       */
943 <    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
944 <        int stat, s;
943 >    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
944 >        int s;
945          ForkJoinTask<?>[] q;
946 <        while ((stat = joinMe.status) >= 0 &&
958 <               base != (s = sp) && (q = queue) != null) {
959 <            ForkJoinTask<?> t;
946 >        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
947              int i = (q.length - 1) & --s;
948              long u = (i << qShift) + qBase; // raw offset
949 <            if ((t = q[i]) != null &&
950 <                UNSAFE.compareAndSwapObject(q, u, t, null)) {
949 >            ForkJoinTask<?> t = q[i];
950 >            if (t == null)  // lost to a stealer
951 >                break;
952 >            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
953                  /*
954                   * This recheck (and similarly in helpJoinTask)
955                   * handles cases where joinMe is independently
# Line 968 | Line 957 | public class ForkJoinWorkerThread extend
957                   * available. Back out of the pop by putting t back
958                   * into slot before we commit by writing sp.
959                   */
960 <                if ((stat = joinMe.status) < 0) {
960 >                if (joinMe.status < 0) {
961                      UNSAFE.putObjectVolatile(q, u, t);
962                      break;
963                  }
964                  sp = s;
965 <                t.tryExec();
965 >                // UNSAFE.putOrderedInt(this, spOffset, s);
966 >                t.quietlyExec();
967              }
968          }
979        return stat;
969      }
970  
971      /**
972 <     * Tries to locate and help perform tasks for a stealer of the
973 <     * given task, or in turn one of its stealers.  Traces
974 <     * currentSteal->currentJoin links looking for a thread working on
975 <     * a descendant of the given task and with a non-empty queue to
976 <     * steal back and execute tasks from. Restarts search upon
977 <     * encountering chains that are stale, unknown, or of length
978 <     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
979 <     *
980 <     * The implementation is very branchy to cope with the restart
981 <     * cases.  Returns void, not task status (which must be reread by
982 <     * caller anyway) to slightly simplify control paths.
972 >     * Unless terminating, tries to locate and help perform tasks for
973 >     * a stealer of the given task, or in turn one of its stealers.
974 >     * Traces currentSteal->currentJoin links looking for a thread
975 >     * working on a descendant of the given task and with a non-empty
976 >     * queue to steal back and execute tasks from.
977 >     *
978 >     * The implementation is very branchy to cope with potential
979 >     * inconsistencies or loops encountering chains that are stale,
980 >     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
981 >     * of these cases are dealt with by just returning back to the
982 >     * caller, who is expected to retry if other join mechanisms also
983 >     * don't work out.
984       *
985       * @param joinMe the task to join
996     * @param rescans the number of times to recheck for work
986       */
987 <    private void helpJoinTask(ForkJoinTask<?> joinMe, int rescans) {
988 <        ForkJoinWorkerThread[] ws = pool.workers;
987 >    final void helpJoinTask(ForkJoinTask<?> joinMe) {
988 >        ForkJoinWorkerThread[] ws;
989          int n;
990 <        if (ws == null || (n = ws.length) <= 1)
991 <            return;                   // need at least 2 workers
992 <        restart:while (rescans-- >= 0 && joinMe.status >= 0) {
993 <            ForkJoinTask<?> task = joinMe;        // base of chain
994 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
995 <            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
996 <                // Try to find v, the stealer of task, by first using hint
997 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
998 <                if (v == null || v.currentSteal != task) {
999 <                    for (int j = 0; ; ++j) {      // search array
1000 <                        if (task.status < 0 || j == n)
1001 <                            continue restart;     // stale or no stealer
1002 <                        if ((v = ws[j]) != null && v.currentSteal == task) {
1003 <                            thread.stealHint = j; // save for next time
1004 <                            break;
1005 <                        }
1006 <                    }
1007 <                }
1008 <                // Try to help v, using specialized form of deqTask
1009 <                int b;
1010 <                ForkJoinTask<?>[] q;
1011 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1012 <                    int i = (q.length - 1) & b;
1013 <                    long u = (i << qShift) + qBase;
1014 <                    ForkJoinTask<?> t = q[i];
1026 <                    if (task.status < 0)          // stale
1027 <                        continue restart;
1028 <                    if (t != null) {
1029 <                        if (v.base == b &&
1030 <                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
1031 <                            if (joinMe.status < 0) {
1032 <                                UNSAFE.putObjectVolatile(q, u, t);
1033 <                                return;           // back out on cancel
990 >        if (joinMe.status < 0)                // already done
991 >            return;
992 >        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
993 >            joinMe.cancelIgnoringExceptions();
994 >            return;
995 >        }
996 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
997 >            return;                           // need at least 2 workers
998 >
999 >        ForkJoinTask<?> task = joinMe;        // base of chain
1000 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
1001 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1002 >            // Try to find v, the stealer of task, by first using hint
1003 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1004 >            if (v == null || v.currentSteal != task) {
1005 >                for (int j = 0; ; ++j) {      // search array
1006 >                    if (j < n) {
1007 >                        ForkJoinTask<?> vs;
1008 >                        if ((v = ws[j]) != null &&
1009 >                            (vs = v.currentSteal) != null) {
1010 >                            if (joinMe.status < 0 || task.status < 0)
1011 >                                return;       // stale or done
1012 >                            if (vs == task) {
1013 >                                thread.stealHint = j;
1014 >                                break;        // save hint for next time
1015                              }
1035                            ForkJoinTask<?> prevSteal = currentSteal;
1036                            currentSteal = t;
1037                            v.stealHint = poolIndex;
1038                            v.base = b + 1;
1039                            t.tryExec();
1040                            currentSteal = prevSteal;
1016                          }
1017                      }
1018 <                    else if (v.base == b)          // producer stalled
1019 <                        continue restart;          // retry via restart
1045 <                    if (joinMe.status < 0)
1046 <                        return;
1018 >                    else
1019 >                        return;               // no stealer
1020                  }
1021 <                // Try to descend to find v's stealer
1022 <                ForkJoinTask<?> next = v.currentJoin;
1050 <                if (next == null || next == task || task.status < 0)
1051 <                    continue restart;             // no descendent or stale
1021 >            }
1022 >            for (;;) { // Try to help v, using specialized form of deqTask
1023                  if (joinMe.status < 0)
1024                      return;
1025 <                task = next;
1026 <                thread = v;
1025 >                int b = v.base;
1026 >                ForkJoinTask<?>[] q = v.queue;
1027 >                if (b == v.sp || q == null)
1028 >                    break;
1029 >                int i = (q.length - 1) & b;
1030 >                long u = (i << qShift) + qBase;
1031 >                ForkJoinTask<?> t = q[i];
1032 >                int pid = poolIndex;
1033 >                ForkJoinTask<?> ps = currentSteal;
1034 >                if (task.status < 0)
1035 >                    return;                   // stale or done
1036 >                if (t != null && v.base == b++ &&
1037 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1038 >                    if (joinMe.status < 0) {
1039 >                        UNSAFE.putObjectVolatile(q, u, t);
1040 >                        return;               // back out on cancel
1041 >                    }
1042 >                    v.base = b;
1043 >                    v.stealHint = pid;
1044 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1045 >                    t.quietlyExec();
1046 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1047 >                }
1048              }
1049 +            // Try to descend to find v's stealer
1050 +            ForkJoinTask<?> next = v.currentJoin;
1051 +            if (task.status < 0 || next == null || next == task ||
1052 +                joinMe.status < 0)
1053 +                return;
1054 +            task = next;
1055 +            thread = v;
1056          }
1057      }
1058  
1059      /**
1060 +     * Implements ForJoinTask.getSurplusQueuedTaskCount().
1061       * Returns an estimate of the number of tasks, offset by a
1062       * function of number of idle workers.
1063       *
# Line 1112 | Line 1112 | public class ForkJoinWorkerThread extend
1112       * Runs tasks until {@code pool.isQuiescent()}.
1113       */
1114      final void helpQuiescePool() {
1115 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1116          for (;;) {
1117              ForkJoinTask<?> t = pollLocalTask();
1118 <            if (t != null || (t = scan()) != null) {
1119 <                t.tryExec();
1119 <                currentSteal = null;
1120 <            }
1118 >            if (t != null || (t = scan()) != null)
1119 >                t.quietlyExec();
1120              else {
1121                  ForkJoinPool p = pool;
1122 +                int a; // to inline CASes
1123                  if (active) {
1124 +                    if (!UNSAFE.compareAndSwapInt
1125 +                        (p, poolRunStateOffset, a = p.runState, a - 1))
1126 +                        continue;   // retry later
1127                      active = false; // inactivate
1128 <                    do {} while (!p.tryDecrementActiveCount());
1128 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1129                  }
1130                  if (p.isQuiescent()) {
1131                      active = true; // re-activate
1132 <                    do {} while (!p.tryIncrementActiveCount());
1132 >                    do {} while (!UNSAFE.compareAndSwapInt
1133 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1134                      return;
1135                  }
1136              }
# Line 1136 | Line 1140 | public class ForkJoinWorkerThread extend
1140      // Unsafe mechanics
1141  
1142      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1143 +    private static final long spOffset =
1144 +        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1145      private static final long runStateOffset =
1146          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1147      private static final long currentJoinOffset =
1148          objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1149 +    private static final long currentStealOffset =
1150 +        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1151      private static final long qBase =
1152          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1153 +    private static final long poolRunStateOffset = // to inline CAS
1154 +        objectFieldOffset("runState", ForkJoinPool.class);
1155 +
1156      private static final int qShift;
1157  
1158      static {
# Line 1149 | Line 1160 | public class ForkJoinWorkerThread extend
1160          if ((s & (s-1)) != 0)
1161              throw new Error("data type scale not a power of two");
1162          qShift = 31 - Integer.numberOfLeadingZeros(s);
1163 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1164      }
1165  
1166      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines