ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.31 by dl, Mon Apr 5 15:52:26 2010 UTC vs.
Revision 1.35 by dl, Wed Jul 7 19:52:32 2010 UTC

# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field stolen) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field joining) the
90 >     * task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than maintain per-task bookkeeping.  This
101 >     * requires a linear scan of workers array to locate stealers,
102 >     * which isolates cost to when it is needed, rather than adding to
103 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
104 >     * potentially cyclic mutual steals.  (3) It is intentionally
105 >     * racy: field joining is updated only while actively joining,
106 >     * which means that we could miss links in the chain during
107 >     * long-lived tasks, GC stalls etc.  (4) We fall back to
108 >     * suspending the worker and if necessary replacing it with a
109 >     * spare (see ForkJoinPool.tryAwaitJoin).
110 >     *
111 >     * Efficient implementation of these algorithms currently relies on
112       * an uncomfortable amount of "Unsafe" mechanics. To maintain
113       * correct orderings, reads and writes of variable base require
114       * volatile ordering.  Variable sp does not require volatile
# Line 138 | Line 163 | public class ForkJoinWorkerThread extend
163  
164      /**
165       * Capacity of work-stealing queue array upon initialization.
166 <     * Must be a power of two. Initial size must be at least 2, but is
166 >     * Must be a power of two. Initial size must be at least 4, but is
167       * padded to minimize cache effects.
168       */
169      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
# Line 157 | Line 182 | public class ForkJoinWorkerThread extend
182      final ForkJoinPool pool;
183  
184      /**
185 +     * The task most recently stolen from another worker
186 +     */
187 +    private volatile ForkJoinTask<?> stolen;
188 +
189 +    /**
190 +     * The task currently being joined, set only when actively
191 +     * trying to helpStealer.
192 +     */
193 +    private volatile ForkJoinTask<?> joining;
194 +
195 +    /**
196       * The work-stealing queue array. Size must be a power of two.
197       * Initialized in onStart, to improve memory locality.
198       */
# Line 196 | Line 232 | public class ForkJoinWorkerThread extend
232       * currently not exported but included because volatile write upon
233       * park also provides a workaround for a JVM bug.
234       */
235 <    private volatile int parkCount;
235 >    volatile int parkCount;
236  
237      /**
238       * Number of steals, transferred and reset in pool callbacks pool
# Line 221 | Line 257 | public class ForkJoinWorkerThread extend
257       * Shadows value from ForkJoinPool, which resets it if changed
258       * pool-wide.
259       */
260 <    private boolean locallyFifo;
261 <
260 >    private final boolean locallyFifo;
261 >    
262      /**
263       * Index of this worker in pool array. Set once by pool before
264       * running, and accessed directly by pool to locate this worker in
# Line 249 | Line 285 | public class ForkJoinWorkerThread extend
285       * @throws NullPointerException if pool is null
286       */
287      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
288          this.pool = pool;
289 +        this.locallyFifo = pool.locallyFifo;
290          // To avoid exposing construction details to subclasses,
291          // remaining initialization is in start() and onStart()
292      }
# Line 258 | Line 294 | public class ForkJoinWorkerThread extend
294      /**
295       * Performs additional initialization and starts this thread
296       */
297 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
297 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
298          this.poolIndex = poolIndex;
264        this.locallyFifo = locallyFifo;
299          if (ueh != null)
300              setUncaughtExceptionHandler(ueh);
301          setDaemon(true);
# Line 305 | Line 339 | public class ForkJoinWorkerThread extend
339          int rs = seedGenerator.nextInt();
340          seed = rs == 0? 1 : rs; // seed must be nonzero
341  
342 <        // Allocate name string and queue array in this thread
342 >        // Allocate name string and arrays in this thread
343          String pid = Integer.toString(pool.getPoolNumber());
344          String wid = Integer.toString(poolIndex);
345          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 323 | Line 357 | public class ForkJoinWorkerThread extend
357       */
358      protected void onTermination(Throwable exception) {
359          try {
360 +            stolen = null;
361 +            joining = null;
362              cancelTasks();
363              setTerminated();
364              pool.workerTerminated(this);
# Line 358 | Line 394 | public class ForkJoinWorkerThread extend
394       * Find and execute tasks and check status while running
395       */
396      private void mainLoop() {
397 <        boolean ran = false; // true if ran task on previous step
397 >        boolean ran = false;      // true if ran task in last loop iter
398 >        boolean prevRan = false;  // true if ran on last or previous step
399          ForkJoinPool p = pool;
400          for (;;) {
401 <            p.preStep(this, ran);
401 >            p.preStep(this, prevRan);
402              if (runState != 0)
403                  return;
404              ForkJoinTask<?> t; // try to get and run stolen or submitted task
405 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
405 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
406                  t.tryExec();
407                  if (base != sp)
408                      runLocalTasks();
409 +                stolen = null;
410 +                prevRan = ran = true;
411 +            }
412 +            else {
413 +                prevRan = ran;
414 +                ran = false;
415              }
416          }
417      }
# Line 447 | Line 490 | public class ForkJoinWorkerThread extend
490       * @param t the task. Caller must ensure non-null.
491       */
492      final void pushTask(ForkJoinTask<?> t) {
450        int s;
493          ForkJoinTask<?>[] q = queue;
494          int mask = q.length - 1; // implicit assert q != null
495 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
496 <        if ((s -= base) <= 0)
497 <            pool.signalWork();
498 <        else if (s + 1 >= mask)
499 <            growQueue();
495 >        int s = sp++;            // ok to increment sp before slot write
496 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
497 >        if ((s -= base) == 0)
498 >            pool.signalWork();   // was empty
499 >        else if (s == mask)
500 >            growQueue();         // is full
501      }
502  
503      /**
504       * Tries to take a task from the base of the queue, failing if
505       * empty or contended. Note: Specializations of this code appear
506 <     * in scan and scanWhileJoining.
506 >     * in locallyDeqTask and elsewhere.
507       *
508       * @return a task, or null if none or contended
509       */
# Line 470 | Line 513 | public class ForkJoinWorkerThread extend
513          int b, i;
514          if ((b = base) != sp &&
515              (q = queue) != null && // must read q after b
516 <            (t = q[i = (q.length - 1) & b]) != null &&
516 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
517              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
518              base = b + 1;
519              return t;
# Line 490 | Line 533 | public class ForkJoinWorkerThread extend
533              ForkJoinTask<?> t;
534              int b, i;
535              while (sp != (b = base)) {
536 <                if ((t = q[i = (q.length - 1) & b]) != null &&
536 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
537                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
538                                                  t, null)) {
539                      base = b + 1;
# Line 504 | Line 547 | public class ForkJoinWorkerThread extend
547      /**
548       * Returns a popped task, or null if empty. Assumes active status.
549       * Called only by current thread. (Note: a specialization of this
550 <     * code appears in scanWhileJoining.)
550 >     * code appears in popWhileJoining.)
551       */
552      final ForkJoinTask<?> popTask() {
553          int s;
554 <        ForkJoinTask<?>[] q = queue;
555 <        if (q != null && (s = sp) != base) {
554 >        ForkJoinTask<?>[] q;
555 >        if (base != (s = sp) && (q = queue) != null) {
556              int i = (q.length - 1) & --s;
557              ForkJoinTask<?> t = q[i];
558              if (t != null && UNSAFE.compareAndSwapObject
# Line 522 | Line 565 | public class ForkJoinWorkerThread extend
565      }
566  
567      /**
568 <     * Specialized version of popTask to pop only if
569 <     * topmost element is the given task. Called only
570 <     * by current thread while active.
568 >     * Specialized version of popTask to pop only if topmost element
569 >     * is the given task. Called only by current thread while
570 >     * active.
571       *
572       * @param t the task. Caller must ensure non-null.
573       */
574      final boolean unpushTask(ForkJoinTask<?> t) {
575          int s;
576 <        ForkJoinTask<?>[] q = queue;
577 <        if (q != null && UNSAFE.compareAndSwapObject
578 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
576 >        ForkJoinTask<?>[] q;
577 >        if (base != (s = sp) && (q = queue) != null &&
578 >            UNSAFE.compareAndSwapObject
579 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
580              sp = s;
581              return true;
582          }
# Line 610 | Line 654 | public class ForkJoinWorkerThread extend
654       */
655      private ForkJoinTask<?> scan() {
656          ForkJoinPool p = pool;
657 <        ForkJoinWorkerThread[] ws = p.workers;
658 <        int n = ws.length;            // upper bound of #workers
659 <        boolean canSteal = active;    // shadow active status
660 <        int r = seed;                 // extract seed once
661 <        int k = r;                    // index: random if j<0 else step
662 <        for (int j = -n; j < n; ++j) {
663 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
664 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
665 <            if (v != null && v.base != v.sp) {
666 <                if (canSteal ||       // ensure active status
667 <                    (canSteal = active = p.tryIncrementActiveCount())) {
668 <                    int b, i;         // inlined specialization of deqTask
669 <                    ForkJoinTask<?> t;
670 <                    ForkJoinTask<?>[] q;
671 <                    if ((b = v.base) != v.sp &&  // recheck
672 <                        (q = v.queue) != null &&
673 <                        (t = q[i = (q.length - 1) & b]) != null &&
674 <                        UNSAFE.compareAndSwapObject
675 <                        (q, (i << qShift) + qBase, t, null)) {
676 <                        v.base = b + 1;
677 <                        seed = r;
678 <                        ++stealCount;
679 <                        return t;
657 >        ForkJoinWorkerThread[] ws;        // worker array
658 >        int n;                            // upper bound of #workers
659 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
660 >            boolean canSteal = active;    // shadow active status
661 >            int r = seed;                 // extract seed once
662 >            int mask = n - 1;
663 >            int j = -n;                   // loop counter
664 >            int k = r;                    // worker index, random if j < 0
665 >            for (;;) {
666 >                ForkJoinWorkerThread v = ws[k & mask];
667 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
668 >                if (v != null && v.base != v.sp) {
669 >                    if (canSteal ||       // ensure active status
670 >                        (canSteal = active = p.tryIncrementActiveCount())) {
671 >                        int b = v.base;   // inline specialized deqTask
672 >                        ForkJoinTask<?>[] q;
673 >                        if (b != v.sp && (q = v.queue) != null) {
674 >                            ForkJoinTask<?> t;
675 >                            int i = (q.length - 1) & b;
676 >                            long u = (i << qShift) + qBase; // raw offset
677 >                            if ((t = q[i]) != null && v.base == b &&
678 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
679 >                                stolen = t;
680 >                                v.base = b + 1;
681 >                                seed = r;
682 >                                ++stealCount;
683 >                                return t;
684 >                            }
685 >                        }
686                      }
687 +                    j = -n;
688 +                    k = r;                // restart on contention
689                  }
690 <                j = -n;               // reset on contention
690 >                else if (++j <= 0)
691 >                    k = r;
692 >                else if (j <= n)
693 >                    k += (n >>> 1) | 1;
694 >                else
695 >                    break;
696              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
697          }
698          return null;
699      }
# Line 681 | Line 737 | public class ForkJoinWorkerThread extend
737      }
738  
739      /**
740 <     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
740 >     * Instrumented version of park used by ForkJoinPool.awaitEvent
741       */
742      final void doPark() {
743          ++parkCount;
# Line 695 | Line 751 | public class ForkJoinWorkerThread extend
751       * @return true if successful
752       */
753      final boolean tryUnsuspend() {
754 <        int s;
755 <        return (((s = runState) & SUSPENDED) != 0 &&
756 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 <                                         s & ~SUSPENDED));
754 >        int s = runState;
755 >        if ((s & SUSPENDED) != 0)
756 >            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 >                                            s & ~SUSPENDED);
758 >        return false;
759      }
760  
761      /**
# Line 719 | Line 776 | public class ForkJoinWorkerThread extend
776                                                s | SUSPENDED))
777                  break;
778          }
779 +        boolean timed;
780 +        long nanos;
781 +        long startTime;
782 +        if (poolIndex < pool.parallelism) {
783 +            timed = false;
784 +            nanos = 0L;
785 +            startTime = 0L;
786 +        }
787 +        else {
788 +            timed = true;
789 +            nanos = SPARE_KEEPALIVE_NANOS;
790 +            startTime = System.nanoTime();
791 +        }
792 +        pool.accumulateStealCount(this);
793          lastEventCount = 0;      // reset upon resume
723        ForkJoinPool p = pool;
724        p.releaseWaiters();      // help others progress
725        p.accumulateStealCount(this);
794          interrupted();           // clear/ignore interrupts
727        if (poolIndex < p.getParallelism()) { // untimed wait
728            while ((runState & SUSPENDED) != 0)
729                doPark();
730            return true;
731        }
732        return timedSuspend();   // timed wait if apparently non-core
733    }
734
735    /**
736     * Blocks as spare until resumed or timed out
737     * @return false if trimmed
738     */
739    private boolean timedSuspend() {
740        long nanos = SPARE_KEEPALIVE_NANOS;
741        long startTime = System.nanoTime();
795          while ((runState & SUSPENDED) != 0) {
796              ++parkCount;
797 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
797 >            if (!timed)
798 >                LockSupport.park(this);
799 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
800                  LockSupport.parkNanos(this, nanos);
801              else { // try to trim on timeout
802                  int s = runState;
# Line 765 | Line 820 | public class ForkJoinWorkerThread extend
820      }
821  
822      /**
768     * Set locallyFifo mode. Called only by ForkJoinPool
769     */
770    final void setAsyncMode(boolean async) {
771        locallyFifo = async;
772    }
773
774    /**
823       * Removes and cancels all tasks in queue.  Can be called from any
824       * thread.
825       */
# Line 803 | Line 851 | public class ForkJoinWorkerThread extend
851      // Support methods for ForkJoinTask
852  
853      /**
854 +     * Possibly runs some tasks and/or blocks, until task is done.
855 +     *
856 +     * @param joinMe the task to join
857 +     */
858 +    final void joinTask(ForkJoinTask<?> joinMe) {
859 +        ForkJoinTask<?> prevJoining = joining;
860 +        joining = joinMe;
861 +        while (joinMe.status >= 0) {
862 +            int s = sp;
863 +            if (s == base) {
864 +                nonlocalJoinTask(joinMe);
865 +                break;
866 +            }
867 +            // process local task
868 +            ForkJoinTask<?> t;
869 +            ForkJoinTask<?>[] q = queue;
870 +            int i = (q.length - 1) & --s;
871 +            long u = (i << qShift) + qBase; // raw offset
872 +            if ((t = q[i]) != null &&
873 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
874 +                /*
875 +                 * This recheck (and similarly in nonlocalJoinTask)
876 +                 * handles cases where joinMe is independently
877 +                 * cancelled or forced even though there is other work
878 +                 * available. Back out of the pop by putting t back
879 +                 * into slot before we commit by setting sp.
880 +                 */
881 +                if (joinMe.status < 0) {
882 +                    UNSAFE.putObjectVolatile(q, u, t);
883 +                    break;
884 +                }
885 +                sp = s;
886 +                t.tryExec();
887 +            }
888 +        }
889 +        joining = prevJoining;
890 +    }
891 +
892 +    /**
893 +     * Tries to locate and help perform tasks for a stealer of the
894 +     * given task (or in turn one of its stealers), blocking (via
895 +     * pool.tryAwaitJoin) upon failure to find work.  Traces
896 +     * stolen->joining links looking for a thread working on
897 +     * a descendant of the given task and with a non-empty queue to
898 +     * steal back and execute tasks from. Inhibits mutual steal chains
899 +     * and scans on outer joins upon nesting to avoid unbounded
900 +     * growth.  Restarts search upon encountering inconsistencies.
901 +     * Tries to block if two passes agree that there are no remaining
902 +     * targets.
903 +     *
904 +     * @param joinMe the task to join
905 +     */
906 +    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
907 +        ForkJoinPool p = pool;
908 +        int scans = p.parallelism;       // give up if too many retries
909 +        ForkJoinTask<?> bottom = null;   // target seen when can't descend
910 +        restart: while (joinMe.status >= 0) {
911 +            ForkJoinTask<?> target = null;
912 +            ForkJoinTask<?> next = joinMe;
913 +            while (scans >= 0 && next != null) {
914 +                --scans;
915 +                target = next;
916 +                next = null;
917 +                ForkJoinWorkerThread v = null;
918 +                ForkJoinWorkerThread[] ws = p.workers;
919 +                int n = ws.length;
920 +                for (int j = 0; j < n; ++j) {
921 +                    ForkJoinWorkerThread w = ws[j];
922 +                    if (w != null && w.stolen == target) {
923 +                        v = w;
924 +                        break;
925 +                    }
926 +                }
927 +                if (v != null && v != this) {
928 +                    ForkJoinTask<?> prevStolen = stolen;
929 +                    int b;
930 +                    ForkJoinTask<?>[] q;
931 +                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
932 +                        int i = (q.length - 1) & b;
933 +                        long u = (i << qShift) + qBase;
934 +                        ForkJoinTask<?> t = q[i];
935 +                        if (target.status < 0)
936 +                            continue restart;
937 +                        if (t != null && v.base == b &&
938 +                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
939 +                            if (joinMe.status < 0) {
940 +                                UNSAFE.putObjectVolatile(q, u, t);
941 +                                return; // back out
942 +                            }
943 +                            stolen = t;
944 +                            v.base = b + 1;
945 +                            t.tryExec();
946 +                            stolen = prevStolen;
947 +                        }
948 +                        if (joinMe.status < 0)
949 +                            return;
950 +                    }
951 +                    next = v.joining;
952 +                }
953 +                if (target.status < 0)
954 +                    continue restart;  // inconsistent
955 +                if (joinMe.status < 0)
956 +                    return;
957 +            }
958 +
959 +            if (bottom != target)
960 +                bottom = target;    // recheck landing spot
961 +            else if (p.tryAwaitJoin(joinMe) < 0)
962 +                return;             // successfully blocked
963 +            Thread.yield();         // tame spin in case too many active
964 +        }
965 +    }
966 +
967 +    /**
968       * Returns an estimate of the number of tasks, offset by a
969       * function of number of idle workers.
970       *
# Line 859 | Line 1021 | public class ForkJoinWorkerThread extend
1021       * @return a task, if available
1022       */
1023      final ForkJoinTask<?> pollLocalTask() {
1024 <        while (base != sp) {
1024 >        while (sp != base) {
1025              if (active || (active = pool.tryIncrementActiveCount()))
1026                  return locallyFifo? locallyDeqTask() : popTask();
1027          }
# Line 877 | Line 1039 | public class ForkJoinWorkerThread extend
1039      }
1040  
1041      /**
880     * Returns a stolen task, if available, unless joinMe is done
881     *
882     * This method is intrinsically nonmodular. To maintain the
883     * property that tasks are never stolen if the awaited task is
884     * ready, we must interleave mechanics of scan with status
885     * checks. We rely here on the commit points of deq that allow us
886     * to cancel a steal even after CASing slot to null, but before
887     * adjusting base index: If, after the CAS, we see that joinMe is
888     * ready, we can back out by placing the task back into the slot,
889     * without adjusting index. The scan loop is otherwise the same as
890     * in scan.
891     *
892     * The outer loop cannot be allowed to run forever, because it
893     * could lead to a form of deadlock if all threads are executing
894     * this method. However, we must also be patient before giving up,
895     * to cope with GC stalls, transient high loads, etc. The loop
896     * terminates (causing caller to possibly block this thread and
897     * create a replacement) only after #workers clean sweeps during
898     * which all running threads are active.
899     */
900    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
901        int sweeps = 0;
902        int r = seed;
903        ForkJoinPool p = pool;
904        p.releaseWaiters(); // help other threads progress
905        while (joinMe.status >= 0) {
906            ForkJoinWorkerThread[] ws = p.workers;
907            int n = ws.length;
908            int k = r;
909            for (int j = -n; j < n; ++j) {
910                ForkJoinWorkerThread v = ws[k & (n - 1)];
911                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
912                if (v != null) {
913                    int b = v.base;
914                    ForkJoinTask<?>[] q;
915                    if (b != v.sp && (q = v.queue) != null) {
916                        int i = (q.length - 1) & b;
917                        ForkJoinTask<?> t = q[i];
918                        if (t != null) {
919                            if (joinMe.status < 0)
920                                return null;
921                            if (UNSAFE.compareAndSwapObject
922                                (q, (i << qShift) + qBase, t, null)) {
923                                if (joinMe.status < 0) {
924                                    writeSlot(q, i, t); // back out
925                                    return null;
926                                }
927                                v.base = b + 1;
928                                seed = r;
929                                ++stealCount;
930                                return t;
931                            }
932                        }
933                        sweeps = 0; // ensure rescan on contention
934                    }
935                }
936                k = j >= 0? k + ((n >>> 1) | 1) : r;
937                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
938                    return null;
939            }
940            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
941                return null;
942        }
943        return null;
944    }
945
946    /**
1042       * Runs tasks until {@code pool.isQuiescent()}.
1043       */
1044      final void helpQuiescePool() {
1045          for (;;) {
1046              ForkJoinTask<?> t = pollLocalTask();
1047 <            if (t != null || (t = scan()) != null)
1047 >            if (t != null || (t = scan()) != null) {
1048                  t.tryExec();
1049 +                stolen = null;
1050 +            }
1051              else {
1052                  ForkJoinPool p = pool;
1053                  if (active) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines