root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.32 by dl, Sun Apr 18 12:51:18 2010 UTC vs.
Revision 1.36 by dl, Fri Jul 23 13:07:43 2010 UTC

# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
87 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field currentSteal) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field currentJoin)
90 >     * the task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than maintain per-task bookkeeping.  This
101 >     * may require a linear scan of workers array to locate stealers,
102 >     * but usually doesn't because stealers leave hints (that may
103 >     * become stale/wrong) of where to locate them. This
104 >     * isolates cost to when it is needed, rather than adding to
105 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
106 >     * potentially cyclic mutual steals.  (3) It is intentionally
107 >     * racy: field currentJoin is updated only while actively joining,
108 >     * which means that we could miss links in the chain during
109 >     * long-lived tasks, GC stalls etc.  (4) We bound the number of
110 >     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
111 >     * suspending the worker and if necessary replacing it with a
112 >     * spare (see ForkJoinPool.tryAwaitJoin).
113 >     *
114 >     * Efficient implementation of these algorithms currently relies
115 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
116       * correct orderings, reads and writes of variable base require
117       * volatile ordering.  Variable sp does not require volatile
118       * writes but still needs store-ordering, which we accomplish by
119       * pre-incrementing sp before filling the slot with an ordered
120       * store.  (Pre-incrementing also enables backouts used in
121 <     * scanWhileJoining.)  Because they are protected by volatile base
122 <     * reads, reads of the queue array and its slots by other threads
123 <     * do not need volatile load semantics, but writes (in push)
124 <     * require store order and CASes (in pop and deq) require
125 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
126 <     * algorithm has similar properties, but without support for
127 <     * nulling slots.)  Since these combinations aren't supported
128 <     * using ordinary volatiles, the only way to accomplish these
129 <     * efficiently is to use direct Unsafe calls. (Using external
130 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
131 <     * array is significantly slower because of memory locality and
132 <     * indirection effects.)
121 >     * joinTask.)  Because they are protected by volatile base reads,
122 >     * reads of the queue array and its slots by other threads do not
123 >     * need volatile load semantics, but writes (in push) require
124 >     * store order and CASes (in pop and deq) require (volatile) CAS
125 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
126 >     * similar properties, but without support for nulling slots.)
127 >     * Since these combinations aren't supported using ordinary
128 >     * volatiles, the only way to accomplish these efficiently is to
129 >     * use direct Unsafe calls. (Using external AtomicIntegers and
130 >     * AtomicReferenceArrays for the indices and array is
131 >     * significantly slower because of memory locality and indirection
132 >     * effects.)
133       *
134       * Further, performance on most platforms is very sensitive to
135       * placement and sizing of the (resizable) queue array.  Even
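
The helping scheme described in the comment above can be pictured with a simplified, standalone model. This is only an illustrative sketch: the Task and Worker types below are hypothetical stand-ins, the real code additionally uses stealHint, Unsafe CASes, and staleness rechecks, and it drains the stealer's queue rather than returning a single task.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy model of the linear helping described above: a joiner looks for the
    // worker whose most recent steal (currentSteal) is the task it is waiting
    // on, takes work from that worker's queue, and otherwise follows that
    // worker's currentJoin link one level deeper, up to MAX_HELP_DEPTH links.
    final class HelpingSketch {
        static final int MAX_HELP_DEPTH = 8;          // same bound as the real code

        static final class Task { volatile boolean done; }

        static final class Worker {
            volatile Task currentSteal;               // most recent task stolen
            volatile Task currentJoin;                // task this worker is joining
            final Deque<Task> queue = new ArrayDeque<Task>(); // not thread-safe; model only
        }

        /** Returns a task the joiner could run instead of blocking, or null. */
        static Task findHelpWork(Worker[] ws, Task joinMe) {
            Task task = joinMe;
            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
                Worker stealer = null;
                for (Worker w : ws)                   // linear scan; real code tries stealHint first
                    if (w.currentSteal == task) { stealer = w; break; }
                if (stealer == null || task.done)
                    return null;                      // unknown or stale chain
                Task t = stealer.queue.pollFirst();   // take from the base end, as deqTask does
                if (t != null)
                    return t;                         // caller executes t, then retries the join
                task = stealer.currentJoin;           // descend: help whatever the stealer awaits
                if (task == null)
                    return null;
            }
            return null;                              // depth bound reached; fall back to blocking
        }
    }
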
# Line 137 | Line 165 | public class ForkJoinWorkerThread extend
165          5L * 1000L * 1000L * 1000L; // 5 secs
166  
167      /**
168 +     * The maximum stolen->joining link depth allowed in helpJoinTask.
169 +     * Depths for legitimate chains are unbounded, but we use a fixed
170 +     * constant to avoid (otherwise unchecked) cycles and bound
171 +     * staleness of traversal parameters at the expense of sometimes
172 +     * blocking when we could be helping.
173 +     */
174 +    private static final int MAX_HELP_DEPTH = 8;
175 +
176 +    /**
177       * Capacity of work-stealing queue array upon initialization.
178 <     * Must be a power of two. Initial size must be at least 2, but is
178 >     * Must be a power of two. Initial size must be at least 4, but is
179       * padded to minimize cache effects.
180       */
181      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
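
A small, self-contained demonstration of why the capacity must be a power of two: with a power-of-two length, (length - 1) is an all-ones bit mask, so the monotonically growing base and sp indices map to slots without a modulo or branch. The class below is illustrative only.

    // Illustrates the power-of-two masking used for all queue indexing.
    public class MaskIndexDemo {
        public static void main(String[] args) {
            Object[] queue = new Object[1 << 4];  // toy capacity; the real initial size is 1 << 13
            int mask = queue.length - 1;          // 0b1111 for length 16
            for (int sp = 0; sp < 40; ++sp) {
                int slot = sp & mask;             // same as sp % queue.length, but cheaper
                queue[slot] = "task-" + sp;       // later writes simply wrap around
            }
            System.out.println(java.util.Arrays.toString(queue));
        }
    }
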
# Line 161 | Line 198 | public class ForkJoinWorkerThread extend
198       * Initialized in onStart, to improve memory locality.
199       */
200      private ForkJoinTask<?>[] queue;
201 <
201 >    
202      /**
203       * Index (mod queue.length) of least valid queue slot, which is
204       * always the next position to steal from if nonempty.
# Line 178 | Line 215 | public class ForkJoinWorkerThread extend
215      private int sp;
216  
217      /**
218 +     * The index of most recent stealer, used as a hint to avoid
219 +     * traversal in method helpJoinTask. This is only a hint because a
220 +     * worker might have had multiple steals and this only holds one
221 +     * of them (usually the most current). Declared non-volatile,
222 +     * relying on other prevailing sync to keep reasonably current.
223 +     */
224 +    private int stealHint;
225 +
226 +    /**
227       * Run state of this worker. In addition to the usual run levels,
228       * tracks if this worker is suspended as a spare, and if it was
229       * killed (trimmed) while suspended. However, "active" status is
# Line 196 | Line 242 | public class ForkJoinWorkerThread extend
242       * currently not exported but included because volatile write upon
243       * park also provides a workaround for a JVM bug.
244       */
245 <    private volatile int parkCount;
245 >    volatile int parkCount;
246  
247      /**
248       * Number of steals, transferred and reset in pool callbacks pool
# Line 210 | Line 256 | public class ForkJoinWorkerThread extend
256       */
257      private int seed;
258  
259 +
260      /**
261       * Activity status. When true, this worker is considered active.
262       * Accessed directly by pool.  Must be false upon construction.
# Line 221 | Line 268 | public class ForkJoinWorkerThread extend
268       * Shadows value from ForkJoinPool, which resets it if changed
269       * pool-wide.
270       */
271 <    private boolean locallyFifo;
272 <
271 >    private final boolean locallyFifo;
272 >    
273      /**
274       * Index of this worker in pool array. Set once by pool before
275       * running, and accessed directly by pool to locate this worker in
# Line 243 | Line 290 | public class ForkJoinWorkerThread extend
290      volatile long nextWaiter;
291  
292      /**
293 +     * The task currently being joined, set only while actively trying
294 +     * to help a stealer (see helpJoinTask). Written only by current
295 +     * thread, but read by others.
296 +     */
297 +    private volatile ForkJoinTask<?> currentJoin;
298 +    
299 +    /**
300 +     * The task most recently stolen from another worker (or
301 +     * submission queue).  Not volatile because always read/written in
302 +     * presence of related volatiles in those cases where it matters.
303 +     */
304 +    private ForkJoinTask<?> currentSteal;
305 +
306 +    /**
307       * Creates a ForkJoinWorkerThread operating in the given pool.
308       *
309       * @param pool the pool this thread works in
310       * @throws NullPointerException if pool is null
311       */
312      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
313          this.pool = pool;
314 +        this.locallyFifo = pool.locallyFifo;
315          // To avoid exposing construction details to subclasses,
316          // remaining initialization is in start() and onStart()
317      }
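
Because the constructor is protected, applications obtain customized workers by subclassing and installing a ForkJoinPool.ForkJoinWorkerThreadFactory when constructing the pool. A hedged usage sketch follows; the LoggingWorker name and its body are illustrative, and the exact ForkJoinPool constructor overload used to pass the factory depends on the jsr166y revision in use.

    import jsr166y.ForkJoinPool;
    import jsr166y.ForkJoinWorkerThread;

    public class CustomWorkerExample {

        // A worker subclass using the documented onStart/onTermination hooks.
        static class LoggingWorker extends ForkJoinWorkerThread {
            protected LoggingWorker(ForkJoinPool pool) {
                super(pool);                       // the protected constructor shown above
            }
            @Override protected void onStart() {
                super.onStart();                   // superclass initialization must run first
                System.out.println(getName() + " starting");
            }
            @Override protected void onTermination(Throwable exception) {
                System.out.println(getName() + " terminating: " + exception);
                super.onTermination(exception);
            }
        }

        // A factory the pool can be configured with at construction time.
        static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY =
            new ForkJoinPool.ForkJoinWorkerThreadFactory() {
                public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                    return new LoggingWorker(pool);
                }
            };
    }
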
# Line 258 | Line 319 | public class ForkJoinWorkerThread extend
319      /**
320       * Performs additional initialization and starts this thread
321       */
322 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
322 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
323          this.poolIndex = poolIndex;
264        this.locallyFifo = locallyFifo;
324          if (ueh != null)
325              setUncaughtExceptionHandler(ueh);
326          setDaemon(true);
# Line 305 | Line 364 | public class ForkJoinWorkerThread extend
364          int rs = seedGenerator.nextInt();
365          seed = rs == 0? 1 : rs; // seed must be nonzero
366  
367 <        // Allocate name string and queue array in this thread
367 >        // Allocate name string and arrays in this thread
368          String pid = Integer.toString(pool.getPoolNumber());
369          String wid = Integer.toString(poolIndex);
370          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 358 | Line 417 | public class ForkJoinWorkerThread extend
417       * Find and execute tasks and check status while running
418       */
419      private void mainLoop() {
420 <        boolean ran = false; // true if ran task on previous step
420 >        int emptyScans = 0; // consecutive times failed to find work
421          ForkJoinPool p = pool;
422          for (;;) {
423 <            p.preStep(this, ran);
423 >            p.preStep(this, emptyScans);
424              if (runState != 0)
425                  return;
426              ForkJoinTask<?> t; // try to get and run stolen or submitted task
427 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
427 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
428                  t.tryExec();
429                  if (base != sp)
430                      runLocalTasks();
431 +                currentSteal = null;
432 +                emptyScans = 0;
433              }
434 +            else
435 +                ++emptyScans;
436          }
437      }
438  
# Line 397 | Line 460 | public class ForkJoinWorkerThread extend
460          while (p.hasQueuedSubmissions()) {
461              if (active || (active = p.tryIncrementActiveCount())) {
462                  ForkJoinTask<?> t = p.pollSubmission();
463 <                return t != null ? t : scan(); // if missed, rescan
463 >                if (t != null) {
464 >                    currentSteal = t;
465 >                    return t;
466 >                }
467 >                return scan(); // if missed, rescan
468              }
469          }
470          return null;
# Line 447 | Line 514 | public class ForkJoinWorkerThread extend
514       * @param t the task. Caller must ensure non-null.
515       */
516      final void pushTask(ForkJoinTask<?> t) {
450        int s;
517          ForkJoinTask<?>[] q = queue;
518          int mask = q.length - 1; // implicit assert q != null
519 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
520 <        if ((s -= base) <= 0)
521 <            pool.signalWork();
522 <        else if (s + 1 >= mask)
523 <            growQueue();
519 >        int s = sp++;            // ok to increment sp before slot write
520 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
521 >        if ((s -= base) == 0)
522 >            pool.signalWork();   // was empty
523 >        else if (s == mask)
524 >            growQueue();         // is full
525      }
526  
527      /**
528       * Tries to take a task from the base of the queue, failing if
529       * empty or contended. Note: Specializations of this code appear
530 <     * in scan and scanWhileJoining.
530 >     * in locallyDeqTask and elsewhere.
531       *
532       * @return a task, or null if none or contended
533       */
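
The raw offsets fed to putOrderedObject and compareAndSwapObject come from the array base offset plus the slot index shifted by the element-size exponent. A sketch of that address arithmetic appears below; the real qBase/qShift setup lives in the static initializer at the end of the class, and the helper shown here is illustrative, not part of the API.

    import sun.misc.Unsafe;

    // Illustrates the ((i & mask) << qShift) + qBase addressing used by
    // pushTask, deqTask and the CAS-based variants: qBase is the byte offset
    // of element 0 of the array, and qShift is log2 of the per-element size.
    class SlotOffsetSketch {
        static long slotOffset(Unsafe unsafe, Object[] queue, int index) {
            long base = unsafe.arrayBaseOffset(Object[].class);
            int scale = unsafe.arrayIndexScale(Object[].class);  // a power of two
            int shift = 31 - Integer.numberOfLeadingZeros(scale);
            int slot = (queue.length - 1) & index;                // wrap within power-of-two array
            return ((long) slot << shift) + base;                 // byte offset of queue[slot]
        }
    }
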
# Line 470 | Line 537 | public class ForkJoinWorkerThread extend
537          int b, i;
538          if ((b = base) != sp &&
539              (q = queue) != null && // must read q after b
540 <            (t = q[i = (q.length - 1) & b]) != null &&
540 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
541              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
542              base = b + 1;
543              return t;
# Line 490 | Line 557 | public class ForkJoinWorkerThread extend
557              ForkJoinTask<?> t;
558              int b, i;
559              while (sp != (b = base)) {
560 <                if ((t = q[i = (q.length - 1) & b]) != null &&
560 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
561                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
562                                                  t, null)) {
563                      base = b + 1;
# Line 503 | Line 570 | public class ForkJoinWorkerThread extend
570  
571      /**
572       * Returns a popped task, or null if empty. Assumes active status.
573 <     * Called only by current thread. (Note: a specialization of this
507 <     * code appears in scanWhileJoining.)
573 >     * Called only by current thread.
574       */
575      final ForkJoinTask<?> popTask() {
576          int s;
577 <        ForkJoinTask<?>[] q = queue;
578 <        if (q != null && (s = sp) != base) {
577 >        ForkJoinTask<?>[] q;
578 >        if (base != (s = sp) && (q = queue) != null) {
579              int i = (q.length - 1) & --s;
580              ForkJoinTask<?> t = q[i];
581              if (t != null && UNSAFE.compareAndSwapObject
# Line 522 | Line 588 | public class ForkJoinWorkerThread extend
588      }
589  
590      /**
591 <     * Specialized version of popTask to pop only if
592 <     * topmost element is the given task. Called only
593 <     * by current thread while active.
591 >     * Specialized version of popTask to pop only if topmost element
592 >     * is the given task. Called only by current thread while
593 >     * active.
594       *
595       * @param t the task. Caller must ensure non-null.
596       */
597      final boolean unpushTask(ForkJoinTask<?> t) {
598          int s;
599 <        ForkJoinTask<?>[] q = queue;
600 <        if (q != null && UNSAFE.compareAndSwapObject
601 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
599 >        ForkJoinTask<?>[] q;
600 >        if (base != (s = sp) && (q = queue) != null &&
601 >            UNSAFE.compareAndSwapObject
602 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
603              sp = s;
604              return true;
605          }
# Line 610 | Line 677 | public class ForkJoinWorkerThread extend
677       */
678      private ForkJoinTask<?> scan() {
679          ForkJoinPool p = pool;
680 <        ForkJoinWorkerThread[] ws = p.workers;
681 <        int n = ws.length;            // upper bound of #workers
682 <        boolean canSteal = active;    // shadow active status
683 <        int r = seed;                 // extract seed once
684 <        int k = r;                    // index: random if j<0 else step
685 <        for (int j = -n; j < n; ++j) {
686 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
687 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
688 <            if (v != null && v.base != v.sp) {
689 <                if (canSteal ||       // ensure active status
690 <                    (canSteal = active = p.tryIncrementActiveCount())) {
691 <                    int b, i;         // inlined specialization of deqTask
692 <                    ForkJoinTask<?> t;
693 <                    ForkJoinTask<?>[] q;
694 <                    if ((b = v.base) != v.sp &&  // recheck
695 <                        (q = v.queue) != null &&
696 <                        (t = q[i = (q.length - 1) & b]) != null &&
697 <                        UNSAFE.compareAndSwapObject
698 <                        (q, (i << qShift) + qBase, t, null)) {
699 <                        v.base = b + 1;
700 <                        seed = r;
701 <                        ++stealCount;
702 <                        return t;
680 >        ForkJoinWorkerThread[] ws;        // worker array
681 >        int n;                            // upper bound of #workers
682 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
683 >            boolean canSteal = active;    // shadow active status
684 >            int r = seed;                 // extract seed once
685 >            int mask = n - 1;
686 >            int j = -n;                   // loop counter
687 >            int k = r;                    // worker index, random if j < 0
688 >            for (;;) {
689 >                ForkJoinWorkerThread v = ws[k & mask];
690 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
691 >                if (v != null && v.base != v.sp) {
692 >                    if (canSteal ||       // ensure active status
693 >                        (canSteal = active = p.tryIncrementActiveCount())) {
694 >                        int b = v.base;   // inline specialized deqTask
695 >                        ForkJoinTask<?>[] q;
696 >                        if (b != v.sp && (q = v.queue) != null) {
697 >                            ForkJoinTask<?> t;
698 >                            int i = (q.length - 1) & b;
699 >                            long u = (i << qShift) + qBase; // raw offset
700 >                            if ((t = q[i]) != null && v.base == b &&
701 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
702 >                                currentSteal = t;
703 >                                v.stealHint = poolIndex;
704 >                                v.base = b + 1;
705 >                                seed = r;
706 >                                ++stealCount;
707 >                                return t;
708 >                            }
709 >                        }
710                      }
711 +                    j = -n;
712 +                    k = r;                // restart on contention
713                  }
714 <                j = -n;           // reset on contention
714 >                else if (++j <= 0)
715 >                    k = r;
716 >                else if (j <= n)
717 >                    k += (n >>> 1) | 1;
718 >                else
719 >                    break;
720              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
721          }
722          return null;
723      }
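
In the rewritten scan, the first n probes use random indices from the xorshift generator and the next n step deterministically by (n >>> 1) | 1. Since that step is odd and the worker array length is a power of two, the step is coprime with the length, so the deterministic phase visits every slot exactly once. A small standalone check of that property:

    // Verifies that stepping by the odd increment (n >>> 1) | 1 modulo a
    // power-of-two length touches every index exactly once, so the
    // deterministic phase of scan() cannot skip a worker slot.
    public class ScanStepDemo {
        public static void main(String[] args) {
            int n = 16;                        // worker array length (a power of two)
            int step = (n >>> 1) | 1;          // 9; odd, hence coprime with 16
            boolean[] seen = new boolean[n];
            int k = 0x9e3779b9;                // arbitrary starting index, as if random
            for (int j = 0; j < n; ++j) {
                seen[k & (n - 1)] = true;
                k += step;
            }
            boolean all = true;
            for (boolean b : seen) all &= b;
            System.out.println("all slots visited: " + all);   // prints true
        }
    }
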
# Line 681 | Line 761 | public class ForkJoinWorkerThread extend
761      }
762  
763      /**
764 <     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
764 >     * Instrumented version of park used by ForkJoinPool.awaitEvent
765       */
766      final void doPark() {
767          ++parkCount;
# Line 689 | Line 769 | public class ForkJoinWorkerThread extend
769      }
770  
771      /**
772 <     * If suspended, tries to set status to unsuspended.
693 <     * Caller must unpark to actually resume
772 >     * If suspended, tries to set status to unsuspended and unparks.
773       *
774       * @return true if successful
775       */
776 <    final boolean tryUnsuspend() {
777 <        int s;
778 <        return (((s = runState) & SUSPENDED) != 0 &&
779 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
780 <                                         s & ~SUSPENDED));
776 >    final boolean tryResumeSpare() {
777 >        int s = runState;
778 >        if ((s & SUSPENDED) != 0 &&
779 >            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
780 >                                     s & ~SUSPENDED)) {
781 >            LockSupport.unpark(this);
782 >            return true;
783 >        }
784 >        return false;
785      }
786  
787      /**
# Line 719 | Line 802 | public class ForkJoinWorkerThread extend
802                                                s | SUSPENDED))
803                  break;
804          }
805 +        boolean timed;
806 +        long nanos;
807 +        long startTime;
808 +        if (poolIndex < pool.parallelism) {
809 +            timed = false;
810 +            nanos = 0L;
811 +            startTime = 0L;
812 +        }
813 +        else {
814 +            timed = true;
815 +            nanos = SPARE_KEEPALIVE_NANOS;
816 +            startTime = System.nanoTime();
817 +        }
818 +        pool.accumulateStealCount(this);
819          lastEventCount = 0;      // reset upon resume
723        ForkJoinPool p = pool;
724        p.releaseWaiters();      // help others progress
725        p.accumulateStealCount(this);
820          interrupted();           // clear/ignore interrupts
727        if (poolIndex < p.getParallelism()) { // untimed wait
728            while ((runState & SUSPENDED) != 0)
729                doPark();
730            return true;
731        }
732        return timedSuspend();   // timed wait if apparently non-core
733    }
734
735    /**
736     * Blocks as spare until resumed or timed out
737     * @return false if trimmed
738     */
739    private boolean timedSuspend() {
740        long nanos = SPARE_KEEPALIVE_NANOS;
741        long startTime = System.nanoTime();
821          while ((runState & SUSPENDED) != 0) {
822              ++parkCount;
823 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
823 >            if (!timed)
824 >                LockSupport.park(this);
825 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
826                  LockSupport.parkNanos(this, nanos);
827              else { // try to trim on timeout
828                  int s = runState;
# Line 765 | Line 846 | public class ForkJoinWorkerThread extend
846      }
847  
848      /**
768     * Set locallyFifo mode. Called only by ForkJoinPool
769     */
770    final void setAsyncMode(boolean async) {
771        locallyFifo = async;
772    }
773
774    /**
849       * Removes and cancels all tasks in queue.  Can be called from any
850       * thread.
851       */
852      final void cancelTasks() {
853 +        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
854 +        if (cj != null) {
855 +            currentJoin = null;
856 +            cj.cancelIgnoringExceptions();
857 +        }
858 +        ForkJoinTask<?> cs = currentSteal;
859 +        if (cs != null) {
860 +            currentSteal = null;
861 +            cs.cancelIgnoringExceptions();
862 +        }
863          while (base != sp) {
864              ForkJoinTask<?> t = deqTask();
865              if (t != null)
# Line 803 | Line 887 | public class ForkJoinWorkerThread extend
887      // Support methods for ForkJoinTask
888  
889      /**
890 +     * Gets and removes a local task.
891 +     *
892 +     * @return a task, if available
893 +     */
894 +    final ForkJoinTask<?> pollLocalTask() {
895 +        while (sp != base) {
896 +            if (active || (active = pool.tryIncrementActiveCount()))
897 +                return locallyFifo? locallyDeqTask() : popTask();
898 +        }
899 +        return null;
900 +    }
901 +
902 +    /**
903 +     * Gets and removes a local or stolen task.
904 +     *
905 +     * @return a task, if available
906 +     */
907 +    final ForkJoinTask<?> pollTask() {
908 +        ForkJoinTask<?> t;
909 +        return (t = pollLocalTask()) != null ? t : scan();
910 +    }
911 +
912 +    /**
913 +     * Possibly runs some tasks and/or blocks, until task is done.
914 +     * The main body is basically a big spinloop, alternating between
915 +     * calls to helpJoinTask and pool.tryAwaitJoin with increased
916 +     * patience parameters until either the task is done without
917 +     * waiting, or we have, if necessary, created or resumed a
918 +     * replacement for this thread while it blocks.
919 +     *
920 +     * @param joinMe the task to join
921 +     * @return task status on exit
922 +     */
923 +    final int joinTask(ForkJoinTask<?> joinMe) {
924 +        int stat;
925 +        ForkJoinTask<?> prevJoin = currentJoin;
926 +        currentJoin = joinMe;
927 +        if ((stat = joinMe.status) >= 0 &&
928 +            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
929 +            ForkJoinPool p = pool;
930 +            int helpRetries = 2;     // initial patience values
931 +            int awaitRetries = -1;   // -1 is sentinel for replace-check only
932 +            do {
933 +                helpJoinTask(joinMe, helpRetries);
934 +                if ((stat = joinMe.status) < 0)
935 +                    break;
936 +                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
937 +                if ((stat = joinMe.status) < 0)
938 +                    break;
939 +                if (awaitRetries == -1)
940 +                    awaitRetries = 0;
941 +                else if (busy)
942 +                    ++awaitRetries;
943 +                if (helpRetries < p.parallelism)
944 +                    helpRetries <<= 1;
945 +                Thread.yield(); // tame unbounded loop
946 +            } while (joinMe.status >= 0);
947 +        }
948 +        currentJoin = prevJoin;
949 +        return stat;
950 +    }
951 +
952 +    /**
953 +     * Run tasks in local queue until given task is done.
954 +     *
955 +     * @param joinMe the task to join
956 +     * @return task status on exit
957 +     */
958 +    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
959 +        int stat, s;
960 +        ForkJoinTask<?>[] q;
961 +        while ((stat = joinMe.status) >= 0 &&
962 +               base != (s = sp) && (q = queue) != null) {
963 +            ForkJoinTask<?> t;
964 +            int i = (q.length - 1) & --s;
965 +            long u = (i << qShift) + qBase; // raw offset
966 +            if ((t = q[i]) != null &&
967 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
968 +                /*
969 +                 * This recheck (and similarly in helpJoinTask)
970 +                 * handles cases where joinMe is independently
971 +                 * cancelled or forced even though there is other work
972 +                 * available. Back out of the pop by putting t back
973 +                 * into slot before we commit by writing sp.
974 +                 */
975 +                if ((stat = joinMe.status) < 0) {
976 +                    UNSAFE.putObjectVolatile(q, u, t);
977 +                    break;
978 +                }
979 +                sp = s;
980 +                t.tryExec();
981 +            }
982 +        }
983 +        return stat;
984 +    }
985 +
986 +    /**
987 +     * Tries to locate and help perform tasks for a stealer of the
988 +     * given task, or in turn one of its stealers.  Traces
989 +     * currentSteal->currentJoin links looking for a thread working on
990 +     * a descendant of the given task and with a non-empty queue to
991 +     * steal back and execute tasks from. Restarts search upon
992 +     * encountering chains that are stale, unknown, or of length
993 +     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
994 +     *
995 +     * The implementation is very branchy to cope with the restart
996 +     * cases.  Returns void, not task status (which must be reread by
997 +     * caller anyway) to slightly simplify control paths.
998 +     *
999 +     * @param joinMe the task to join
1000 +     */
1001 +    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
1002 +        ForkJoinWorkerThread[] ws = pool.workers;
1003 +        int n;
1004 +        if (ws == null || (n = ws.length) <= 1)
1005 +            return;                   // need at least 2 workers
1006 +
1007 +        restart:while (joinMe.status >= 0 && --retries >= 0) {
1008 +            ForkJoinTask<?> task = joinMe;        // base of chain
1009 +            ForkJoinWorkerThread thread = this;   // thread with stolen task
1010 +            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
1011 +                // Try to find v, the stealer of task, by first using hint
1012 +                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1013 +                if (v == null || v.currentSteal != task) {
1014 +                    for (int j = 0; ; ++j) {      // search array
1015 +                        if (task.status < 0 || j == n)
1016 +                            continue restart;     // stale or no stealer
1017 +                        if ((v = ws[j]) != null && v.currentSteal == task) {
1018 +                            thread.stealHint = j; // save for next time
1019 +                            break;
1020 +                        }
1021 +                    }
1022 +                }
1023 +                // Try to help v, using specialized form of deqTask
1024 +                int b;
1025 +                ForkJoinTask<?>[] q;
1026 +                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1027 +                    int i = (q.length - 1) & b;
1028 +                    long u = (i << qShift) + qBase;
1029 +                    ForkJoinTask<?> t = q[i];
1030 +                    if (task.status < 0)          // stale
1031 +                        continue restart;
1032 +                    if (v.base == b) {            // recheck after reading t
1033 +                        if (t == null)            // producer stalled
1034 +                            continue restart;     // retry via restart
1035 +                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036 +                            if (joinMe.status < 0) {
1037 +                                UNSAFE.putObjectVolatile(q, u, t);
1038 +                                return;           // back out on cancel
1039 +                            }
1040 +                            ForkJoinTask<?> prevSteal = currentSteal;
1041 +                            currentSteal = t;
1042 +                            v.stealHint = poolIndex;
1043 +                            v.base = b + 1;
1044 +                            t.tryExec();
1045 +                            currentSteal = prevSteal;
1046 +                        }
1047 +                    }
1048 +                    if (joinMe.status < 0)
1049 +                        return;
1050 +                }
1051 +                // Try to descend to find v's stealer
1052 +                ForkJoinTask<?> next = v.currentJoin;
1053 +                if (next == null || task.status < 0)
1054 +                    continue restart;             // no descendant or stale
1055 +                if (joinMe.status < 0)
1056 +                    return;
1057 +                task = next;
1058 +                thread = v;
1059 +            }
1060 +        }
1061 +    }
1062 +
1063 +    /**
1064       * Returns an estimate of the number of tasks, offset by a
1065       * function of number of idle workers.
1066       *
# Line 854 | Line 1112 | public class ForkJoinWorkerThread extend
1112      }
1113  
1114      /**
857     * Gets and removes a local task.
858     *
859     * @return a task, if available
860     */
861    final ForkJoinTask<?> pollLocalTask() {
862        while (base != sp) {
863            if (active || (active = pool.tryIncrementActiveCount()))
864                return locallyFifo? locallyDeqTask() : popTask();
865        }
866        return null;
867    }
868
869    /**
870     * Gets and removes a local or stolen task.
871     *
872     * @return a task, if available
873     */
874    final ForkJoinTask<?> pollTask() {
875        ForkJoinTask<?> t;
876        return (t = pollLocalTask()) != null ? t : scan();
877    }
878
879    /**
880     * Returns a popped or stolen task, if available, unless joinMe is done
881     *
882     * This method is intrinsically nonmodular. To maintain the
883     * property that tasks are never stolen if the awaited task is
884     * ready, we must interleave mechanics of scan with status
885     * checks. We rely here on the commit points of deq that allow us
886     * to cancel a steal even after CASing slot to null, but before
887     * adjusting base index: If, after the CAS, we see that joinMe is
888     * ready, we can back out by placing the task back into the slot,
889     * without adjusting index. The scan loop is otherwise the same as
890     * in scan.
891     *
892     */
893    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
894        ForkJoinTask<?> popped; // prefer local tasks
895        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
896            return popped;
897        if (joinMe.status >= 0) {
898            ForkJoinPool p = pool;
899            ForkJoinWorkerThread[] ws = p.workers;
900            int n = ws.length;
901            int r = seed;
902            int k = r;
903            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
904                ForkJoinWorkerThread v = ws[k & (n - 1)];
905                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
906                if (v != null) {
907                    int b = v.base;
908                    ForkJoinTask<?>[] q;
909                    if (b != v.sp && (q = v.queue) != null) {
910                        int i = (q.length - 1) & b;
911                        ForkJoinTask<?> t = q[i];
912                        if (t != null && UNSAFE.compareAndSwapObject
913                            (q, (i << qShift) + qBase, t, null)) {
914                            if (joinMe.status >= 0) {
915                                v.base = b + 1;
916                                seed = r;
917                                ++stealCount;
918                                return t;
919                            }
920                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
921                            break; // back out
922                        }
923                        j = -n;
924                    }
925                }
926                k = j >= 0? k + ((n >>> 1) | 1) : r;
927            }
928        }
929        return null;
930    }
931
932    /**
933     * Version of popTask with join checks surrounding extraction.
934     * Uses the same backout strategy as scanWhileJoining. Note that
935     * we ignore locallyFifo flag for local tasks here since helping
936     * joins only make sense in LIFO mode.
937     *
938     * @return a popped task, if available, unless joinMe is done
939     */
940    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
941        int s;
942        ForkJoinTask<?>[] q;
943        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
944            int i = (q.length - 1) & --s;
945            ForkJoinTask<?> t = q[i];
946            if (t != null && UNSAFE.compareAndSwapObject
947                (q, (i << qShift) + qBase, t, null)) {
948                if (joinMe.status >= 0) {
949                    sp = s;
950                    return t;
951                }
952                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
953                break;  // back out
954            }
955        }
956        return null;
957    }
958
959    /**
1115       * Runs tasks until {@code pool.isQuiescent()}.
1116       */
1117      final void helpQuiescePool() {
1118          for (;;) {
1119              ForkJoinTask<?> t = pollLocalTask();
1120 <            if (t != null || (t = scan()) != null)
1120 >            if (t != null || (t = scan()) != null) {
1121                  t.tryExec();
1122 +                currentSteal = null;
1123 +            }
1124              else {
1125                  ForkJoinPool p = pool;
1126                  if (active) {
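
helpQuiescePool is what backs the public ForkJoinTask.helpQuiesce() call: a worker keeps executing local and stolen tasks instead of blocking until the whole pool is quiescent. A hedged usage sketch; the fan-out task below is illustrative.

    import jsr166y.RecursiveAction;

    // Illustrative use of ForkJoinTask.helpQuiesce(), which relies on the
    // worker's helpQuiescePool: fork a batch of subtasks that are never
    // individually joined, then help run tasks until the pool is quiescent.
    public class QuiesceExample extends RecursiveAction {
        private final int fanout;
        public QuiesceExample(int fanout) { this.fanout = fanout; }

        @Override protected void compute() {
            for (int i = 0; i < fanout; ++i) {
                new RecursiveAction() {
                    @Override protected void compute() {
                        // ... leaf work for one partition ...
                    }
                }.fork();                      // fire-and-forget subtasks
            }
            helpQuiesce();                     // run tasks until the pool is quiescent
        }
    }
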
# Line 986 | Line 1143 | public class ForkJoinWorkerThread extend
1143          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1144      private static final long qBase =
1145          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1146 +    private static final long threadStatusOffset =
1147 +        objectFieldOffset("threadStatus", Thread.class);
1148      private static final int qShift;
1149  
1150      static {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines