
Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.17 by dl, Wed Jul 7 20:41:24 2010 UTC vs.
Revision 1.18 by dl, Wed Aug 11 18:45:45 2010 UTC

# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       *
84       * When a worker would otherwise be blocked waiting to join a
85       * task, it first tries a form of linear helping: Each worker
86 <     * records (in field stolen) the most recent task it stole
87 <     * from some other worker. Plus, it records (in field joining) the
88 <     * task it is currently actively joining. Method joinTask uses
86 >     * records (in field currentSteal) the most recent task it stole
87 >     * from some other worker. Plus, it records (in field currentJoin)
88 >     * the task it is currently actively joining. Method joinTask uses
89       * these markers to try to find a worker to help (i.e., steal back
90       * a task from and execute it) that could hasten completion of the
91       * actively joined task. In essence, the joiner executes a task
# Line 95 | Line 95 | public class ForkJoinWorkerThread extend
95       * technique for implementing efficient futures" SIGPLAN Notices,
96       * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
97       * in that: (1) We only maintain dependency links across workers
98 <     * upon steals, rather than maintain per-task bookkeeping.  This
99 <     * requires a linear scan of workers array to locate stealers,
100 <     * which isolates cost to when it is needed, rather than adding to
101 <     * per-task overhead.  (2) It is "shallow", ignoring nesting and
102 <     * potentially cyclic mutual steals.  (3) It is intentionally
103 <     * racy: field joining is updated only while actively joining,
104 <     * which means that we could miss links in the chain during
105 <     * long-lived tasks, GC stalls etc.  (4) We fall back to
106 <     * suspending the worker and if necessary replacing it with a
107 <     * spare (see ForkJoinPool.tryAwaitJoin).
98 >     * upon steals, rather than use per-task bookkeeping.  This may
99 >     * require a linear scan of workers array to locate stealers, but
100 >     * usually doesn't because stealers leave hints (that may become
101 >     * stale/wrong) of where to locate them. This isolates cost to
102 >     * when it is needed, rather than adding to per-task overhead.
103 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
104 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
105 >     * is updated only while actively joining, which means that we
106 >     * miss links in the chain during long-lived tasks, GC stalls etc
107 >     * (which is OK since blocking in such cases is usually a good
108 >     * idea).  (4) We bound the number of attempts to find work (see
109 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
110 >     * necessary replacing it with a spare (see
111 >     * ForkJoinPool.tryAwaitJoin).
112       *
113 <     * Efficient implementation of these algorithms currently relies on
114 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
113 >     * Efficient implementation of these algorithms currently relies
114 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
115       * correct orderings, reads and writes of variable base require
116       * volatile ordering.  Variable sp does not require volatile
117       * writes but still needs store-ordering, which we accomplish by
118       * pre-incrementing sp before filling the slot with an ordered
119       * store.  (Pre-incrementing also enables backouts used in
120 <     * scanWhileJoining.)  Because they are protected by volatile base
121 <     * reads, reads of the queue array and its slots by other threads
122 <     * do not need volatile load semantics, but writes (in push)
123 <     * require store order and CASes (in pop and deq) require
124 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
125 <     * algorithm has similar properties, but without support for
126 <     * nulling slots.)  Since these combinations aren't supported
127 <     * using ordinary volatiles, the only way to accomplish these
128 <     * efficiently is to use direct Unsafe calls. (Using external
129 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
130 <     * array is significantly slower because of memory locality and
131 <     * indirection effects.)
120 >     * joinTask.)  Because they are protected by volatile base reads,
121 >     * reads of the queue array and its slots by other threads do not
122 >     * need volatile load semantics, but writes (in push) require
123 >     * store order and CASes (in pop and deq) require (volatile) CAS
124 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
125 >     * similar properties, but without support for nulling slots.)
126 >     * Since these combinations aren't supported using ordinary
127 >     * volatiles, the only way to accomplish these efficiently is to
128 >     * use direct Unsafe calls. (Using external AtomicIntegers and
129 >     * AtomicReferenceArrays for the indices and array is
130 >     * significantly slower because of memory locality and indirection
131 >     * effects.)
132       *
133       * Further, performance on most platforms is very sensitive to
134       * placement and sizing of the (resizable) queue array.  Even
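A minimal sketch of the ordering the comment in this hunk describes: the owner increments sp first and then fills the slot with an ordered store, while a stealer reads base (volatile) before reading the slot. It uses AtomicReferenceArray.lazySet instead of the direct Unsafe calls, which the comment says are needed for speed, so it illustrates the semantics only and not the performance. The class and method names are hypothetical, the capacity is fixed, and overflow is not handled.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration; not the jsr166 implementation.
final class OrderedPushSketch<T> {
    private final AtomicReferenceArray<T> slots =
        new AtomicReferenceArray<>(16);   // power-of-two capacity; no resizing in this sketch
    private volatile int base;            // index of the next slot to steal
    private int sp;                       // index of the next slot to push; owner-written

    // Owner only: increment sp first, then fill the slot with an ordered store.
    void push(T task) {
        int s = sp++;
        slots.lazySet(s & (slots.length() - 1), task);
    }

    // Any thread: claim the oldest task by CAS, or return null if nothing was claimed.
    T steal() {
        int b = base;                     // volatile read precedes the slot read
        if (b == sp)
            return null;
        int i = b & (slots.length() - 1);
        T t = slots.get(i);
        if (t != null && base == b && slots.compareAndSet(i, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    // Reads base before sp and clamps, since the two reads are not atomic together.
    int size() {
        int n = -base + sp;
        return n <= 0 ? 0 : n;
    }
}
```

A stealer that sees a stale sp or a not-yet-visible slot simply returns null and can retry later, which is why the slot write needs store ordering but not a full volatile write.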
# Line 149 | Line 153 | public class ForkJoinWorkerThread extend
153      private static final Random seedGenerator = new Random();
154  
155      /**
156 <     * The timeout value for suspending spares. Spare workers that
157 <     * remain unsignalled for more than this time may be trimmed
158 <     * (killed and removed from pool).  Since our goal is to avoid
159 <     * long-term thread buildup, the exact value of timeout does not
160 <     * matter too much so long as it avoids most false-alarm timeouts
157 <     * under GC stalls or momentarily high system load.
156 >     * The maximum stolen->joining link depth allowed in helpJoinTask.
157 >     * Depths for legitimate chains are unbounded, but we use a fixed
158 >     * constant to avoid (otherwise unchecked) cycles and bound
159 >     * staleness of traversal parameters at the expense of sometimes
160 >     * blocking when we could be helping.
161       */
162 <    private static final long SPARE_KEEPALIVE_NANOS =
163 <        5L * 1000L * 1000L * 1000L; // 5 secs
162 >    private static final int MAX_HELP_DEPTH = 8;
163 >
164 >    /**
165 >     * The wakeup interval (in nanoseconds) for the first worker
166 >     * suspended as spare.  On each wakeup not signalled by a
167 >     * resumption, it may ask the pool to reduce the number of spares.
168 >     */
169 >    private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L;
170  
171      /**
172       * Capacity of work-stealing queue array upon initialization.
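The depth bound can be sketched as follows. This is a simplified, single-threaded illustration of tracing currentSteal->currentJoin links with a MAX_HELP_DEPTH cutoff, assuming a plain linear scan for the stealer and ignoring stealHint; the Worker class and both method names are hypothetical and this is not the helpJoinTask code.

```java
import java.util.ArrayDeque;

// Hypothetical illustration; not the jsr166 implementation.
final class HelpChainSketch {
    static final int MAX_HELP_DEPTH = 8;   // same value as the constant in the diff

    // Stand-in for a worker; only the fields the trace needs.
    static final class Worker {
        Object currentSteal;               // task this worker most recently stole
        Object currentJoin;                // task this worker is waiting to join
        final ArrayDeque<Object> queue = new ArrayDeque<>();
    }

    // Linear scan for the worker whose currentSteal is the given task.
    static Worker stealerOf(Worker[] ws, Object task) {
        for (Worker w : ws)
            if (w != null && w.currentSteal == task)
                return w;
        return null;
    }

    // Follows stealer -> joined task -> stealer links, at most MAX_HELP_DEPTH times.
    static Worker findWorkerToHelp(Worker[] ws, Object joinMe) {
        Object task = joinMe;
        for (int depth = 0; depth < MAX_HELP_DEPTH && task != null; ++depth) {
            Worker v = stealerOf(ws, task);
            if (v == null)
                return null;               // chain is unknown or stale
            if (!v.queue.isEmpty())
                return v;                  // has queued work that could be taken back
            task = v.currentJoin;          // v is itself waiting, so follow its link
        }
        return null;                       // too deep, or a cycle of mutual steals
    }
}
```

With a fixed bound, a cycle (two workers each joining the task the other stole) ends after eight steps instead of looping, at the cost of giving up on legitimate chains that happen to be longer.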
# Line 180 | Line 189 | public class ForkJoinWorkerThread extend
189      final ForkJoinPool pool;
190  
191      /**
183 -     * The task most recently stolen from another worker
184 -     */
185 -    private volatile ForkJoinTask<?> stolen;
186 -
187 -    /**
188 -     * The task currently being joined, set only when actively
189 -     * trying to helpStealer.
190 -     */
191 -    private volatile ForkJoinTask<?> joining;
192 -
193 -    /**
192       * The work-stealing queue array. Size must be a power of two.
193       * Initialized in onStart, to improve memory locality.
194       */
# Line 212 | Line 210 | public class ForkJoinWorkerThread extend
210      private int sp;
211  
212      /**
213 +     * The index of most recent stealer, used as a hint to avoid
214 +     * traversal in method helpJoinTask. This is only a hint because a
215 +     * worker might have had multiple steals and this only holds one
216 +     * of them (usually the most current). Declared non-volatile,
217 +     * relying on other prevailing sync to keep reasonably current.
218 +     */
219 +    private int stealHint;
220 +
221 +    /**
222       * Run state of this worker. In addition to the usual run levels,
223       * tracks if this worker is suspended as a spare, and if it was
224       * killed (trimmed) while suspended. However, "active" status is
# Line 225 | Line 232 | public class ForkJoinWorkerThread extend
232      private static final int TRIMMED     = 0x08; // killed while suspended
233  
234      /**
228 -     * Number of LockSupport.park calls to block this thread for
229 -     * suspension or event waits. Used for internal instrumentation;
230 -     * currently not exported but included because volatile write upon
231 -     * park also provides a workaround for a JVM bug.
232 -     */
233 -    volatile int parkCount;
234 -
235 -    /**
235       * Number of steals, transferred and reset in pool callbacks
236       * when idle. Accessed directly by pool.
237       */
# Line 252 | Line 251 | public class ForkJoinWorkerThread extend
251  
252      /**
253       * True if use local fifo, not default lifo, for local polling.
254 <     * Shadows value from ForkJoinPool, which resets it if changed
256 <     * pool-wide.
254 >     * Shadows value from ForkJoinPool.
255       */
256      private final boolean locallyFifo;
257 <    
257 >
258      /**
259       * Index of this worker in pool array. Set once by pool before
260       * running, and accessed directly by pool to locate this worker in
# Line 277 | Line 275 | public class ForkJoinWorkerThread extend
275      volatile long nextWaiter;
276  
277      /**
278 +     * Number of times this thread suspended as spare
279 +     */
280 +    int spareCount;
281 +
282 +    /**
283 +     * Encoded index and count of next spare waiter. Used only
284 +     * by ForkJoinPool for managing spares.
285 +     */
286 +    volatile int nextSpare;
287 +
288 +    /**
289 +     * The task currently being joined, set only when actively trying
290 +     * to helpStealer. Written only by current thread, but read by
291 +     * others.
292 +     */
293 +    private volatile ForkJoinTask<?> currentJoin;
294 +
295 +    /**
296 +     * The task most recently stolen from another worker (or
297 +     * submission queue).  Not volatile because always read/written in
298 +     * presence of related volatiles in those cases where it matters.
299 +     */
300 +    private ForkJoinTask<?> currentSteal;
301 +
302 +    /**
303       * Creates a ForkJoinWorkerThread operating in the given pool.
304       *
305       * @param pool the pool this thread works in
# Line 285 | Line 308 | public class ForkJoinWorkerThread extend
308      protected ForkJoinWorkerThread(ForkJoinPool pool) {
309          this.pool = pool;
310          this.locallyFifo = pool.locallyFifo;
311 +        setDaemon(true);
312          // To avoid exposing construction details to subclasses,
313          // remaining initialization is in start() and onStart()
314      }
# Line 296 | Line 320 | public class ForkJoinWorkerThread extend
320          this.poolIndex = poolIndex;
321          if (ueh != null)
322              setUncaughtExceptionHandler(ueh);
299 -        setDaemon(true);
323          start();
324      }
325  
# Line 355 | Line 378 | public class ForkJoinWorkerThread extend
378       */
379      protected void onTermination(Throwable exception) {
380          try {
358 -            stolen = null;
359 -            joining = null;
381              cancelTasks();
382 +            while (active)              // force inactive
383 +                active = !pool.tryDecrementActiveCount();
384              setTerminated();
385              pool.workerTerminated(this);
386          } catch (Throwable ex) {        // Shouldn't ever happen
# Line 392 | Line 415 | public class ForkJoinWorkerThread extend
415       * Find and execute tasks and check status while running
416       */
417      private void mainLoop() {
418 <        boolean ran = false;      // true if ran task in last loop iter
396 <        boolean prevRan = false;  // true if ran on last or previous step
418 >        int misses = 0; // track consecutive times failed to find work; max 2
419          ForkJoinPool p = pool;
420          for (;;) {
421 <            p.preStep(this, prevRan);
421 >            p.preStep(this, misses);
422              if (runState != 0)
423 <                return;
424 <            ForkJoinTask<?> t; // try to get and run stolen or submitted task
425 <            if ((t = scan()) != null || (t = pollSubmission()) != null) {
404 <                t.tryExec();
405 <                if (base != sp)
406 <                    runLocalTasks();
407 <                stolen = null;
408 <                prevRan = ran = true;
409 <            }
410 <            else {
411 <                prevRan = ran;
412 <                ran = false;
413 <            }
423 >                break;
424 >            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
425 >                      (misses < 2 ? misses + 1 : 2));
426          }
427      }
428  
429      /**
430 <     * Runs local tasks until queue is empty or shut down.  Call only
431 <     * while active.
430 >     * Try to steal a task and execute it
431 >     *
432 >     * @return true if ran a task
433       */
434 <    private void runLocalTasks() {
435 <        while (runState == 0) {
436 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
437 <            if (t != null)
438 <                t.tryExec();
439 <            else if (base == sp)
440 <                break;
434 >    private boolean tryExecSteal() {
435 >        ForkJoinTask<?> t;
436 >        if ((t  = scan()) != null) {
437 >            t.quietlyExec();
438 >            currentSteal = null;
439 >            if (sp != base)
440 >                execLocalTasks();
441 >            return true;
442          }
443 +        return false;
444      }
445  
446      /**
447 <     * If a submission exists, try to activate and take it
447 >     * If a submission exists, try to activate and run it;
448       *
449 <     * @return a task, if available
449 >     * @return true if ran a task
450       */
451 <    private ForkJoinTask<?> pollSubmission() {
451 >    private boolean tryExecSubmission() {
452          ForkJoinPool p = pool;
453          while (p.hasQueuedSubmissions()) {
454 +            ForkJoinTask<?> t;
455              if (active || (active = p.tryIncrementActiveCount())) {
456 <                ForkJoinTask<?> t = p.pollSubmission();
457 <                return t != null ? t : scan(); // if missed, rescan
456 >                if ((t = p.pollSubmission()) != null) {
457 >                    currentSteal = t;
458 >                    t.quietlyExec();
459 >                    currentSteal = null;
460 >                    if (sp != base)
461 >                        execLocalTasks();
462 >                    return true;
463 >                }
464              }
465          }
466 <        return null;
466 >        return false;
467 >    }
468 >
469 >    /**
470 >     * Runs local tasks until queue is empty or shut down.  Call only
471 >     * while active.
472 >     */
473 >    private void execLocalTasks() {
474 >        while (runState == 0) {
475 >            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
476 >            if (t != null)
477 >                t.quietlyExec();
478 >            else if (sp == base)
479 >                break;
480 >        }
481      }
482  
483      /*
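The misses counter in the new mainLoop replaces the two booleans of the previous revision with one saturating value. A small sketch of just that update rule, with a hypothetical class and method name:

```java
// Hypothetical illustration of the counter update used in mainLoop.
final class MissCounterSketch {
    // Reset to 0 after running a task; otherwise count up and saturate at 2.
    static int nextMisses(int misses, boolean ranTask) {
        return ranTask ? 0 : (misses < 2 ? misses + 1 : 2);
    }

    public static void main(String[] args) {
        boolean[] ran = { false, false, false, true, false };
        int misses = 0;
        for (boolean r : ran) {
            misses = nextMisses(misses, r);
            System.out.println("ran=" + r + " misses=" + misses);
        }
    }
}
```

The value passed to preStep therefore distinguishes three situations: work was just found (0), one scan came up empty (1), and two or more consecutive scans came up empty (2).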
# Line 509 | Line 545 | public class ForkJoinWorkerThread extend
545          ForkJoinTask<?> t;
546          ForkJoinTask<?>[] q;
547          int b, i;
548 <        if ((b = base) != sp &&
548 >        if (sp != (b = base) &&
549              (q = queue) != null && // must read q after b
550              (t = q[i = (q.length - 1) & b]) != null && base == b &&
551              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
# Line 544 | Line 580 | public class ForkJoinWorkerThread extend
580  
581      /**
582       * Returns a popped task, or null if empty. Assumes active status.
583 <     * Called only by current thread. (Note: a specialization of this
548 <     * code appears in popWhileJoining.)
583 >     * Called only by current thread.
584       */
585 <    final ForkJoinTask<?> popTask() {
586 <        int s;
587 <        ForkJoinTask<?>[] q;
588 <        if (base != (s = sp) && (q = queue) != null) {
589 <            int i = (q.length - 1) & --s;
590 <            ForkJoinTask<?> t = q[i];
591 <            if (t != null && UNSAFE.compareAndSwapObject
592 <                (q, (i << qShift) + qBase, t, null)) {
593 <                sp = s;
594 <                return t;
585 >    private ForkJoinTask<?> popTask() {
586 >        ForkJoinTask<?>[] q = queue;
587 >        if (q != null) {
588 >            int s;
589 >            while ((s = sp) != base) {
590 >                int i = (q.length - 1) & --s;
591 >                long u = (i << qShift) + qBase; // raw offset
592 >                ForkJoinTask<?> t = q[i];
593 >                if (t == null)   // lost to stealer
594 >                    break;
595 >                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
596 >                    sp = s; // putOrderedInt may encourage more timely write
597 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
598 >                    return t;
599 >                }
600              }
601          }
602          return null;
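The revised popTask turns the single attempt into a retry loop that stops as soon as the top slot is seen to be null. The sketch below shows the same shape using AtomicReferenceArray in place of Unsafe; the class name, the fixed capacity, and the package-private fields are assumptions made to keep the example short.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration; not the jsr166 implementation.
final class PopLoopSketch<T> {
    final AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(8);
    volatile int base;   // advanced by stealers
    int sp;              // written only by the owner

    void push(T task) {
        int s = sp++;
        slots.lazySet(s & (slots.length() - 1), task);
    }

    // Owner only: take the most recently pushed task, or null if none remains.
    T pop() {
        int s;
        while ((s = sp) != base) {
            int i = (slots.length() - 1) & --s;
            T t = slots.get(i);
            if (t == null)                // slot already emptied by a stealer
                break;
            if (slots.compareAndSet(i, t, null)) {
                sp = s;                   // commit the pop only after winning the CAS
                return t;
            }
        }
        return null;
    }
}
```

A failed CAS means a stealer took the task between the read and the CAS; the loop then re-reads sp and base and either finds the slot null and gives up or tries again.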
# Line 571 | Line 611 | public class ForkJoinWorkerThread extend
611       */
612      final boolean unpushTask(ForkJoinTask<?> t) {
613          int s;
614 <        ForkJoinTask<?>[] q;
615 <        if (base != (s = sp) && (q = queue) != null &&
614 >        ForkJoinTask<?>[] q = queue;
615 >        if ((s = sp) != base && q != null &&
616              UNSAFE.compareAndSwapObject
617              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
618              sp = s;
619 +            // UNSAFE.putOrderedInt(this, spOffset, s);
620              return true;
621          }
622          return false;
# Line 664 | Line 705 | public class ForkJoinWorkerThread extend
705                  ForkJoinWorkerThread v = ws[k & mask];
706                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
707                  if (v != null && v.base != v.sp) {
708 <                    if (canSteal ||       // ensure active status
709 <                        (canSteal = active = p.tryIncrementActiveCount())) {
710 <                        int b = v.base;   // inline specialized deqTask
711 <                        ForkJoinTask<?>[] q;
712 <                        if (b != v.sp && (q = v.queue) != null) {
713 <                            ForkJoinTask<?> t;
714 <                            int i = (q.length - 1) & b;
715 <                            long u = (i << qShift) + qBase; // raw offset
716 <                            if ((t = q[i]) != null && v.base == b &&
717 <                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
718 <                                stolen = t;
719 <                                v.base = b + 1;
720 <                                seed = r;
721 <                                ++stealCount;
722 <                                return t;
723 <                            }
708 >                    ForkJoinTask<?>[] q; int b;
709 >                    if ((canSteal ||       // ensure active status
710 >                         (canSteal = active = p.tryIncrementActiveCount())) &&
711 >                        (q = v.queue) != null && (b = v.base) != v.sp) {
712 >                        int i = (q.length - 1) & b;
713 >                        long u = (i << qShift) + qBase; // raw offset
714 >                        ForkJoinTask<?> t = q[i];
715 >                        if (v.base == b && t != null &&
716 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
717 >                            int pid = poolIndex;
718 >                            currentSteal = t;
719 >                            v.stealHint = pid;
720 >                            v.base = b + 1;
721 >                            seed = r;
722 >                            ++stealCount;
723 >                            return t;
724                          }
725                      }
726                      j = -n;
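The scan loop advances its random seed with an inlined xorshift step (shifts of 13, 17 and 5). A standalone sketch of that step, plus a hypothetical helper that masks a value into a power-of-two sized array:

```java
// Hypothetical illustration of the inline xorshift used when scanning for victims.
final class XorShiftSketch {
    // One 13/17/5 xorshift step; a nonzero input yields a nonzero output.
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Hypothetical helper: map a value to a slot of a power-of-two sized array.
    static int index(int r, int length) {
        return r & (length - 1);
    }

    public static void main(String[] args) {
        int r = 1;
        for (int k = 0; k < 4; ++k) {
            r = next(r);
            System.out.println(r + " -> slot " + index(r, 8));
        }
    }
}
```

Because zero maps to zero, a generator like this needs a nonzero starting seed.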
# Line 699 | Line 740 | public class ForkJoinWorkerThread extend
740      // Run State management
741  
742      // status check methods used mainly by ForkJoinPool
743 +    final boolean isRunning()     { return runState == 0; }
744      final boolean isTerminating() { return (runState & TERMINATING) != 0; }
745      final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
746      final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
747      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
748  
749      /**
750 <     * Sets state to TERMINATING, also resuming if suspended.
750 >     * Sets state to TERMINATING, also, unless "quiet", unparking if
751 >     * not already terminated
752 >     *
753 >     * @param quiet don't unpark (used for faster status updates on
754 >     * pool termination)
755       */
756 <    final void shutdown() {
756 >    final void shutdown(boolean quiet) {
757          for (;;) {
758              int s = runState;
759 +            if ((s & (TERMINATING|TERMINATED)) != 0)
760 +                break;
761              if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
762                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
763                                               (s & ~SUSPENDED) |
764 <                                             (TRIMMED|TERMINATING))) {
717 <                    LockSupport.unpark(this);
764 >                                             (TRIMMED|TERMINATING)))
765                      break;
719 -                }
766              }
767              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
768                                                s | TERMINATING))
769                  break;
770          }
771 +        if (!quiet && (runState & TERMINATED) != 0)
772 +            LockSupport.unpark(this);
773      }
774  
775      /**
776 <     * Sets state to TERMINATED. Called only by this thread.
776 >     * Sets state to TERMINATED. Called only by onTermination()
777       */
778      private void setTerminated() {
779          int s;
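The state transitions in shutdown can be sketched with an AtomicInteger in place of the Unsafe CAS on runState. Only TRIMMED = 0x08 appears in this diff; the other three bit values below are assumptions, and the class is hypothetical. The unpark step and the quiet parameter are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; bit values other than TRIMMED are assumed.
final class RunStateSketch {
    static final int TERMINATING = 0x01;
    static final int TERMINATED  = 0x02;
    static final int SUSPENDED   = 0x04;
    static final int TRIMMED     = 0x08;

    final AtomicInteger runState = new AtomicInteger();

    // Moves to TERMINATING; a suspended worker is also marked TRIMMED and unsuspended.
    void shutdown() {
        for (;;) {
            int s = runState.get();
            if ((s & (TERMINATING | TERMINATED)) != 0)
                break;                                   // already on the way out
            int next = (s & SUSPENDED) != 0
                ? (s & ~SUSPENDED) | (TRIMMED | TERMINATING)
                : s | TERMINATING;
            if (runState.compareAndSet(s, next))
                break;                                   // otherwise re-read and retry
        }
    }

    boolean isRunning()     { return runState.get() == 0; }
    boolean isTerminating() { return (runState.get() & TERMINATING) != 0; }
    boolean isTrimmed()     { return (runState.get() & TRIMMED) != 0; }
}
```

The early exit added in this revision makes the call idempotent: a second shutdown on a worker that is already terminating or terminated leaves the state untouched.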
# Line 735 | Line 783 | public class ForkJoinWorkerThread extend
783      }
784  
785      /**
786 <     * Instrumented version of park used by ForkJoinPool.awaitEvent
739 <     */
740 <    final void doPark() {
741 <        ++parkCount;
742 <        LockSupport.park(this);
743 <    }
744 <
745 <    /**
746 <     * If suspended, tries to set status to unsuspended.
747 <     * Caller must unpark to actually resume
786 >     * If suspended, tries to set status to unsuspended and unparks.
787       *
788       * @return true if successful
789       */
790      final boolean tryUnsuspend() {
791 <        int s = runState;
792 <        if ((s & SUSPENDED) != 0)
793 <            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
794 <                                            s & ~SUSPENDED);
791 >        int s;
792 >        while (((s = runState) & SUSPENDED) != 0) {
793 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
794 >                                         s & ~SUSPENDED))
795 >                return true;
796 >        }
797          return false;
798      }
799  
800      /**
801 <     * Sets suspended status and blocks as spare until resumed,
802 <     * shutdown, or timed out.
803 <     *
763 <     * @return false if trimmed
801 >     * Sets suspended status and blocks as spare until resumed
802 >     * or shutdown.
803 >     * @return true if still running on exit
804       */
805      final boolean suspendAsSpare() {
806 <        for (;;) {               // set suspended unless terminating
806 >        lastEventCount = 0;         // reset upon resume
807 >        for (;;) {                  // set suspended unless terminating
808              int s = runState;
809              if ((s & TERMINATING) != 0) { // must kill
810                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
# Line 774 | Line 815 | public class ForkJoinWorkerThread extend
815                                                s | SUSPENDED))
816                  break;
817          }
818 <        boolean timed;
819 <        long nanos;
779 <        long startTime;
780 <        if (poolIndex < pool.parallelism) {
781 <            timed = false;
782 <            nanos = 0L;
783 <            startTime = 0L;
784 <        }
785 <        else {
786 <            timed = true;
787 <            nanos = SPARE_KEEPALIVE_NANOS;
788 <            startTime = System.nanoTime();
789 <        }
790 <        pool.accumulateStealCount(this);
791 <        lastEventCount = 0;      // reset upon resume
792 <        interrupted();           // clear/ignore interrupts
818 >        ForkJoinPool p = pool;
819 >        p.pushSpare(this);
820          while ((runState & SUSPENDED) != 0) {
821 <            ++parkCount;
822 <            if (!timed)
821 >            if (!p.tryAccumulateStealCount(this))
822 >                continue;
823 >            interrupted();          // clear/ignore interrupts
824 >            if ((runState & SUSPENDED) == 0)
825 >                break;
826 >            if (nextSpare != 0)     // untimed
827                  LockSupport.park(this);
828 <            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
829 <                LockSupport.parkNanos(this, nanos);
830 <            else { // try to trim on timeout
831 <                int s = runState;
832 <                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
833 <                                             (s & ~SUSPENDED) |
834 <                                             (TRIMMED|TERMINATING)))
835 <                    return false;
828 >            else {
829 >                long startTime = System.nanoTime();
830 >                LockSupport.parkNanos(this, TRIM_RATE_NANOS);
831 >                if ((runState & SUSPENDED) == 0)
832 >                    break;
833 >                long now = System.nanoTime();
834 >                if (now - startTime >= TRIM_RATE_NANOS)
835 >                    pool.tryTrimSpare(now);
836              }
837          }
838 <        return true;
838 >        return runState == 0;
839      }
840  
841      // Misc support methods for ForkJoinPool
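In the timed branch of suspendAsSpare, the worker asks the pool to trim only when a full TRIM_RATE_NANOS interval has passed, because parkNanos can also return early after an unpark or spuriously. A minimal sketch of that elapsed-time check, with a hypothetical class and method name:

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration of the timed-wakeup check; not the jsr166 implementation.
final class TrimTimerSketch {
    static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L; // value from the diff

    // Parks for up to intervalNanos; true only if the whole interval elapsed.
    static boolean parkedFullInterval(long intervalNanos) {
        long start = System.nanoTime();
        LockSupport.parkNanos(intervalNanos);
        return System.nanoTime() - start >= intervalNanos;
    }

    public static void main(String[] args) {
        // A pending permit makes the next park return without waiting.
        LockSupport.unpark(Thread.currentThread());
        System.out.println(parkedFullInterval(TRIM_RATE_NANOS));
        // With no permit pending, the park normally runs to the timeout.
        System.out.println(parkedFullInterval(TRIM_RATE_NANOS));
    }
}
```

Comparing elapsed time against the interval, rather than trusting the return from parkNanos, keeps an early wakeup from being mistaken for a timeout.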
# Line 814 | Line 845 | public class ForkJoinWorkerThread extend
845       * used by ForkJoinTask.
846       */
847      final int getQueueSize() {
848 <        return -base + sp;
848 >        int n; // external calls must read base first
849 >        return (n = -base + sp) <= 0 ? 0 : n;
850      }
851  
852      /**
# Line 822 | Line 854 | public class ForkJoinWorkerThread extend
854       * thread.
855       */
856      final void cancelTasks() {
857 +        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
858 +        if (cj != null) {
859 +            currentJoin = null;
860 +            cj.cancelIgnoringExceptions();
861 +            try {
862 +                this.interrupt(); // awaken wait
863 +            } catch (SecurityException ignore) {
864 +            }
865 +        }
866 +        ForkJoinTask<?> cs = currentSteal;
867 +        if (cs != null) {
868 +            currentSteal = null;
869 +            cs.cancelIgnoringExceptions();
870 +        }
871          while (base != sp) {
872              ForkJoinTask<?> t = deqTask();
873              if (t != null)
# Line 849 | Line 895 | public class ForkJoinWorkerThread extend
895      // Support methods for ForkJoinTask
896  
897      /**
898 +     * Gets and removes a local task.
899 +     *
900 +     * @return a task, if available
901 +     */
902 +    final ForkJoinTask<?> pollLocalTask() {
903 +        while (sp != base) {
904 +            if (active || (active = pool.tryIncrementActiveCount()))
905 +                return locallyFifo? locallyDeqTask() : popTask();
906 +        }
907 +        return null;
908 +    }
909 +
910 +    /**
911 +     * Gets and removes a local or stolen task.
912 +     *
913 +     * @return a task, if available
914 +     */
915 +    final ForkJoinTask<?> pollTask() {
916 +        ForkJoinTask<?> t = pollLocalTask();
917 +        if (t == null) {
918 +            t = scan();
919 +            currentSteal = null; // cannot retain/track/help
920 +        }
921 +        return t;
922 +    }
923 +
924 +    /**
925       * Possibly runs some tasks and/or blocks, until task is done.
926       *
927       * @param joinMe the task to join
928       */
929      final void joinTask(ForkJoinTask<?> joinMe) {
930 <        ForkJoinTask<?> prevJoining = joining;
931 <        joining = joinMe;
932 <        while (joinMe.status >= 0) {
933 <            int s = sp;
934 <            if (s == base) {
935 <                nonlocalJoinTask(joinMe);
936 <                break;
937 <            }
938 <            // process local task
939 <            ForkJoinTask<?> t;
940 <            ForkJoinTask<?>[] q = queue;
930 >        // currentJoin only written by this thread; only need ordered store
931 >        ForkJoinTask<?> prevJoin = currentJoin;
932 >        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
933 >        if (sp != base)
934 >            localHelpJoinTask(joinMe);
935 >        if (joinMe.status >= 0)
936 >            pool.awaitJoin(joinMe, this);
937 >        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
938 >    }
939 >
940 >    /**
941 >     * Run tasks in local queue until given task is done.
942 >     *
943 >     * @param joinMe the task to join
944 >     */
945 >    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
946 >        int s;
947 >        ForkJoinTask<?>[] q;
948 >        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
949              int i = (q.length - 1) & --s;
950              long u = (i << qShift) + qBase; // raw offset
951 <            if ((t = q[i]) != null &&
952 <                UNSAFE.compareAndSwapObject(q, u, t, null)) {
951 >            ForkJoinTask<?> t = q[i];
952 >            if (t == null)  // lost to a stealer
953 >                break;
954 >            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
955                  /*
956 <                 * This recheck (and similarly in nonlocalJoinTask)
956 >                 * This recheck (and similarly in helpJoinTask)
957                   * handles cases where joinMe is independently
958                   * cancelled or forced even though there is other work
959                   * available. Back out of the pop by putting t back
960 <                 * into slot before we commit by setting sp.
960 >                 * into slot before we commit by writing sp.
961                   */
962                  if (joinMe.status < 0) {
963                      UNSAFE.putObjectVolatile(q, u, t);
964                      break;
965                  }
966                  sp = s;
967 <                t.tryExec();
967 >                // UNSAFE.putOrderedInt(this, spOffset, s);
968 >                t.quietlyExec();
969              }
970          }
887 -        joining = prevJoining;
971      }
972  
973      /**
974       * Tries to locate and help perform tasks for a stealer of the
975 <     * given task (or in turn one of its stealers), blocking (via
976 <     * pool.tryAwaitJoin) upon failure to find work.  Traces
894 <     * stolen->joining links looking for a thread working on
975 >     * given task, or in turn one of its stealers.  Traces
976 >     * currentSteal->currentJoin links looking for a thread working on
977       * a descendant of the given task and with a non-empty queue to
978 <     * steal back and execute tasks from. Inhibits mutual steal chains
979 <     * and scans on outer joins upon nesting to avoid unbounded
980 <     * growth.  Restarts search upon encountering inconsistencies.
981 <     * Tries to block if two passes agree that there are no remaining
982 <     * targets.
978 >     * steal back and execute tasks from.
979 >     *
980 >     * The implementation is very branchy to cope with the potential
981 >     * inconsistencies or loops encountering chains that are stale,
982 >     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
983 >     * of these cases are dealt with by just returning back to the
984 >     * caller, who is expected to retry if other join mechanisms also
985 >     * don't work out.
986       *
987       * @param joinMe the task to join
988       */
989 <    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
990 <        ForkJoinPool p = pool;
991 <        int scans = p.parallelism;       // give up if too many retries
992 <        ForkJoinTask<?> bottom = null;   // target seen when can't descend
993 <        restart: while (joinMe.status >= 0) {
994 <            ForkJoinTask<?> target = null;
995 <            ForkJoinTask<?> next = joinMe;
996 <            while (scans >= 0 && next != null) {
997 <                --scans;
998 <                target = next;
999 <                next = null;
1000 <                ForkJoinWorkerThread v = null;
1001 <                ForkJoinWorkerThread[] ws = p.workers;
1002 <                int n = ws.length;
1003 <                for (int j = 0; j < n; ++j) {
1004 <                    ForkJoinWorkerThread w = ws[j];
1005 <                    if (w != null && w.stolen == target) {
1006 <                        v = w;
1007 <                        break;
989 >    final void helpJoinTask(ForkJoinTask<?> joinMe) {
990 >        ForkJoinWorkerThread[] ws = pool.workers;
991 >        int n; // need at least 2 workers
992 >        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
993 >            ForkJoinTask<?> task = joinMe;        // base of chain
994 >            ForkJoinWorkerThread thread = this;   // thread with stolen task
995 >            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
996 >                // Try to find v, the stealer of task, by first using hint
997 >                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
998 >                if (v == null || v.currentSteal != task) {
999 >                    for (int j = 0; ; ++j) {      // search array
1000 >                        if (j < n) {
1001 >                            if ((v = ws[j]) != null) {
1002 >                                if (task.status < 0)
1003 >                                    return;       // stale or done
1004 >                                if (v.currentSteal == task) {
1005 >                                    thread.stealHint = j;
1006 >                                    break;        // save hint for next time
1007 >                                }
1008 >                            }
1009 >                        }
1010 >                        else
1011 >                            return;               // no stealer
1012                      }
1013                  }
1014 <                if (v != null && v != this) {
1015 <                    ForkJoinTask<?> prevStolen = stolen;
1016 <                    int b;
1017 <                    ForkJoinTask<?>[] q;
1018 <                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
1019 <                        int i = (q.length - 1) & b;
1020 <                        long u = (i << qShift) + qBase;
1021 <                        ForkJoinTask<?> t = q[i];
1022 <                        if (target.status < 0)
1023 <                            continue restart;
1024 <                        if (t != null && v.base == b &&
1025 <                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
1014 >                // Try to help v, using specialized form of deqTask
1015 >                int b;
1016 >                ForkJoinTask<?>[] q;
1017 >                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1018 >                    int i = (q.length - 1) & b;
1019 >                    long u = (i << qShift) + qBase;
1020 >                    ForkJoinTask<?> t = q[i];
1021 >                    if (task.status < 0)
1022 >                        return;                   // stale or done
1023 >                    if (v.base == b) {
1024 >                        if (t == null)
1025 >                            return;               // producer stalled
1026 >                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1027                              if (joinMe.status < 0) {
1028                                  UNSAFE.putObjectVolatile(q, u, t);
1029 <                                return; // back out
1029 >                                return;           // back out on cancel
1030                              }
1031 <                            stolen = t;
1031 >                            int pid = poolIndex;
1032 >                            ForkJoinTask<?> prevSteal = currentSteal;
1033 >                            currentSteal = t;
1034 >                            v.stealHint = pid;
1035                              v.base = b + 1;
1036 <                            t.tryExec();
1037 <                            stolen = prevStolen;
1036 >                            t.quietlyExec();
1037 >                            currentSteal = prevSteal;
1038                          }
946                        if (joinMe.status < 0)
947                            return;
1039                      }
1040 <                    next = v.joining;
1040 >                    if (joinMe.status < 0)
1041 >                        return;
1042                  }
1043 <                if (target.status < 0)
1044 <                    continue restart;  // inconsistent
1045 <                if (joinMe.status < 0)
1043 >                // Try to descend to find v's stealer
1044 >                ForkJoinTask<?> next = v.currentJoin;
1045 >                if (task.status < 0 || next == null || next == task ||
1046 >                    joinMe.status < 0)
1047                      return;
1048 +                task = next;
1049 +                thread = v;
1050              }
956
957            if (bottom != target)
958                bottom = target;    // recheck landing spot
959            else if (p.tryAwaitJoin(joinMe) < 0)
960                return;             // successfully blocked
961            Thread.yield();         // tame spin in case too many active
1051          }
1052      }
1053  
# Line 1014 | Line 1103 | public class ForkJoinWorkerThread extend
1103      }
1104  
1105      /**
1017     * Gets and removes a local task.
1018     *
1019     * @return a task, if available
1020     */
1021    final ForkJoinTask<?> pollLocalTask() {
1022        while (sp != base) {
1023            if (active || (active = pool.tryIncrementActiveCount()))
1024                return locallyFifo? locallyDeqTask() : popTask();
1025        }
1026        return null;
1027    }
1028
1029    /**
1030     * Gets and removes a local or stolen task.
1031     *
1032     * @return a task, if available
1033     */
1034    final ForkJoinTask<?> pollTask() {
1035        ForkJoinTask<?> t;
1036        return (t = pollLocalTask()) != null ? t : scan();
1037    }
1038
1039    /**
1106       * Runs tasks until {@code pool.isQuiescent()}.
1107       */
1108      final void helpQuiescePool() {
1109          for (;;) {
1110              ForkJoinTask<?> t = pollLocalTask();
1111              if (t != null || (t = scan()) != null) {
1112 <                t.tryExec();
1113 <                stolen = null;
1112 >                t.quietlyExec();
1113 >                currentSteal = null;
1114              }
1115              else {
1116                  ForkJoinPool p = pool;
1117                  if (active) {
1118 +                    if (!p.tryDecrementActiveCount())
1119 +                        continue;   // retry later
1120                      active = false; // inactivate
1053                    do {} while (!p.tryDecrementActiveCount());
1121                  }
1122                  if (p.isQuiescent()) {
1123                      active = true; // re-activate
# Line 1061 | Line 1128 | public class ForkJoinWorkerThread extend
1128          }
1129      }
1130  
1131 +
1132      // Unsafe mechanics
1133  
1134      private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1135 +    private static final long spOffset =
1136 +        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1137      private static final long runStateOffset =
1138          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1139 +    private static final long currentJoinOffset =
1140 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1141 +    private static final long currentStealOffset =
1142 +        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1143      private static final long qBase =
1144          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1145 +
1146      private static final int qShift;
1147  
1148      static {
# Line 1087 | Line 1162 | public class ForkJoinWorkerThread extend
1162              throw error;
1163          }
1164      }
1165 +
1166   }

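
The chain walk in helpJoinTask (find the stealer of a task, using stealHint first and a linear scan second, then descend through that worker's currentJoin, for at most MAX_HELP_DEPTH links) can be sketched in isolation as below. This is an illustration under assumptions, not the jsr166 implementation: the Worker class, findStealer, and traceChain are hypothetical names, tasks are plain Objects, the value 8 for MAX_HELP_DEPTH is chosen only for the example, the workers array length is assumed to be a power of two so the hint can be masked, and the task-stealing step itself is omitted.

```java
import java.util.ArrayList;
import java.util.List;

public class HelpChainSketch {
    static final int MAX_HELP_DEPTH = 8; // assumed value, for illustration only

    static final class Worker {
        final int index;
        volatile Object currentSteal; // most recent task stolen from another worker
        volatile Object currentJoin;  // task this worker is actively joining
        volatile int stealHint;       // guess at the index of this worker's stealer
        Worker(int index) { this.index = index; }
    }

    // Finds the worker whose currentSteal is 'task': try the hint first,
    // then fall back to a linear scan, saving the hit as the new hint.
    static Worker findStealer(Worker[] ws, Worker thread, Object task) {
        int n = ws.length; // assumed power of two
        Worker v = ws[thread.stealHint & (n - 1)];
        if (v != null && v.currentSteal == task)
            return v;
        for (int j = 0; j < n; ++j) {
            if ((v = ws[j]) != null && v.currentSteal == task) {
                thread.stealHint = j;
                return v;
            }
        }
        return null; // no stealer found
    }

    // Follows currentSteal -> currentJoin links starting from joinMe and
    // returns the indices of the workers visited, in order.
    static List<Integer> traceChain(Worker[] ws, Worker self, Object joinMe) {
        List<Integer> visited = new ArrayList<>();
        Object task = joinMe;
        Worker thread = self;
        for (int d = 0; d < MAX_HELP_DEPTH; ++d) {
            Worker v = findStealer(ws, thread, task);
            if (v == null)
                break;
            visited.add(v.index); // the real code would try to take work from v here
            Object next = v.currentJoin;
            if (next == null || next == task)
                break;
            task = next;
            thread = v;
        }
        return visited;
    }

    public static void main(String[] args) {
        Worker[] ws = new Worker[4];
        for (int i = 0; i < ws.length; ++i)
            ws[i] = new Worker(i);
        Object a = "A", b = "B";
        ws[0].currentJoin = a;   // worker 0 is joining A
        ws[1].currentSteal = a;  // worker 1 stole A ...
        ws[1].currentJoin = b;   // ... and is now joining B
        ws[2].currentSteal = b;  // worker 2 stole B
        System.out.println(traceChain(ws, ws[0], a)); // prints [1, 2]
    }
}
```

The depth bound matters when the links form a cycle (for example two workers that have stolen from and are joining on each other): the walk then stops after MAX_HELP_DEPTH links instead of looping, which matches the "just return to the caller" policy described in the method comment.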