ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.33 by dl, Thu May 27 16:46:49 2010 UTC vs.
Revision 1.39 by dl, Sat Jul 24 20:28:18 2010 UTC

# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
87 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field currentSteal) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field currentJoin)
90 >     * the task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than use per-task bookkeeping.  This may
101 >     * require a linear scan of workers array to locate stealers, but
102 >     * usually doesn't because stealers leave hints (that may become
103 >     * stale/wrong) of where to locate them. This isolates cost to
104 >     * when it is needed, rather than adding to per-task overhead.
105 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
106 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
107 >     * is updated only while actively joining, which means that we
108 >     * miss links in the chain during long-lived tasks, GC stalls etc
109 >     * (which is OK since blocking in such cases is usually a good
110 >     * idea).  (4) We bound the number of attempts to find work (see
111 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112 >     * necessary replacing it with a spare (see
113 >     * ForkJoinPool.tryAwaitJoin).
114 >     *
115 >     * Efficient implementation of these algorithms currently relies
116 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
117       * correct orderings, reads and writes of variable base require
118       * volatile ordering.  Variable sp does not require volatile
119       * writes but still needs store-ordering, which we accomplish by
120       * pre-incrementing sp before filling the slot with an ordered
121       * store.  (Pre-incrementing also enables backouts used in
122 <     * scanWhileJoining.)  Because they are protected by volatile base
123 <     * reads, reads of the queue array and its slots by other threads
124 <     * do not need volatile load semantics, but writes (in push)
125 <     * require store order and CASes (in pop and deq) require
126 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
127 <     * algorithm has similar properties, but without support for
128 <     * nulling slots.)  Since these combinations aren't supported
129 <     * using ordinary volatiles, the only way to accomplish these
130 <     * efficiently is to use direct Unsafe calls. (Using external
131 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
132 <     * array is significantly slower because of memory locality and
133 <     * indirection effects.)
122 >     * joinTask.)  Because they are protected by volatile base reads,
123 >     * reads of the queue array and its slots by other threads do not
124 >     * need volatile load semantics, but writes (in push) require
125 >     * store order and CASes (in pop and deq) require (volatile) CAS
126 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
127 >     * similar properties, but without support for nulling slots.)
128 >     * Since these combinations aren't supported using ordinary
129 >     * volatiles, the only way to accomplish these efficiently is to
130 >     * use direct Unsafe calls. (Using external AtomicIntegers and
131 >     * AtomicReferenceArrays for the indices and array is
132 >     * significantly slower because of memory locality and indirection
133 >     * effects.)
134       *
135       * Further, performance on most platforms is very sensitive to
136       * placement and sizing of the (resizable) queue array.  Even
# Line 137 | Line 166 | public class ForkJoinWorkerThread extend
166          5L * 1000L * 1000L * 1000L; // 5 secs
167  
168      /**
169 +     * The maximum stolen->joining link depth allowed in helpJoinTask.
170 +     * Depths for legitimate chains are unbounded, but we use a fixed
171 +     * constant to avoid (otherwise unchecked) cycles and bound
172 +     * staleness of traversal parameters at the expense of sometimes
173 +     * blocking when we could be helping.
174 +     */
175 +    private static final int MAX_HELP_DEPTH = 8;
176 +
177 +    /**
178       * Capacity of work-stealing queue array upon initialization.
179 <     * Must be a power of two. Initial size must be at least 2, but is
179 >     * Must be a power of two. Initial size must be at least 4, but is
180       * padded to minimize cache effects.
181       */
182      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
# Line 178 | Line 216 | public class ForkJoinWorkerThread extend
216      private int sp;
217  
218      /**
219 +     * The index of most recent stealer, used as a hint to avoid
220 +     * traversal in method helpJoinTask. This is only a hint because a
221 +     * worker might have had multiple steals and this only holds one
222 +     * of them (usually the most current). Declared non-volatile,
223 +     * relying on other prevailing sync to keep reasonably current.
224 +     */
225 +    private int stealHint;
226 +
227 +    /**
228       * Run state of this worker. In addition to the usual run levels,
229       * tracks if this worker is suspended as a spare, and if it was
230       * killed (trimmed) while suspended. However, "active" status is
# Line 196 | Line 243 | public class ForkJoinWorkerThread extend
243       * currently not exported but included because volatile write upon
244       * park also provides a workaround for a JVM bug.
245       */
246 <    private volatile int parkCount;
246 >    volatile int parkCount;
247  
248      /**
249       * Number of steals, transferred and reset in pool callbacks pool
# Line 218 | Line 265 | public class ForkJoinWorkerThread extend
265  
266      /**
267       * True if use local fifo, not default lifo, for local polling.
268 <     * Shadows value from ForkJoinPool, which resets it if changed
222 <     * pool-wide.
268 >     * Shadows value from ForkJoinPool.
269       */
270 <    private boolean locallyFifo;
270 >    private final boolean locallyFifo;
271  
272      /**
273       * Index of this worker in pool array. Set once by pool before
# Line 243 | Line 289 | public class ForkJoinWorkerThread extend
289      volatile long nextWaiter;
290  
291      /**
292 +     * The task currently being joined, set only when actively trying
293 +     * to helpStealer. Written only by current thread, but read by
294 +     * others.
295 +     */
296 +    private volatile ForkJoinTask<?> currentJoin;
297 +
298 +    /**
299 +     * The task most recently stolen from another worker (or
300 +     * submission queue).  Not volatile because always read/written in
301 +     * presence of related volatiles in those cases where it matters.
302 +     */
303 +    private ForkJoinTask<?> currentSteal;
304 +
305 +    /**
306       * Creates a ForkJoinWorkerThread operating in the given pool.
307       *
308       * @param pool the pool this thread works in
309       * @throws NullPointerException if pool is null
310       */
311      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
312          this.pool = pool;
313 +        this.locallyFifo = pool.locallyFifo;
314          // To avoid exposing construction details to subclasses,
315          // remaining initialization is in start() and onStart()
316      }
# Line 258 | Line 318 | public class ForkJoinWorkerThread extend
318      /**
319       * Performs additional initialization and starts this thread
320       */
321 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
321 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
322          this.poolIndex = poolIndex;
323 <        this.locallyFifo = locallyFifo;
323 >        setDaemon(true);
324          if (ueh != null)
325              setUncaughtExceptionHandler(ueh);
267        setDaemon(true);
326          start();
327      }
328  
# Line 305 | Line 363 | public class ForkJoinWorkerThread extend
363          int rs = seedGenerator.nextInt();
364          seed = rs == 0? 1 : rs; // seed must be nonzero
365  
366 <        // Allocate name string and queue array in this thread
366 >        // Allocate name string and arrays in this thread
367          String pid = Integer.toString(pool.getPoolNumber());
368          String wid = Integer.toString(poolIndex);
369          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 358 | Line 416 | public class ForkJoinWorkerThread extend
416       * Find and execute tasks and check status while running
417       */
418      private void mainLoop() {
419 <        boolean ran = false; // true if ran task on previous step
419 >        int emptyScans = 0; // consecutive times failed to find work
420          ForkJoinPool p = pool;
421          for (;;) {
422 <            p.preStep(this, ran);
422 >            p.preStep(this, emptyScans);
423              if (runState != 0)
424                  return;
425              ForkJoinTask<?> t; // try to get and run stolen or submitted task
426 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
426 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
427                  t.tryExec();
428                  if (base != sp)
429                      runLocalTasks();
430 +                currentSteal = null;
431 +                emptyScans = 0;
432              }
433 +            else
434 +                ++emptyScans;
435          }
436      }
437  
# Line 397 | Line 459 | public class ForkJoinWorkerThread extend
459          while (p.hasQueuedSubmissions()) {
460              if (active || (active = p.tryIncrementActiveCount())) {
461                  ForkJoinTask<?> t = p.pollSubmission();
462 <                return t != null ? t : scan(); // if missed, rescan
462 >                if (t != null) {
463 >                    currentSteal = t;
464 >                    return t;
465 >                }
466 >                return scan(); // if missed, rescan
467              }
468          }
469          return null;
# Line 447 | Line 513 | public class ForkJoinWorkerThread extend
513       * @param t the task. Caller must ensure non-null.
514       */
515      final void pushTask(ForkJoinTask<?> t) {
450        int s;
516          ForkJoinTask<?>[] q = queue;
517          int mask = q.length - 1; // implicit assert q != null
518 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
519 <        if ((s -= base) <= 0)
520 <            pool.signalWork();
521 <        else if (s + 1 >= mask)
522 <            growQueue();
518 >        int s = sp++;            // ok to increment sp before slot write
519 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
520 >        if ((s -= base) == 0)
521 >            pool.signalWork();   // was empty
522 >        else if (s == mask)
523 >            growQueue();         // is full
524      }
525  
526      /**
527       * Tries to take a task from the base of the queue, failing if
528       * empty or contended. Note: Specializations of this code appear
529 <     * in scan and scanWhileJoining.
529 >     * in locallyDeqTask and elsewhere.
530       *
531       * @return a task, or null if none or contended
532       */
# Line 470 | Line 536 | public class ForkJoinWorkerThread extend
536          int b, i;
537          if ((b = base) != sp &&
538              (q = queue) != null && // must read q after b
539 <            (t = q[i = (q.length - 1) & b]) != null &&
539 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
540              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
541              base = b + 1;
542              return t;
# Line 490 | Line 556 | public class ForkJoinWorkerThread extend
556              ForkJoinTask<?> t;
557              int b, i;
558              while (sp != (b = base)) {
559 <                if ((t = q[i = (q.length - 1) & b]) != null &&
559 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
560                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
561                                                  t, null)) {
562                      base = b + 1;
# Line 503 | Line 569 | public class ForkJoinWorkerThread extend
569  
570      /**
571       * Returns a popped task, or null if empty. Assumes active status.
572 <     * Called only by current thread. (Note: a specialization of this
507 <     * code appears in popWhileJoining.)
572 >     * Called only by current thread.
573       */
574      final ForkJoinTask<?> popTask() {
575          int s;
# Line 623 | Line 688 | public class ForkJoinWorkerThread extend
688                  ForkJoinWorkerThread v = ws[k & mask];
689                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
690                  if (v != null && v.base != v.sp) {
691 <                    int b, i;             // inline specialized deqTask
692 <                    ForkJoinTask<?>[] q;
693 <                    ForkJoinTask<?> t;
694 <                    if ((canSteal ||      // ensure active status
695 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
696 <                        (q = v.queue) != null &&
697 <                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
698 <                        UNSAFE.compareAndSwapObject
699 <                        (q, (i << qShift) + qBase, t, null)) {
700 <                        v.base = b + 1;
701 <                        seed = r;
702 <                        ++stealCount;
703 <                        return t;
691 >                    if (canSteal ||       // ensure active status
692 >                        (canSteal = active = p.tryIncrementActiveCount())) {
693 >                        int b = v.base;   // inline specialized deqTask
694 >                        ForkJoinTask<?>[] q;
695 >                        if (b != v.sp && (q = v.queue) != null) {
696 >                            ForkJoinTask<?> t;
697 >                            int i = (q.length - 1) & b;
698 >                            long u = (i << qShift) + qBase; // raw offset
699 >                            if ((t = q[i]) != null && v.base == b &&
700 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
701 >                                currentSteal = t;
702 >                                v.stealHint = poolIndex;
703 >                                v.base = b + 1;
704 >                                seed = r;
705 >                                ++stealCount;
706 >                                return t;
707 >                            }
708 >                        }
709                      }
710                      j = -n;
711                      k = r;                // restart on contention
# Line 690 | Line 760 | public class ForkJoinWorkerThread extend
760      }
761  
762      /**
763 <     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
763 >     * Instrumented version of park used by ForkJoinPool.eventSync
764       */
765      final void doPark() {
766          ++parkCount;
# Line 698 | Line 768 | public class ForkJoinWorkerThread extend
768      }
769  
770      /**
771 <     * If suspended, tries to set status to unsuspended.
702 <     * Caller must unpark to actually resume
771 >     * If suspended, tries to set status to unsuspended and unparks.
772       *
773       * @return true if successful
774       */
775 <    final boolean tryUnsuspend() {
776 <        int s;
777 <        return (((s = runState) & SUSPENDED) != 0 &&
778 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
779 <                                         s & ~SUSPENDED));
775 >    final boolean tryResumeSpare() {
776 >        int s = runState;
777 >        if ((s & SUSPENDED) != 0 &&
778 >            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
779 >                                     s & ~SUSPENDED)) {
780 >            LockSupport.unpark(this);
781 >            return true;
782 >        }
783 >        return false;
784      }
785  
786      /**
# Line 728 | Line 801 | public class ForkJoinWorkerThread extend
801                                                s | SUSPENDED))
802                  break;
803          }
804 +        int pc = pool.parallelism;
805 +        pool.accumulateStealCount(this);
806 +        boolean timed;
807 +        long nanos;
808 +        long startTime;
809 +        if (poolIndex < pc) { // untimed wait for core threads
810 +            timed = false;
811 +            nanos = 0L;
812 +            startTime = 0L;
813 +        }
814 +        else {                // timed wait for added threads
815 +            timed = true;
816 +            nanos = SPARE_KEEPALIVE_NANOS;
817 +            startTime = System.nanoTime();
818 +        }
819          lastEventCount = 0;      // reset upon resume
732        ForkJoinPool p = pool;
733        p.releaseWaiters();      // help others progress
734        p.accumulateStealCount(this);
820          interrupted();           // clear/ignore interrupts
736        if (poolIndex < p.getParallelism()) { // untimed wait
737            while ((runState & SUSPENDED) != 0)
738                doPark();
739            return true;
740        }
741        return timedSuspend();   // timed wait if apparently non-core
742    }
743
744    /**
745     * Blocks as spare until resumed or timed out
746     * @return false if trimmed
747     */
748    private boolean timedSuspend() {
749        long nanos = SPARE_KEEPALIVE_NANOS;
750        long startTime = System.nanoTime();
821          while ((runState & SUSPENDED) != 0) {
822              ++parkCount;
823 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
823 >            if (!timed)
824 >                LockSupport.park(this);
825 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
826                  LockSupport.parkNanos(this, nanos);
827              else { // try to trim on timeout
828                  int s = runState;
# Line 774 | Line 846 | public class ForkJoinWorkerThread extend
846      }
847  
848      /**
777     * Set locallyFifo mode. Called only by ForkJoinPool
778     */
779    final void setAsyncMode(boolean async) {
780        locallyFifo = async;
781    }
782
783    /**
849       * Removes and cancels all tasks in queue.  Can be called from any
850       * thread.
851       */
852      final void cancelTasks() {
853 +        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
854 +        if (cj != null) {
855 +            currentJoin = null;
856 +            cj.cancelIgnoringExceptions();
857 +        }
858 +        ForkJoinTask<?> cs = currentSteal;
859 +        if (cs != null) {
860 +            currentSteal = null;
861 +            cs.cancelIgnoringExceptions();
862 +        }
863          while (base != sp) {
864              ForkJoinTask<?> t = deqTask();
865              if (t != null)
# Line 812 | Line 887 | public class ForkJoinWorkerThread extend
887      // Support methods for ForkJoinTask
888  
889      /**
890 +     * Gets and removes a local task.
891 +     *
892 +     * @return a task, if available
893 +     */
894 +    final ForkJoinTask<?> pollLocalTask() {
895 +        while (sp != base) {
896 +            if (active || (active = pool.tryIncrementActiveCount()))
897 +                return locallyFifo? locallyDeqTask() : popTask();
898 +        }
899 +        return null;
900 +    }
901 +
902 +    /**
903 +     * Gets and removes a local or stolen task.
904 +     *
905 +     * @return a task, if available
906 +     */
907 +    final ForkJoinTask<?> pollTask() {
908 +        ForkJoinTask<?> t = pollLocalTask();
909 +        if (t == null) {
910 +            t = scan();
911 +            currentSteal = null; // cannot retain/track
912 +        }
913 +        return t;
914 +    }
915 +
916 +    /**
917 +     * Possibly runs some tasks and/or blocks, until task is done.
918 +     * The main body is basically a big spinloop, alternating between
919 +     * calls to helpJoinTask and pool.tryAwaitJoin with increased
920 +     * patience parameters until either the task is done without
921 +     * waiting, or we have, if necessary, created or resumed a
922 +     * replacement for this thread while it blocks.
923 +     *
924 +     * @param joinMe the task to join
925 +     * @return task status on exit
926 +     */
927 +     final int joinTask(ForkJoinTask<?> joinMe) {
928 +        int stat;
929 +        ForkJoinTask<?> prevJoin = currentJoin;
930 +        // Only written by this thread; only need ordered store
931 +        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
932 +        if ((stat = joinMe.status) >= 0 &&
933 +            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
934 +            for (int retries = 0; ; ++retries) {
935 +                helpJoinTask(joinMe, retries);
936 +                if ((stat = joinMe.status) < 0)
937 +                    break;
938 +                pool.tryAwaitJoin(joinMe, retries);
939 +                if ((stat = joinMe.status) < 0)
940 +                    break;
941 +                Thread.yield(); // tame unbounded loop
942 +            }
943 +        }
944 +        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
945 +        return stat;
946 +    }
947 +
948 +    /**
949 +     * Run tasks in local queue until given task is done.
950 +     *
951 +     * @param joinMe the task to join
952 +     * @return task status on exit
953 +     */
954 +    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
955 +        int stat, s;
956 +        ForkJoinTask<?>[] q;
957 +        while ((stat = joinMe.status) >= 0 &&
958 +               base != (s = sp) && (q = queue) != null) {
959 +            ForkJoinTask<?> t;
960 +            int i = (q.length - 1) & --s;
961 +            long u = (i << qShift) + qBase; // raw offset
962 +            if ((t = q[i]) != null &&
963 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
964 +                /*
965 +                 * This recheck (and similarly in helpJoinTask)
966 +                 * handles cases where joinMe is independently
967 +                 * cancelled or forced even though there is other work
968 +                 * available. Back out of the pop by putting t back
969 +                 * into slot before we commit by writing sp.
970 +                 */
971 +                if ((stat = joinMe.status) < 0) {
972 +                    UNSAFE.putObjectVolatile(q, u, t);
973 +                    break;
974 +                }
975 +                sp = s;
976 +                t.tryExec();
977 +            }
978 +        }
979 +        return stat;
980 +    }
981 +
982 +    /**
983 +     * Tries to locate and help perform tasks for a stealer of the
984 +     * given task, or in turn one of its stealers.  Traces
985 +     * currentSteal->currentJoin links looking for a thread working on
986 +     * a descendant of the given task and with a non-empty queue to
987 +     * steal back and execute tasks from. Restarts search upon
988 +     * encountering chains that are stale, unknown, or of length
989 +     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
990 +     *
991 +     * The implementation is very branchy to cope with the restart
992 +     * cases.  Returns void, not task status (which must be reread by
993 +     * caller anyway) to slightly simplify control paths.
994 +     *
995 +     * @param joinMe the task to join
996 +     * @param rescans the number of times to recheck for work
997 +     */
998 +    private void helpJoinTask(ForkJoinTask<?> joinMe, int rescans) {
999 +        ForkJoinWorkerThread[] ws = pool.workers;
1000 +        int n;
1001 +        if (ws == null || (n = ws.length) <= 1)
1002 +            return;                   // need at least 2 workers
1003 +        restart:while (rescans-- >= 0 && joinMe.status >= 0) {
1004 +            ForkJoinTask<?> task = joinMe;        // base of chain
1005 +            ForkJoinWorkerThread thread = this;   // thread with stolen task
1006 +            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
1007 +                // Try to find v, the stealer of task, by first using hint
1008 +                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1009 +                if (v == null || v.currentSteal != task) {
1010 +                    for (int j = 0; ; ++j) {      // search array
1011 +                        if (task.status < 0 || j == n)
1012 +                            continue restart;     // stale or no stealer
1013 +                        if ((v = ws[j]) != null && v.currentSteal == task) {
1014 +                            thread.stealHint = j; // save for next time
1015 +                            break;
1016 +                        }
1017 +                    }
1018 +                }
1019 +                // Try to help v, using specialized form of deqTask
1020 +                int b;
1021 +                ForkJoinTask<?>[] q;
1022 +                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1023 +                    int i = (q.length - 1) & b;
1024 +                    long u = (i << qShift) + qBase;
1025 +                    ForkJoinTask<?> t = q[i];
1026 +                    if (task.status < 0)          // stale
1027 +                        continue restart;
1028 +                    if (t != null) {
1029 +                        if (v.base == b &&
1030 +                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
1031 +                            if (joinMe.status < 0) {
1032 +                                UNSAFE.putObjectVolatile(q, u, t);
1033 +                                return;           // back out on cancel
1034 +                            }
1035 +                            ForkJoinTask<?> prevSteal = currentSteal;
1036 +                            currentSteal = t;
1037 +                            v.stealHint = poolIndex;
1038 +                            v.base = b + 1;
1039 +                            t.tryExec();
1040 +                            currentSteal = prevSteal;
1041 +                        }
1042 +                    }
1043 +                    else if (v.base == b)          // producer stalled
1044 +                        continue restart;          // retry via restart
1045 +                    if (joinMe.status < 0)
1046 +                        return;
1047 +                }
1048 +                // Try to descend to find v's stealer
1049 +                ForkJoinTask<?> next = v.currentJoin;
1050 +                if (next == null || next == task || task.status < 0)
1051 +                    continue restart;             // no descendant or stale
1052 +                if (joinMe.status < 0)
1053 +                    return;
1054 +                task = next;
1055 +                thread = v;
1056 +            }
1057 +        }
1058 +    }
1059 +
1060 +    /**
1061       * Returns an estimate of the number of tasks, offset by a
1062       * function of number of idle workers.
1063       *
# Line 863 | Line 1109 | public class ForkJoinWorkerThread extend
1109      }
1110  
1111      /**
866     * Gets and removes a local task.
867     *
868     * @return a task, if available
869     */
870    final ForkJoinTask<?> pollLocalTask() {
871        while (base != sp) {
872            if (active || (active = pool.tryIncrementActiveCount()))
873                return locallyFifo? locallyDeqTask() : popTask();
874        }
875        return null;
876    }
877
878    /**
879     * Gets and removes a local or stolen task.
880     *
881     * @return a task, if available
882     */
883    final ForkJoinTask<?> pollTask() {
884        ForkJoinTask<?> t;
885        return (t = pollLocalTask()) != null ? t : scan();
886    }
887
888    /**
889     * Executes or processes other tasks awaiting the given task
890     * @return task completion status
891     */
892    final int execWhileJoining(ForkJoinTask<?> joinMe) {
893        int s;
894        while ((s = joinMe.status) >= 0) {
895            ForkJoinTask<?> t = base != sp?
896                popWhileJoining(joinMe) :
897                scanWhileJoining(joinMe);
898            if (t != null)
899                t.tryExec();
900        }
901        return s;
902    }
903
904    /**
905     * Returns or stolen task, if available, unless joinMe is done
906     *
907     * This method is intrinsically nonmodular. To maintain the
908     * property that tasks are never stolen if the awaited task is
909     * ready, we must interleave mechanics of scan with status
910     * checks. We rely here on the commit points of deq that allow us
911     * to cancel a steal even after CASing slot to null, but before
912     * adjusting base index: If, after the CAS, we see that joinMe is
913     * ready, we can back out by placing the task back into the slot,
914     * without adjusting index. The loop is otherwise a variant of the
915     * one in scan().
916     *
917     */
918    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
919        int r = seed;
920        ForkJoinPool p = pool;
921        ForkJoinWorkerThread[] ws;
922        int n;
923        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
924            int mask = n - 1;
925            int k = r;
926            boolean contended = false; // to retry loop if deq contends
927            for (int j = -n; j <= n; ++j) {
928                if (joinMe.status < 0)
929                    break outer;
930                int b;
931                ForkJoinTask<?>[] q;
932                ForkJoinWorkerThread v = ws[k & mask];
933                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
934                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
935                    int i = (q.length - 1) & b;
936                    ForkJoinTask<?> t = q[i];
937                    if (t != null && UNSAFE.compareAndSwapObject
938                        (q, (i << qShift) + qBase, t, null)) {
939                        if (joinMe.status >= 0) {
940                            v.base = b + 1;
941                            seed = r;
942                            ++stealCount;
943                            return t;
944                        }
945                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
946                        break outer; // back out
947                    }
948                    contended = true;
949                }
950                k = j < 0 ? r : (k + ((n >>> 1) | 1));
951            }
952            if (!contended && p.tryAwaitBusyJoin(joinMe))
953                break;
954        }
955        return null;
956    }
957
958    /**
959     * Version of popTask with join checks surrounding extraction.
960     * Uses the same backout strategy as helpJoinTask. Note that
961     * we ignore locallyFifo flag for local tasks here since helping
962     * joins only make sense in LIFO mode.
963     *
964     * @return a popped task, if available, unless joinMe is done
965     */
966    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
967        int s;
968        ForkJoinTask<?>[] q;
969        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
970            int i = (q.length - 1) & --s;
971            ForkJoinTask<?> t = q[i];
972            if (t != null && UNSAFE.compareAndSwapObject
973                (q, (i << qShift) + qBase, t, null)) {
974                if (joinMe.status >= 0) {
975                    sp = s;
976                    return t;
977                }
978                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
979                break;  // back out
980            }
981        }
982        return null;
983    }
984
985    /**
1112       * Runs tasks until {@code pool.isQuiescent()}.
1113       */
1114      final void helpQuiescePool() {
1115          for (;;) {
1116              ForkJoinTask<?> t = pollLocalTask();
1117 <            if (t != null || (t = scan()) != null)
1117 >            if (t != null || (t = scan()) != null) {
1118                  t.tryExec();
1119 +                currentSteal = null;
1120 +            }
1121              else {
1122                  ForkJoinPool p = pool;
1123                  if (active) {
# Line 1010 | Line 1138 | public class ForkJoinWorkerThread extend
1138      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1139      private static final long runStateOffset =
1140          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1141 +    private static final long currentJoinOffset =
1142 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1143      private static final long qBase =
1144          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1145      private static final int qShift;

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines