ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.36 by dl, Fri Jul 23 13:07:43 2010 UTC vs.
Revision 1.43 by jsr166, Wed Sep 1 20:12:39 2010 UTC

# Line 97 | Line 97 | public class ForkJoinWorkerThread extend
97       * technique for implementing efficient futures" SIGPLAN Notices,
98       * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99       * in that: (1) We only maintain dependency links across workers
100 <     * upon steals, rather than maintain per-task bookkeeping.  This
101 <     * may require a linear scan of workers array to locate stealers,
102 <     * but usually doesn't because stealers leave hints (that may
103 <     * become stale/wrong) of where to locate the kathem. This
104 <     * isolates cost to when it is needed, rather than adding to
105 <     * per-task overhead.  (2) It is "shallow", ignoring nesting and
106 <     * potentially cyclic mutual steals.  (3) It is intentionally
107 <     * racy: field currentJoin is updated only while actively joining,
108 <     * which means that we could miss links in the chain during
109 <     * long-lived tasks, GC stalls etc.  (4) We bound the number of
110 <     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
111 <     * suspending the worker and if necessary replacing it with a
112 <     * spare (see ForkJoinPool.tryAwaitJoin).
100 >     * upon steals, rather than use per-task bookkeeping.  This may
101 >     * require a linear scan of workers array to locate stealers, but
102 >     * usually doesn't because stealers leave hints (that may become
103 >     * stale/wrong) of where to locate them. This isolates cost to
104 >     * when it is needed, rather than adding to per-task overhead.
105 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
106 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
107 >     * is updated only while actively joining, which means that we
108 >     * miss links in the chain during long-lived tasks, GC stalls etc
109 >     * (which is OK since blocking in such cases is usually a good
110 >     * idea).  (4) We bound the number of attempts to find work (see
111 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112 >     * necessary replacing it with a spare (see
113 >     * ForkJoinPool.awaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 154 | Line 155 | public class ForkJoinWorkerThread extend
155      private static final Random seedGenerator = new Random();
156  
157      /**
157     * The timeout value for suspending spares. Spare workers that
158     * remain unsignalled for more than this time may be trimmed
159     * (killed and removed from pool).  Since our goal is to avoid
160     * long-term thread buildup, the exact value of timeout does not
161     * matter too much so long as it avoids most false-alarm timeouts
162     * under GC stalls or momentarily high system load.
163     */
164    private static final long SPARE_KEEPALIVE_NANOS =
165        5L * 1000L * 1000L * 1000L; // 5 secs
166
167    /**
158       * The maximum stolen->joining link depth allowed in helpJoinTask.
159       * Depths for legitimate chains are unbounded, but we use a fixed
160       * constant to avoid (otherwise unchecked) cycles and bound
# Line 198 | Line 188 | public class ForkJoinWorkerThread extend
188       * Initialized in onStart, to improve memory locality.
189       */
190      private ForkJoinTask<?>[] queue;
191 <    
191 >
192      /**
193       * Index (mod queue.length) of least valid queue slot, which is
194       * always the next position to steal from if nonempty.
# Line 227 | Line 217 | public class ForkJoinWorkerThread extend
217       * Run state of this worker. In addition to the usual run levels,
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220 <     * maintained separately.
220 >     * maintained separately and modified only in conjunction with
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 237 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
240     * Number of LockSupport.park calls to block this thread for
241     * suspension or event waits. Used for internal instrumention;
242     * currently not exported but included because volatile write upon
243     * park also provides a workaround for a JVM bug.
244     */
245    volatile int parkCount;
246
247    /**
233       * Number of steals, transferred and reset in pool callbacks pool
234       * when idle Accessed directly by pool.
235       */
# Line 256 | Line 241 | public class ForkJoinWorkerThread extend
241       */
242      private int seed;
243  
259
244      /**
245       * Activity status. When true, this worker is considered active.
246       * Accessed directly by pool.  Must be false upon construction.
# Line 265 | Line 249 | public class ForkJoinWorkerThread extend
249  
250      /**
251       * True if use local fifo, not default lifo, for local polling.
252 <     * Shadows value from ForkJoinPool, which resets it if changed
269 <     * pool-wide.
252 >     * Shadows value from ForkJoinPool.
253       */
254      private final boolean locallyFifo;
255 <    
255 >
256      /**
257       * Index of this worker in pool array. Set once by pool before
258       * running, and accessed directly by pool to locate this worker in
# Line 284 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 +     * Number of times this thread suspended as spare. Accessed only
277 +     * by pool.
278 +     */
279 +    int spareCount;
280 +
281 +    /**
282 +     * Encoded index and count of next spare waiter. Accessed only
283 +     * by ForkJoinPool for managing spares.
284 +     */
285 +    volatile int nextSpare;
286 +
287 +    /**
288       * The task currently being joined, set only when actively trying
289       * to helpStealer. Written only by current thread, but read by
290       * others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293 <    
293 >
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Not volatile because always read/written in
297 <     * presence of related volatiles in those cases where it matters.
296 >     * submission queue).  Written only by current thread, but read by
297 >     * others.
298       */
299 <    private ForkJoinTask<?> currentSteal;
299 >    private volatile ForkJoinTask<?> currentSteal;
300  
301      /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 312 | Line 307 | public class ForkJoinWorkerThread extend
307      protected ForkJoinWorkerThread(ForkJoinPool pool) {
308          this.pool = pool;
309          this.locallyFifo = pool.locallyFifo;
310 +        setDaemon(true);
311          // To avoid exposing construction details to subclasses,
312          // remaining initialization is in start() and onStart()
313      }
# Line 323 | Line 319 | public class ForkJoinWorkerThread extend
319          this.poolIndex = poolIndex;
320          if (ueh != null)
321              setUncaughtExceptionHandler(ueh);
326        setDaemon(true);
322          start();
323      }
324  
# Line 382 | Line 377 | public class ForkJoinWorkerThread extend
377       */
378      protected void onTermination(Throwable exception) {
379          try {
380 +            ForkJoinPool p = pool;
381 +            if (active) {
382 +                int a; // inline p.tryDecrementActiveCount
383 +                active = false;
384 +                do {} while (!UNSAFE.compareAndSwapInt
385 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
386 +            }
387              cancelTasks();
388              setTerminated();
389 <            pool.workerTerminated(this);
389 >            p.workerTerminated(this);
390          } catch (Throwable ex) {        // Shouldn't ever happen
391              if (exception == null)      // but if so, at least rethrown
392                  exception = ex;
# Line 417 | Line 419 | public class ForkJoinWorkerThread extend
419       * Find and execute tasks and check status while running
420       */
421      private void mainLoop() {
422 <        int emptyScans = 0; // consecutive times failed to find work
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425 <            p.preStep(this, emptyScans);
425 >            p.preStep(this, ran);
426              if (runState != 0)
427 <                return;
428 <            ForkJoinTask<?> t; // try to get and run stolen or submitted task
427 <            if ((t = scan()) != null || (t = pollSubmission()) != null) {
428 <                t.tryExec();
429 <                if (base != sp)
430 <                    runLocalTasks();
431 <                currentSteal = null;
432 <                emptyScans = 0;
433 <            }
434 <            else
435 <                ++emptyScans;
427 >                break;
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
432      /**
433 <     * Runs local tasks until queue is empty or shut down.  Call only
434 <     * while active.
433 >     * Try to steal a task and execute it
434 >     *
435 >     * @return true if ran a task
436       */
437 <    private void runLocalTasks() {
438 <        while (runState == 0) {
439 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
440 <            if (t != null)
441 <                t.tryExec();
442 <            else if (base == sp)
443 <                break;
437 >    private boolean tryExecSteal() {
438 >        ForkJoinTask<?> t;
439 >        if ((t = scan()) != null) {
440 >            t.quietlyExec();
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442 >            if (sp != base)
443 >                execLocalTasks();
444 >            return true;
445          }
446 +        return false;
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and take it
450 >     * If a submission exists, try to activate and run it;
451       *
452 <     * @return a task, if available
452 >     * @return true if ran a task
453       */
454 <    private ForkJoinTask<?> pollSubmission() {
454 >    private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456          while (p.hasQueuedSubmissions()) {
457 <            if (active || (active = p.tryIncrementActiveCount())) {
458 <                ForkJoinTask<?> t = p.pollSubmission();
459 <                if (t != null) {
460 <                    currentSteal = t;
461 <                    return t;
457 >            ForkJoinTask<?> t; int a;
458 >            if (active || // inline p.tryIncrementActiveCount
459 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
460 >                                                   a = p.runState, a + 1))) {
461 >                if ((t = p.pollSubmission()) != null) {
462 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
463 >                    t.quietlyExec();
464 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
465 >                    if (sp != base)
466 >                        execLocalTasks();
467 >                    return true;
468                  }
467                return scan(); // if missed, rescan
469              }
470          }
471 <        return null;
471 >        return false;
472 >    }
473 >
474 >    /**
475 >     * Runs local tasks until queue is empty or shut down.  Call only
476 >     * while active.
477 >     */
478 >    private void execLocalTasks() {
479 >        while (runState == 0) {
480 >            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
481 >            if (t != null)
482 >                t.quietlyExec();
483 >            else if (sp == base)
484 >                break;
485 >        }
486      }
487  
488      /*
# Line 535 | Line 550 | public class ForkJoinWorkerThread extend
550          ForkJoinTask<?> t;
551          ForkJoinTask<?>[] q;
552          int b, i;
553 <        if ((b = base) != sp &&
553 >        if (sp != (b = base) &&
554              (q = queue) != null && // must read q after b
555              (t = q[i = (q.length - 1) & b]) != null && base == b &&
556              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
# Line 570 | Line 585 | public class ForkJoinWorkerThread extend
585  
586      /**
587       * Returns a popped task, or null if empty. Assumes active status.
588 <     * Called only by current thread.
588 >     * Called only by current thread.
589       */
590 <    final ForkJoinTask<?> popTask() {
591 <        int s;
592 <        ForkJoinTask<?>[] q;
593 <        if (base != (s = sp) && (q = queue) != null) {
594 <            int i = (q.length - 1) & --s;
595 <            ForkJoinTask<?> t = q[i];
596 <            if (t != null && UNSAFE.compareAndSwapObject
597 <                (q, (i << qShift) + qBase, t, null)) {
598 <                sp = s;
599 <                return t;
590 >    private ForkJoinTask<?> popTask() {
591 >        ForkJoinTask<?>[] q = queue;
592 >        if (q != null) {
593 >            int s;
594 >            while ((s = sp) != base) {
595 >                int i = (q.length - 1) & --s;
596 >                long u = (i << qShift) + qBase; // raw offset
597 >                ForkJoinTask<?> t = q[i];
598 >                if (t == null)   // lost to stealer
599 >                    break;
600 >                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
601 >                    sp = s; // putOrderedInt may encourage more timely write
602 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
603 >                    return t;
604 >                }
605              }
606          }
607          return null;
# Line 596 | Line 616 | public class ForkJoinWorkerThread extend
616       */
617      final boolean unpushTask(ForkJoinTask<?> t) {
618          int s;
619 <        ForkJoinTask<?>[] q;
620 <        if (base != (s = sp) && (q = queue) != null &&
619 >        ForkJoinTask<?>[] q = queue;
620 >        if ((s = sp) != base && q != null &&
621              UNSAFE.compareAndSwapObject
622              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
623 <            sp = s;
623 >            sp = s; // putOrderedInt may encourage more timely write
624 >            // UNSAFE.putOrderedInt(this, spOffset, s);
625              return true;
626          }
627          return false;
# Line 688 | Line 709 | public class ForkJoinWorkerThread extend
709              for (;;) {
710                  ForkJoinWorkerThread v = ws[k & mask];
711                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
712 <                if (v != null && v.base != v.sp) {
713 <                    if (canSteal ||       // ensure active status
714 <                        (canSteal = active = p.tryIncrementActiveCount())) {
715 <                        int b = v.base;   // inline specialized deqTask
716 <                        ForkJoinTask<?>[] q;
717 <                        if (b != v.sp && (q = v.queue) != null) {
718 <                            ForkJoinTask<?> t;
719 <                            int i = (q.length - 1) & b;
720 <                            long u = (i << qShift) + qBase; // raw offset
721 <                            if ((t = q[i]) != null && v.base == b &&
722 <                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
723 <                                currentSteal = t;
724 <                                v.stealHint = poolIndex;
725 <                                v.base = b + 1;
726 <                                seed = r;
727 <                                ++stealCount;
728 <                                return t;
729 <                            }
712 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
713 >                if (v != null && (b = v.base) != v.sp &&
714 >                    (q = v.queue) != null) {
715 >                    int i = (q.length - 1) & b;
716 >                    long u = (i << qShift) + qBase; // raw offset
717 >                    int pid = poolIndex;
718 >                    if ((t = q[i]) != null) {
719 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
720 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
721 >                                                     a = p.runState, a + 1))
722 >                            canSteal = active = true;
723 >                        if (canSteal && v.base == b++ &&
724 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
725 >                            v.base = b;
726 >                            v.stealHint = pid;
727 >                            UNSAFE.putOrderedObject(this,
728 >                                                    currentStealOffset, t);
729 >                            seed = r;
730 >                            ++stealCount;
731 >                            return t;
732                          }
733                      }
734                      j = -n;
# Line 725 | Line 748 | public class ForkJoinWorkerThread extend
748      // Run State management
749  
750      // status check methods used mainly by ForkJoinPool
751 +    final boolean isRunning()     { return runState == 0; }
752      final boolean isTerminating() { return (runState & TERMINATING) != 0; }
753      final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
754      final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
755      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
756  
757      /**
758 <     * Sets state to TERMINATING, also resuming if suspended.
758 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
759 >     * to wake up if currently blocked. Callers must do so if desired.
760       */
761      final void shutdown() {
762          for (;;) {
763              int s = runState;
764 +            if ((s & (TERMINATING|TERMINATED)) != 0)
765 +                break;
766              if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
767                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
768                                               (s & ~SUSPENDED) |
769 <                                             (TRIMMED|TERMINATING))) {
743 <                    LockSupport.unpark(this);
769 >                                             (TRIMMED|TERMINATING)))
770                      break;
745                }
771              }
772              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
773                                                s | TERMINATING))
# Line 751 | Line 776 | public class ForkJoinWorkerThread extend
776      }
777  
778      /**
779 <     * Sets state to TERMINATED. Called only by this thread.
779 >     * Sets state to TERMINATED. Called only by onTermination()
780       */
781      private void setTerminated() {
782          int s;
# Line 761 | Line 786 | public class ForkJoinWorkerThread extend
786      }
787  
788      /**
789 <     * Instrumented version of park used by ForkJoinPool.awaitEvent
790 <     */
766 <    final void doPark() {
767 <        ++parkCount;
768 <        LockSupport.park(this);
769 <    }
770 <
771 <    /**
772 <     * If suspended, tries to set status to unsuspended and unparks.
789 >     * If suspended, tries to set status to unsuspended.
790 >     * Does NOT wake up if blocked.
791       *
792       * @return true if successful
793       */
794 <    final boolean tryResumeSpare() {
795 <        int s = runState;
796 <        if ((s & SUSPENDED) != 0 &&
797 <            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
798 <                                     s & ~SUSPENDED)) {
799 <            LockSupport.unpark(this);
782 <            return true;
794 >    final boolean tryUnsuspend() {
795 >        int s;
796 >        while (((s = runState) & SUSPENDED) != 0) {
797 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
798 >                                         s & ~SUSPENDED))
799 >                return true;
800          }
801          return false;
802      }
803  
804      /**
805 <     * Sets suspended status and blocks as spare until resumed,
806 <     * shutdown, or timed out.
790 <     *
791 <     * @return false if trimmed
805 >     * Sets suspended status and blocks as spare until resumed
806 >     * or shutdown.
807       */
808 <    final boolean suspendAsSpare() {
809 <        for (;;) {               // set suspended unless terminating
808 >    final void suspendAsSpare() {
809 >        for (;;) {                  // set suspended unless terminating
810              int s = runState;
811              if ((s & TERMINATING) != 0) { // must kill
812                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
813                                               s | (TRIMMED | TERMINATING)))
814 <                    return false;
814 >                    return;
815              }
816              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
817                                                s | SUSPENDED))
818                  break;
819          }
820 <        boolean timed;
821 <        long nanos;
807 <        long startTime;
808 <        if (poolIndex < pool.parallelism) {
809 <            timed = false;
810 <            nanos = 0L;
811 <            startTime = 0L;
812 <        }
813 <        else {
814 <            timed = true;
815 <            nanos = SPARE_KEEPALIVE_NANOS;
816 <            startTime = System.nanoTime();
817 <        }
818 <        pool.accumulateStealCount(this);
819 <        lastEventCount = 0;      // reset upon resume
820 <        interrupted();           // clear/ignore interrupts
820 >        ForkJoinPool p = pool;
821 >        p.pushSpare(this);
822          while ((runState & SUSPENDED) != 0) {
823 <            ++parkCount;
824 <            if (!timed)
823 >            if (p.tryAccumulateStealCount(this)) {
824 >                interrupted();          // clear/ignore interrupts
825 >                if ((runState & SUSPENDED) == 0)
826 >                    break;
827                  LockSupport.park(this);
825            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
826                LockSupport.parkNanos(this, nanos);
827            else { // try to trim on timeout
828                int s = runState;
829                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
830                                             (s & ~SUSPENDED) |
831                                             (TRIMMED|TERMINATING)))
832                    return false;
828              }
829          }
835        return true;
830      }
831  
832      // Misc support methods for ForkJoinPool
# Line 842 | Line 836 | public class ForkJoinWorkerThread extend
836       * used by ForkJoinTask.
837       */
838      final int getQueueSize() {
839 <        return -base + sp;
839 >        int n; // external calls must read base first
840 >        return (n = -base + sp) <= 0 ? 0 : n;
841      }
842  
843      /**
# Line 850 | Line 845 | public class ForkJoinWorkerThread extend
845       * thread.
846       */
847      final void cancelTasks() {
848 <        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
848 >        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
849          if (cj != null) {
850              currentJoin = null;
851              cj.cancelIgnoringExceptions();
852 +            try {
853 +                this.interrupt(); // awaken wait
854 +            } catch (SecurityException ignore) {
855 +            }
856          }
857          ForkJoinTask<?> cs = currentSteal;
858          if (cs != null) {
# Line 892 | Line 891 | public class ForkJoinWorkerThread extend
891       * @return a task, if available
892       */
893      final ForkJoinTask<?> pollLocalTask() {
894 +        ForkJoinPool p = pool;
895          while (sp != base) {
896 <            if (active || (active = pool.tryIncrementActiveCount()))
896 >            int a; // inline p.tryIncrementActiveCount
897 >            if (active ||
898 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
899 >                                                   a = p.runState, a + 1)))
900                  return locallyFifo? locallyDeqTask() : popTask();
901          }
902          return null;
# Line 905 | Line 908 | public class ForkJoinWorkerThread extend
908       * @return a task, if available
909       */
910      final ForkJoinTask<?> pollTask() {
911 <        ForkJoinTask<?> t;
912 <        return (t = pollLocalTask()) != null ? t : scan();
911 >        ForkJoinTask<?> t = pollLocalTask();
912 >        if (t == null) {
913 >            t = scan();
914 >            // cannot retain/track/help steal
915 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
916 >        }
917 >        return t;
918      }
919  
920      /**
921       * Possibly runs some tasks and/or blocks, until task is done.
914     * The main body is basically a big spinloop, alternating between
915     * calls to helpJoinTask and pool.tryAwaitJoin with increased
916     * patience parameters until either the task is done without
917     * waiting, or we have, if necessary, created or resumed a
918     * replacement for this thread while it blocks.
922       *
923       * @param joinMe the task to join
921     * @return task status on exit
924       */
925 <    final int joinTask(ForkJoinTask<?> joinMe) {
926 <        int stat;
925 >    final void joinTask(ForkJoinTask<?> joinMe) {
926 >        // currentJoin only written by this thread; only need ordered store
927          ForkJoinTask<?> prevJoin = currentJoin;
928 <        currentJoin = joinMe;
929 <        if ((stat = joinMe.status) >= 0 &&
930 <            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
931 <            ForkJoinPool p = pool;
932 <            int helpRetries = 2;     // initial patience values
933 <            int awaitRetries = -1;   // -1 is sentinel for replace-check only
932 <            do {
933 <                helpJoinTask(joinMe, helpRetries);
934 <                if ((stat = joinMe.status) < 0)
935 <                    break;
936 <                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
937 <                if ((stat = joinMe.status) < 0)
938 <                    break;
939 <                if (awaitRetries == -1)
940 <                    awaitRetries = 0;
941 <                else if (busy)
942 <                    ++awaitRetries;
943 <                if (helpRetries < p.parallelism)
944 <                    helpRetries <<= 1;
945 <                Thread.yield(); // tame unbounded loop
946 <            } while (joinMe.status >= 0);
947 <        }
948 <        currentJoin = prevJoin;
949 <        return stat;
928 >        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
929 >        if (sp != base)
930 >            localHelpJoinTask(joinMe);
931 >        if (joinMe.status >= 0)
932 >            pool.awaitJoin(joinMe, this);
933 >        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
934      }
935  
936      /**
937       * Run tasks in local queue until given task is done.
938       *
939       * @param joinMe the task to join
956     * @return task status on exit
940       */
941 <    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
942 <        int stat, s;
941 >    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
942 >        int s;
943          ForkJoinTask<?>[] q;
944 <        while ((stat = joinMe.status) >= 0 &&
962 <               base != (s = sp) && (q = queue) != null) {
963 <            ForkJoinTask<?> t;
944 >        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
945              int i = (q.length - 1) & --s;
946              long u = (i << qShift) + qBase; // raw offset
947 <            if ((t = q[i]) != null &&
948 <                UNSAFE.compareAndSwapObject(q, u, t, null)) {
947 >            ForkJoinTask<?> t = q[i];
948 >            if (t == null)  // lost to a stealer
949 >                break;
950 >            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
951                  /*
952                   * This recheck (and similarly in helpJoinTask)
953                   * handles cases where joinMe is independently
# Line 972 | Line 955 | public class ForkJoinWorkerThread extend
955                   * available. Back out of the pop by putting t back
956                   * into slot before we commit by writing sp.
957                   */
958 <                if ((stat = joinMe.status) < 0) {
958 >                if (joinMe.status < 0) {
959                      UNSAFE.putObjectVolatile(q, u, t);
960                      break;
961                  }
962                  sp = s;
963 <                t.tryExec();
963 >                // UNSAFE.putOrderedInt(this, spOffset, s);
964 >                t.quietlyExec();
965              }
966          }
983        return stat;
967      }
968  
969      /**
970 <     * Tries to locate and help perform tasks for a stealer of the
971 <     * given task, or in turn one of its stealers.  Traces
972 <     * currentSteal->currentJoin links looking for a thread working on
973 <     * a descendant of the given task and with a non-empty queue to
974 <     * steal back and execute tasks from. Restarts search upon
975 <     * encountering chains that are stale, unknown, or of length
976 <     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
977 <     *
978 <     * The implementation is very branchy to cope with the restart
979 <     * cases.  Returns void, not task status (which must be reread by
980 <     * caller anyway) to slightly simplify control paths.
970 >     * Unless terminating, tries to locate and help perform tasks for
971 >     * a stealer of the given task, or in turn one of its stealers.
972 >     * Traces currentSteal->currentJoin links looking for a thread
973 >     * working on a descendant of the given task and with a non-empty
974 >     * queue to steal back and execute tasks from.
975 >     *
976 >     * The implementation is very branchy to cope with potential
977 >     * inconsistencies or loops encountering chains that are stale,
978 >     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
979 >     * of these cases are dealt with by just returning back to the
980 >     * caller, who is expected to retry if other join mechanisms also
981 >     * don't work out.
982       *
983       * @param joinMe the task to join
984       */
985 <    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
986 <        ForkJoinWorkerThread[] ws = pool.workers;
985 >    final void helpJoinTask(ForkJoinTask<?> joinMe) {
986 >        ForkJoinWorkerThread[] ws;
987          int n;
988 <        if (ws == null || (n = ws.length) <= 1)
989 <            return;                   // need at least 2 workers
990 <
991 <        restart:while (joinMe.status >= 0 && --retries >= 0) {
992 <            ForkJoinTask<?> task = joinMe;        // base of chain
993 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
994 <            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
995 <                // Try to find v, the stealer of task, by first using hint
996 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
997 <                if (v == null || v.currentSteal != task) {
998 <                    for (int j = 0; ; ++j) {      // search array
999 <                        if (task.status < 0 || j == n)
1000 <                            continue restart;     // stale or no stealer
1001 <                        if ((v = ws[j]) != null && v.currentSteal == task) {
1002 <                            thread.stealHint = j; // save for next time
1003 <                            break;
1004 <                        }
1005 <                    }
1006 <                }
1007 <                // Try to help v, using specialized form of deqTask
1008 <                int b;
1009 <                ForkJoinTask<?>[] q;
1010 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1011 <                    int i = (q.length - 1) & b;
1012 <                    long u = (i << qShift) + qBase;
1029 <                    ForkJoinTask<?> t = q[i];
1030 <                    if (task.status < 0)          // stale
1031 <                        continue restart;
1032 <                    if (v.base == b) {            // recheck after reading t
1033 <                        if (t == null)            // producer stalled
1034 <                            continue restart;     // retry via restart
1035 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036 <                            if (joinMe.status < 0) {
1037 <                                UNSAFE.putObjectVolatile(q, u, t);
1038 <                                return;           // back out on cancel
988 >        if (joinMe.status < 0)                // already done
989 >            return;
990 >        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
991 >            joinMe.cancelIgnoringExceptions();
992 >            return;
993 >        }
994 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
995 >            return;                           // need at least 2 workers
996 >
997 >        ForkJoinTask<?> task = joinMe;        // base of chain
998 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
999 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1000 >            // Try to find v, the stealer of task, by first using hint
1001 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1002 >            if (v == null || v.currentSteal != task) {
1003 >                for (int j = 0; ; ++j) {      // search array
1004 >                    if (j < n) {
1005 >                        ForkJoinTask<?> vs;
1006 >                        if ((v = ws[j]) != null &&
1007 >                            (vs = v.currentSteal) != null) {
1008 >                            if (joinMe.status < 0 || task.status < 0)
1009 >                                return;       // stale or done
1010 >                            if (vs == task) {
1011 >                                thread.stealHint = j;
1012 >                                break;        // save hint for next time
1013                              }
1040                            ForkJoinTask<?> prevSteal = currentSteal;
1041                            currentSteal = t;
1042                            v.stealHint = poolIndex;
1043                            v.base = b + 1;
1044                            t.tryExec();
1045                            currentSteal = prevSteal;
1014                          }
1015                      }
1016 <                    if (joinMe.status < 0)
1017 <                        return;
1016 >                    else
1017 >                        return;               // no stealer
1018                  }
1019 <                // Try to descend to find v's stealer
1020 <                ForkJoinTask<?> next = v.currentJoin;
1053 <                if (next == null || task.status < 0)
1054 <                    continue restart;             // no descendent or stale
1019 >            }
1020 >            for (;;) { // Try to help v, using specialized form of deqTask
1021                  if (joinMe.status < 0)
1022                      return;
1023 <                task = next;
1024 <                thread = v;
1023 >                int b = v.base;
1024 >                ForkJoinTask<?>[] q = v.queue;
1025 >                if (b == v.sp || q == null)
1026 >                    break;
1027 >                int i = (q.length - 1) & b;
1028 >                long u = (i << qShift) + qBase;
1029 >                ForkJoinTask<?> t = q[i];
1030 >                int pid = poolIndex;
1031 >                ForkJoinTask<?> ps = currentSteal;
1032 >                if (task.status < 0)
1033 >                    return;                   // stale or done
1034 >                if (t != null && v.base == b++ &&
1035 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036 >                    if (joinMe.status < 0) {
1037 >                        UNSAFE.putObjectVolatile(q, u, t);
1038 >                        return;               // back out on cancel
1039 >                    }
1040 >                    v.base = b;
1041 >                    v.stealHint = pid;
1042 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1043 >                    t.quietlyExec();
1044 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1045 >                }
1046              }
1047 +            // Try to descend to find v's stealer
1048 +            ForkJoinTask<?> next = v.currentJoin;
1049 +            if (task.status < 0 || next == null || next == task ||
1050 +                joinMe.status < 0)
1051 +                return;
1052 +            task = next;
1053 +            thread = v;
1054          }
1055      }
1056  
# Line 1115 | Line 1109 | public class ForkJoinWorkerThread extend
1109       * Runs tasks until {@code pool.isQuiescent()}.
1110       */
1111      final void helpQuiescePool() {
1112 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1113          for (;;) {
1114              ForkJoinTask<?> t = pollLocalTask();
1115 <            if (t != null || (t = scan()) != null) {
1116 <                t.tryExec();
1122 <                currentSteal = null;
1123 <            }
1115 >            if (t != null || (t = scan()) != null)
1116 >                t.quietlyExec();
1117              else {
1118                  ForkJoinPool p = pool;
1119 +                int a; // to inline CASes
1120                  if (active) {
1121 +                    if (!UNSAFE.compareAndSwapInt
1122 +                        (p, poolRunStateOffset, a = p.runState, a - 1))
1123 +                        continue;   // retry later
1124                      active = false; // inactivate
1125 <                    do {} while (!p.tryDecrementActiveCount());
1125 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1126                  }
1127                  if (p.isQuiescent()) {
1128                      active = true; // re-activate
1129 <                    do {} while (!p.tryIncrementActiveCount());
1129 >                    do {} while (!UNSAFE.compareAndSwapInt
1130 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1131                      return;
1132                  }
1133              }
# Line 1139 | Line 1137 | public class ForkJoinWorkerThread extend
1137      // Unsafe mechanics
1138  
1139      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1140 +    private static final long spOffset =
1141 +        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1142      private static final long runStateOffset =
1143          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1144 +    private static final long currentJoinOffset =
1145 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1146 +    private static final long currentStealOffset =
1147 +        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1148      private static final long qBase =
1149          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1150 <    private static final long threadStatusOffset =
1151 <        objectFieldOffset("threadStatus", Thread.class);
1150 >    private static final long poolRunStateOffset = // to inline CAS
1151 >        objectFieldOffset("runState", ForkJoinPool.class);
1152 >
1153      private static final int qShift;
1154  
1155      static {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines