ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.33 by dl, Thu May 27 16:46:49 2010 UTC vs.
Revision 1.53 by dl, Sun Oct 24 19:37:26 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 83 | Line 82 | public class ForkJoinWorkerThread extend
82       * by the ForkJoinPool).  This allows use in message-passing
83       * frameworks in which tasks are never joined.
84       *
85 <     * Efficient implementation of this approach currently relies on
86 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
85 >     * When a worker would otherwise be blocked waiting to join a
86 >     * task, it first tries a form of linear helping: Each worker
87 >     * records (in field currentSteal) the most recent task it stole
88 >     * from some other worker. Plus, it records (in field currentJoin)
89 >     * the task it is currently actively joining. Method joinTask uses
90 >     * these markers to try to find a worker to help (i.e., steal back
91 >     * a task from and execute it) that could hasten completion of the
92 >     * actively joined task. In essence, the joiner executes a task
93 >     * that would be on its own local deque had the to-be-joined task
94 >     * not been stolen. This may be seen as a conservative variant of
95 >     * the approach in Wagner & Calder "Leapfrogging: a portable
96 >     * technique for implementing efficient futures" SIGPLAN Notices,
97 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
98 >     * in that: (1) We only maintain dependency links across workers
99 >     * upon steals, rather than use per-task bookkeeping.  This may
100 >     * require a linear scan of workers array to locate stealers, but
101 >     * usually doesn't because stealers leave hints (that may become
102 >     * stale/wrong) of where to locate them. This isolates cost to
103 >     * when it is needed, rather than adding to per-task overhead.
104 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
105 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
106 >     * is updated only while actively joining, which means that we
107 >     * miss links in the chain during long-lived tasks, GC stalls etc
108 >     * (which is OK since blocking in such cases is usually a good
109 >     * idea).  (4) We bound the number of attempts to find work (see
110 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
111 >     * necessary replacing it with a spare (see
112 >     * ForkJoinPool.awaitJoin).
113 >     *
114 >     * Efficient implementation of these algorithms currently relies
115 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
116       * correct orderings, reads and writes of variable base require
117       * volatile ordering.  Variable sp does not require volatile
118       * writes but still needs store-ordering, which we accomplish by
119       * pre-incrementing sp before filling the slot with an ordered
120       * store.  (Pre-incrementing also enables backouts used in
121 <     * scanWhileJoining.)  Because they are protected by volatile base
122 <     * reads, reads of the queue array and its slots by other threads
123 <     * do not need volatile load semantics, but writes (in push)
124 <     * require store order and CASes (in pop and deq) require
125 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
126 <     * algorithm has similar properties, but without support for
127 <     * nulling slots.)  Since these combinations aren't supported
128 <     * using ordinary volatiles, the only way to accomplish these
129 <     * efficiently is to use direct Unsafe calls. (Using external
130 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
131 <     * array is significantly slower because of memory locality and
132 <     * indirection effects.)
121 >     * joinTask.)  Because they are protected by volatile base reads,
122 >     * reads of the queue array and its slots by other threads do not
123 >     * need volatile load semantics, but writes (in push) require
124 >     * store order and CASes (in pop and deq) require (volatile) CAS
125 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
126 >     * similar properties, but without support for nulling slots.)
127 >     * Since these combinations aren't supported using ordinary
128 >     * volatiles, the only way to accomplish these efficiently is to
129 >     * use direct Unsafe calls. (Using external AtomicIntegers and
130 >     * AtomicReferenceArrays for the indices and array is
131 >     * significantly slower because of memory locality and indirection
132 >     * effects.)
133       *
134       * Further, performance on most platforms is very sensitive to
135       * placement and sizing of the (resizable) queue array.  Even
# Line 126 | Line 154 | public class ForkJoinWorkerThread extend
154      private static final Random seedGenerator = new Random();
155  
156      /**
157 <     * The timeout value for suspending spares. Spare workers that
158 <     * remain unsignalled for more than this time may be trimmed
159 <     * (killed and removed from pool).  Since our goal is to avoid
160 <     * long-term thread buildup, the exact value of timeout does not
161 <     * matter too much so long as it avoids most false-alarm timeouts
134 <     * under GC stalls or momentarily high system load.
157 >     * The maximum stolen->joining link depth allowed in helpJoinTask.
158 >     * Depths for legitimate chains are unbounded, but we use a fixed
159 >     * constant to avoid (otherwise unchecked) cycles and bound
160 >     * staleness of traversal parameters at the expense of sometimes
161 >     * blocking when we could be helping.
162       */
163 <    private static final long SPARE_KEEPALIVE_NANOS =
137 <        5L * 1000L * 1000L * 1000L; // 5 secs
163 >    private static final int MAX_HELP_DEPTH = 8;
164  
165      /**
166       * Capacity of work-stealing queue array upon initialization.
167 <     * Must be a power of two. Initial size must be at least 2, but is
167 >     * Must be a power of two. Initial size must be at least 4, but is
168       * padded to minimize cache effects.
169       */
170      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
171  
172      /**
173       * Maximum work-stealing queue array size.  Must be less than or
174 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
175 <     * is less than usual bounds, because we need leftshift by 3
176 <     * to be in int range).
174 >     * equal to 1 << (31 - width of array entry) to ensure lack of
175 >     * index wraparound. The value is set in the static block
176 >     * at the end of this file after obtaining width.
177       */
178 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
178 >    private static final int MAXIMUM_QUEUE_CAPACITY;
179  
180      /**
181       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 178 | Line 204 | public class ForkJoinWorkerThread extend
204      private int sp;
205  
206      /**
207 +     * The index of most recent stealer, used as a hint to avoid
208 +     * traversal in method helpJoinTask. This is only a hint because a
209 +     * worker might have had multiple steals and this only holds one
210 +     * of them (usually the most current). Declared non-volatile,
211 +     * relying on other prevailing sync to keep reasonably current.
212 +     */
213 +    private int stealHint;
214 +
215 +    /**
216       * Run state of this worker. In addition to the usual run levels,
217       * tracks if this worker is suspended as a spare, and if it was
218       * killed (trimmed) while suspended. However, "active" status is
219 <     * maintained separately.
219 >     * maintained separately and modified only in conjunction with
220 >     * CASes of the pool's runState (which are currently sadly
221 >     * manually inlined for performance.)  Accessed directly by pool
222 >     * to simplify checks for normal (zero) status.
223       */
224 <    private volatile int runState;
224 >    volatile int runState;
225  
226      private static final int TERMINATING = 0x01;
227      private static final int TERMINATED  = 0x02;
# Line 191 | Line 229 | public class ForkJoinWorkerThread extend
229      private static final int TRIMMED     = 0x08; // killed while suspended
230  
231      /**
232 <     * Number of LockSupport.park calls to block this thread for
233 <     * suspension or event waits. Used for internal instrumentation;
196 <     * currently not exported but included because volatile write upon
197 <     * park also provides a workaround for a JVM bug.
198 <     */
199 <    private volatile int parkCount;
200 <
201 <    /**
202 <     * Number of steals, transferred and reset in pool callbacks pool
203 <     * when idle Accessed directly by pool.
232 >     * Number of steals. Directly accessed (and reset) by
233 >     * pool.tryAccumulateStealCount when idle.
234       */
235      int stealCount;
236  
# Line 218 | Line 248 | public class ForkJoinWorkerThread extend
248  
249      /**
250       * True if use local fifo, not default lifo, for local polling.
251 <     * Shadows value from ForkJoinPool, which resets it if changed
222 <     * pool-wide.
251 >     * Shadows value from ForkJoinPool.
252       */
253 <    private boolean locallyFifo;
253 >    private final boolean locallyFifo;
254  
255      /**
256       * Index of this worker in pool array. Set once by pool before
# Line 237 | Line 266 | public class ForkJoinWorkerThread extend
266      int lastEventCount;
267  
268      /**
269 <     * Encoded index and event count of next event waiter. Used only
270 <     * by ForkJoinPool for managing event waiters.
269 >     * Encoded index and event count of next event waiter. Accessed
270 >     * only by ForkJoinPool for managing event waiters.
271       */
272      volatile long nextWaiter;
273  
274      /**
275 +     * Number of times this thread suspended as spare. Accessed only
276 +     * by pool.
277 +     */
278 +    int spareCount;
279 +
280 +    /**
281 +     * Encoded index and count of next spare waiter. Accessed only
282 +     * by ForkJoinPool for managing spares.
283 +     */
284 +    volatile int nextSpare;
285 +
286 +    /**
287 +     * The task currently being joined, set only when actively trying
288 +     * to help other stealers in helpJoinTask. Written only by this
289 +     * thread, but read by others.
290 +     */
291 +    private volatile ForkJoinTask<?> currentJoin;
292 +
293 +    /**
294 +     * The task most recently stolen from another worker (or
295 +     * submission queue).  Written only by this thread, but read by
296 +     * others.
297 +     */
298 +    private volatile ForkJoinTask<?> currentSteal;
299 +
300 +    /**
301       * Creates a ForkJoinWorkerThread operating in the given pool.
302       *
303       * @param pool the pool this thread works in
304       * @throws NullPointerException if pool is null
305       */
306      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
307          this.pool = pool;
308 +        this.locallyFifo = pool.locallyFifo;
309 +        setDaemon(true);
310          // To avoid exposing construction details to subclasses,
311          // remaining initialization is in start() and onStart()
312      }
313  
314      /**
315 <     * Performs additional initialization and starts this thread
315 >     * Performs additional initialization and starts this thread.
316       */
317 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
317 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
318          this.poolIndex = poolIndex;
264        this.locallyFifo = locallyFifo;
319          if (ueh != null)
320              setUncaughtExceptionHandler(ueh);
267        setDaemon(true);
321          start();
322      }
323  
# Line 295 | Line 348 | public class ForkJoinWorkerThread extend
348      /**
349       * Initializes internal state after construction but before
350       * processing any tasks. If you override this method, you must
351 <     * invoke super.onStart() at the beginning of the method.
351 >     * invoke {@code super.onStart()} at the beginning of the method.
352       * Initialization requires care: Most fields must have legal
353       * default values, to ensure that attempted accesses from other
354       * threads work correctly even before this thread starts
# Line 305 | Line 358 | public class ForkJoinWorkerThread extend
358          int rs = seedGenerator.nextInt();
359          seed = rs == 0? 1 : rs; // seed must be nonzero
360  
361 <        // Allocate name string and queue array in this thread
361 >        // Allocate name string and arrays in this thread
362          String pid = Integer.toString(pool.getPoolNumber());
363          String wid = Integer.toString(poolIndex);
364          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 323 | Line 376 | public class ForkJoinWorkerThread extend
376       */
377      protected void onTermination(Throwable exception) {
378          try {
379 +            ForkJoinPool p = pool;
380 +            if (active) {
381 +                int a; // inline p.tryDecrementActiveCount
382 +                active = false;
383 +                do {} while (!UNSAFE.compareAndSwapInt
384 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
385 +            }
386              cancelTasks();
387              setTerminated();
388 <            pool.workerTerminated(this);
388 >            p.workerTerminated(this);
389          } catch (Throwable ex) {        // Shouldn't ever happen
390              if (exception == null)      // but if so, at least rethrown
391                  exception = ex;
# Line 355 | Line 415 | public class ForkJoinWorkerThread extend
415      // helpers for run()
416  
417      /**
418 <     * Find and execute tasks and check status while running
418 >     * Finds and executes tasks, and checks status while running.
419       */
420      private void mainLoop() {
421 <        boolean ran = false; // true if ran task on previous step
421 >        boolean ran = false; // true if ran a task on last step
422          ForkJoinPool p = pool;
423          for (;;) {
424              p.preStep(this, ran);
425              if (runState != 0)
426 <                return;
427 <            ForkJoinTask<?> t; // try to get and run stolen or submitted task
368 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
369 <                t.tryExec();
370 <                if (base != sp)
371 <                    runLocalTasks();
372 <            }
426 >                break;
427 >            ran = tryExecSteal() || tryExecSubmission();
428          }
429      }
430  
431      /**
432 <     * Runs local tasks until queue is empty or shut down.  Call only
433 <     * while active.
432 >     * Tries to steal a task and execute it.
433 >     *
434 >     * @return true if ran a task
435       */
436 <    private void runLocalTasks() {
437 <        while (runState == 0) {
438 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
439 <            if (t != null)
440 <                t.tryExec();
441 <            else if (base == sp)
442 <                break;
436 >    private boolean tryExecSteal() {
437 >        ForkJoinTask<?> t;
438 >        if ((t = scan()) != null) {
439 >            t.quietlyExec();
440 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
441 >            if (sp != base)
442 >                execLocalTasks();
443 >            return true;
444          }
445 +        return false;
446      }
447  
448      /**
449 <     * If a submission exists, try to activate and take it
449 >     * If a submission exists, try to activate and run it.
450       *
451 <     * @return a task, if available
451 >     * @return true if ran a task
452       */
453 <    private ForkJoinTask<?> pollSubmission() {
453 >    private boolean tryExecSubmission() {
454          ForkJoinPool p = pool;
455 +        // This loop is needed in case attempt to activate fails, in
456 +        // which case we only retry if there still appears to be a
457 +        // submission.
458          while (p.hasQueuedSubmissions()) {
459 <            if (active || (active = p.tryIncrementActiveCount())) {
460 <                ForkJoinTask<?> t = p.pollSubmission();
461 <                return t != null ? t : scan(); // if missed, rescan
459 >            ForkJoinTask<?> t; int a;
460 >            if (active || // inline p.tryIncrementActiveCount
461 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
462 >                                                   a = p.runState, a + 1))) {
463 >                if ((t = p.pollSubmission()) != null) {
464 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
465 >                    t.quietlyExec();
466 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
467 >                    if (sp != base)
468 >                        execLocalTasks();
469 >                    return true;
470 >                }
471              }
472          }
473 <        return null;
473 >        return false;
474 >    }
475 >
476 >    /**
477 >     * Runs local tasks until queue is empty or shut down.  Call only
478 >     * while active.
479 >     */
480 >    private void execLocalTasks() {
481 >        while (runState == 0) {
482 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
483 >            if (t != null)
484 >                t.quietlyExec();
485 >            else if (sp == base)
486 >                break;
487 >        }
488      }
489  
490      /*
491       * Intrinsics-based atomic writes for queue slots. These are
492 <     * basically the same as methods in AtomicObjectArray, but
492 >     * basically the same as methods in AtomicReferenceArray, but
493       * specialized for (1) ForkJoinTask elements (2) requirement that
494       * nullness and bounds checks have already been performed by
495       * callers and (3) effective offsets are known not to overflow
496       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
497       * need corresponding version for reads: plain array reads are OK
498 <     * because they protected by other volatile reads and are
498 >     * because they are protected by other volatile reads and are
499       * confirmed by CASes.
500       *
501       * Most uses don't actually call these methods, but instead contain
# Line 435 | Line 519 | public class ForkJoinWorkerThread extend
519       * range. This method is used only during resets and backouts.
520       */
521      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
522 <                                              ForkJoinTask<?> t) {
522 >                                        ForkJoinTask<?> t) {
523          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
524      }
525  
# Line 447 | Line 531 | public class ForkJoinWorkerThread extend
531       * @param t the task. Caller must ensure non-null.
532       */
533      final void pushTask(ForkJoinTask<?> t) {
450        int s;
534          ForkJoinTask<?>[] q = queue;
535          int mask = q.length - 1; // implicit assert q != null
536 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
537 <        if ((s -= base) <= 0)
538 <            pool.signalWork();
539 <        else if (s + 1 >= mask)
540 <            growQueue();
536 >        int s = sp++;            // ok to increment sp before slot write
537 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
538 >        if ((s -= base) == 0)
539 >            pool.signalWork();   // was empty
540 >        else if (s == mask)
541 >            growQueue();         // is full
542      }
543  
544      /**
545       * Tries to take a task from the base of the queue, failing if
546       * empty or contended. Note: Specializations of this code appear
547 <     * in scan and scanWhileJoining.
547 >     * in locallyDeqTask and elsewhere.
548       *
549       * @return a task, or null if none or contended
550       */
# Line 468 | Line 552 | public class ForkJoinWorkerThread extend
552          ForkJoinTask<?> t;
553          ForkJoinTask<?>[] q;
554          int b, i;
555 <        if ((b = base) != sp &&
555 >        if (sp != (b = base) &&
556              (q = queue) != null && // must read q after b
557 <            (t = q[i = (q.length - 1) & b]) != null &&
557 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
558              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
559              base = b + 1;
560              return t;
# Line 480 | Line 564 | public class ForkJoinWorkerThread extend
564  
565      /**
566       * Tries to take a task from the base of own queue. Assumes active
567 <     * status.  Called only by current thread.
567 >     * status.  Called only by this thread.
568       *
569       * @return a task, or null if none
570       */
# Line 490 | Line 574 | public class ForkJoinWorkerThread extend
574              ForkJoinTask<?> t;
575              int b, i;
576              while (sp != (b = base)) {
577 <                if ((t = q[i = (q.length - 1) & b]) != null &&
577 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
578                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
579                                                  t, null)) {
580                      base = b + 1;
# Line 503 | Line 587 | public class ForkJoinWorkerThread extend
587  
588      /**
589       * Returns a popped task, or null if empty. Assumes active status.
590 <     * Called only by current thread. (Note: a specialization of this
507 <     * code appears in popWhileJoining.)
590 >     * Called only by this thread.
591       */
592 <    final ForkJoinTask<?> popTask() {
593 <        int s;
594 <        ForkJoinTask<?>[] q;
595 <        if (base != (s = sp) && (q = queue) != null) {
596 <            int i = (q.length - 1) & --s;
597 <            ForkJoinTask<?> t = q[i];
598 <            if (t != null && UNSAFE.compareAndSwapObject
599 <                (q, (i << qShift) + qBase, t, null)) {
600 <                sp = s;
601 <                return t;
592 >    private ForkJoinTask<?> popTask() {
593 >        ForkJoinTask<?>[] q = queue;
594 >        if (q != null) {
595 >            int s;
596 >            while ((s = sp) != base) {
597 >                int i = (q.length - 1) & --s;
598 >                long u = (i << qShift) + qBase; // raw offset
599 >                ForkJoinTask<?> t = q[i];
600 >                if (t == null)   // lost to stealer
601 >                    break;
602 >                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
603 >                    sp = s; // putOrderedInt may encourage more timely write
604 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
605 >                    return t;
606 >                }
607              }
608          }
609          return null;
# Line 523 | Line 611 | public class ForkJoinWorkerThread extend
611  
612      /**
613       * Specialized version of popTask to pop only if topmost element
614 <     * is the given task. Called only by current thread while
527 <     * active.
614 >     * is the given task. Called only by this thread while active.
615       *
616       * @param t the task. Caller must ensure non-null.
617       */
618      final boolean unpushTask(ForkJoinTask<?> t) {
619          int s;
620 <        ForkJoinTask<?>[] q;
621 <        if (base != (s = sp) && (q = queue) != null &&
620 >        ForkJoinTask<?>[] q = queue;
621 >        if ((s = sp) != base && q != null &&
622              UNSAFE.compareAndSwapObject
623              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
624 <            sp = s;
624 >            sp = s; // putOrderedInt may encourage more timely write
625 >            // UNSAFE.putOrderedInt(this, spOffset, s);
626              return true;
627          }
628          return false;
629      }
630  
631      /**
632 <     * Returns next task or null if empty or contended
632 >     * Returns next task, or null if empty or contended.
633       */
634      final ForkJoinTask<?> peekTask() {
635          ForkJoinTask<?>[] q = queue;
# Line 583 | Line 671 | public class ForkJoinWorkerThread extend
671       * Computes next value for random victim probe in scan().  Scans
672       * don't require a very high quality generator, but also not a
673       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
674 <     * Note: This is manually inlined in scan()
674 >     * Note: This is manually inlined in scan().
675       */
676      private static final int xorShift(int r) {
677          r ^= r << 13;
# Line 622 | Line 710 | public class ForkJoinWorkerThread extend
710              for (;;) {
711                  ForkJoinWorkerThread v = ws[k & mask];
712                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
713 <                if (v != null && v.base != v.sp) {
714 <                    int b, i;             // inline specialized deqTask
715 <                    ForkJoinTask<?>[] q;
716 <                    ForkJoinTask<?> t;
717 <                    if ((canSteal ||      // ensure active status
718 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
719 <                        (q = v.queue) != null &&
720 <                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
721 <                        UNSAFE.compareAndSwapObject
722 <                        (q, (i << qShift) + qBase, t, null)) {
723 <                        v.base = b + 1;
724 <                        seed = r;
725 <                        ++stealCount;
726 <                        return t;
713 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
714 >                if (v != null && (b = v.base) != v.sp &&
715 >                    (q = v.queue) != null) {
716 >                    int i = (q.length - 1) & b;
717 >                    long u = (i << qShift) + qBase; // raw offset
718 >                    int pid = poolIndex;
719 >                    if ((t = q[i]) != null) {
720 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
721 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
722 >                                                     a = p.runState, a + 1))
723 >                            canSteal = active = true;
724 >                        if (canSteal && v.base == b++ &&
725 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
726 >                            v.base = b;
727 >                            v.stealHint = pid;
728 >                            UNSAFE.putOrderedObject(this,
729 >                                                    currentStealOffset, t);
730 >                            seed = r;
731 >                            ++stealCount;
732 >                            return t;
733 >                        }
734                      }
735                      j = -n;
736                      k = r;                // restart on contention
# Line 654 | Line 749 | public class ForkJoinWorkerThread extend
749      // Run State management
750  
751      // status check methods used mainly by ForkJoinPool
752 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
753 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
754 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
755 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
752 >    final boolean isRunning()    { return runState == 0; }
753 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
754 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
755 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
756 >
757 >    final boolean isTerminating() {
758 >        if ((runState & TERMINATING) != 0)
759 >            return true;
760 >        if (pool.isAtLeastTerminating()) { // propagate pool state
761 >            shutdown();
762 >            return true;
763 >        }
764 >        return false;
765 >    }
766  
767      /**
768 <     * Sets state to TERMINATING, also resuming if suspended.
768 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
769 >     * to wake up if currently blocked. Callers must do so if desired.
770       */
771      final void shutdown() {
772          for (;;) {
773              int s = runState;
774 +            if ((s & (TERMINATING|TERMINATED)) != 0)
775 +                break;
776              if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
777                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
778                                               (s & ~SUSPENDED) |
779 <                                             (TRIMMED|TERMINATING))) {
672 <                    LockSupport.unpark(this);
779 >                                             (TRIMMED|TERMINATING)))
780                      break;
674                }
781              }
782              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
783                                                s | TERMINATING))
# Line 680 | Line 786 | public class ForkJoinWorkerThread extend
786      }
787  
788      /**
789 <     * Sets state to TERMINATED. Called only by this thread.
789 >     * Sets state to TERMINATED. Called only by onTermination().
790       */
791      private void setTerminated() {
792          int s;
# Line 690 | Line 796 | public class ForkJoinWorkerThread extend
796      }
797  
798      /**
693     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
694     */
695    final void doPark() {
696        ++parkCount;
697        LockSupport.park(this);
698    }
699
700    /**
799       * If suspended, tries to set status to unsuspended.
800 <     * Caller must unpark to actually resume
800 >     * Does NOT wake up if blocked.
801       *
802       * @return true if successful
803       */
804      final boolean tryUnsuspend() {
805          int s;
806 <        return (((s = runState) & SUSPENDED) != 0 &&
807 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
808 <                                         s & ~SUSPENDED));
806 >        while (((s = runState) & SUSPENDED) != 0) {
807 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
808 >                                         s & ~SUSPENDED))
809 >                return true;
810 >        }
811 >        return false;
812      }
813  
    /**
     * Sets suspended status and blocks as spare until resumed
     * or shutdown.
     */
    final void suspendAsSpare() {
        for (;;) {                  // set suspended unless terminating
            int s = runState;
            if ((s & TERMINATING) != 0) { // must kill
                // terminating: mark as trimmed+terminating instead of
                // suspending, and return without blocking
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             s | (TRIMMED | TERMINATING)))
                    return;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | SUSPENDED))
                break;
        }
        ForkJoinPool p = pool;
        p.pushSpare(this);          // register with pool so it can be resumed
        while ((runState & SUSPENDED) != 0) {
            // Transfer this worker's steal count to the pool before
            // blocking; only park after a successful transfer.
            if (p.tryAccumulateStealCount(this)) {
                interrupted();          // clear/ignore interrupts
                if ((runState & SUSPENDED) == 0)
                    break;              // recheck to avoid missed unpark
                LockSupport.park(this);
            }
        }
    }
841  
842      // Misc support methods for ForkJoinPool
# Line 770 | Line 846 | public class ForkJoinWorkerThread extend
846       * used by ForkJoinTask.
847       */
    final int getQueueSize() {
        int n; // external calls must read base first
        // sp - base can transiently appear non-positive to other
        // threads reading the volatile fields; report 0 in that case.
        return (n = -base + sp) <= 0 ? 0 : n;
    }
852  
853      /**
# Line 785 | Line 855 | public class ForkJoinWorkerThread extend
855       * thread.
856       */
857      final void cancelTasks() {
858 +        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
859 +        if (cj != null) {
860 +            currentJoin = null;
861 +            cj.cancelIgnoringExceptions();
862 +            try {
863 +                this.interrupt(); // awaken wait
864 +            } catch (SecurityException ignore) {
865 +            }
866 +        }
867 +        ForkJoinTask<?> cs = currentSteal;
868 +        if (cs != null) {
869 +            currentSteal = null;
870 +            cs.cancelIgnoringExceptions();
871 +        }
872          while (base != sp) {
873              ForkJoinTask<?> t = deqTask();
874              if (t != null)
# Line 812 | Line 896 | public class ForkJoinWorkerThread extend
896      // Support methods for ForkJoinTask
897  
    /**
     * Gets and removes a local task.  The worker must be (or become)
     * active in the pool before taking a task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        ForkJoinPool p = pool;
        while (sp != base) {
            int a; // inline p.tryIncrementActiveCount
            // CAS bumps the active count embedded in pool.runState;
            // retry the outer loop on CAS failure while queue nonempty.
            if (active ||
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1)))
                // honor async (FIFO) vs default (LIFO) local mode
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }
914 +
    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = pollLocalTask();
        if (t == null) {
            t = scan();
            // cannot retain/track/help steal: scan() recorded the task
            // in currentSteal, but the caller (not this worker's run
            // loop) will execute it, so clear the marker.
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
        }
        return t;
    }
929 +
    /**
     * Possibly runs some tasks and/or blocks, until task is done.
     *
     * @param joinMe the task to join
     * @param timed true if use timed wait
     * @param nanos wait time if timed
     */
    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
        // currentJoin only written by this thread; only need ordered store
        ForkJoinTask<?> prevJoin = currentJoin;
        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
        if (isTerminating())                // cancel if shutting down
            joinMe.cancelIgnoringExceptions();
        else {
            if (sp != base)                 // drain local queue first
                localHelpJoinTask(joinMe);
            if (joinMe.status >= 0)         // still not done: block/help
                pool.awaitJoin(joinMe, this, timed, nanos);
        }
        // restore previous join target so nested joins unwind correctly
        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
    }
951 +
    /**
     * Run tasks in local queue until given task is done.
     * Pops tasks LIFO from this worker's own deque, executing each,
     * stopping as soon as joinMe completes or the queue empties.
     *
     * @param joinMe the task to join
     */
    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s;
        ForkJoinTask<?>[] q;
        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
            int i = (q.length - 1) & --s;   // index of top-of-stack slot
            long u = (i << qShift) + qBase; // raw offset
            ForkJoinTask<?> t = q[i];
            if (t == null)  // lost to a stealer
                break;
            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                /*
                 * This recheck (and similarly in helpJoinTask)
                 * handles cases where joinMe is independently
                 * cancelled or forced even though there is other work
                 * available. Back out of the pop by putting t back
                 * into slot before we commit by writing sp.
                 */
                if (joinMe.status < 0) {
                    UNSAFE.putObjectVolatile(q, u, t);
                    break;
                }
                sp = s;                     // commit the pop
                // UNSAFE.putOrderedInt(this, spOffset, s);
                t.quietlyExec();
            }
        }
    }
984 +
    /**
     * Tries to locate and help perform tasks for a stealer of the
     * given task, or in turn one of its stealers.  Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from.
     *
     * The implementation is very branchy to cope with potential
     * inconsistencies or loops encountering chains that are stale,
     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
     * of these cases are dealt with by just returning back to the
     * caller, who is expected to retry if other join mechanisms also
     * don't work out.
     *
     * @param joinMe the task to join
     */
    final void helpJoinTask(ForkJoinTask<?> joinMe) {
        ForkJoinWorkerThread[] ws;
        int n;
        if (joinMe.status < 0)                // already done
            return;
        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
            return;                           // need at least 2 workers

        ForkJoinTask<?> task = joinMe;        // base of chain
        ForkJoinWorkerThread thread = this;   // thread with stolen task
        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
            // Try to find v, the stealer of task, by first using hint
            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
            if (v == null || v.currentSteal != task) {
                for (int j = 0; ; ++j) {      // search array
                    if (j < n) {
                        ForkJoinTask<?> vs;
                        if ((v = ws[j]) != null &&
                            (vs = v.currentSteal) != null) {
                            if (joinMe.status < 0 || task.status < 0)
                                return;       // stale or done
                            if (vs == task) {
                                thread.stealHint = j;
                                break;        // save hint for next time
                            }
                        }
                    }
                    else
                        return;               // no stealer
                }
            }
            for (;;) { // Try to help v, using specialized form of deqTask
                if (joinMe.status < 0)
                    return;
                int b = v.base;
                ForkJoinTask<?>[] q = v.queue;
                if (b == v.sp || q == null)
                    break;                    // v's queue empty
                int i = (q.length - 1) & b;
                long u = (i << qShift) + qBase;
                ForkJoinTask<?> t = q[i];
                int pid = poolIndex;
                ForkJoinTask<?> ps = currentSteal; // to restore below
                if (task.status < 0)
                    return;                   // stale or done
                if (t != null && v.base == b++ &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    if (joinMe.status < 0) {
                        // same back-out protocol as localHelpJoinTask:
                        // restore slot before committing base
                        UNSAFE.putObjectVolatile(q, u, t);
                        return;               // back out on cancel
                    }
                    v.base = b;               // commit the steal-back
                    v.stealHint = pid;        // help v find us next time
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
            }
            // Try to descend to find v's stealer
            ForkJoinTask<?> next = v.currentJoin;
            if (task.status < 0 || next == null || next == task ||
                joinMe.status < 0)
                return;                       // done, stale, or self-loop
            task = next;
            thread = v;
        }
    }
1068 +
1069 +    /**
1070 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1071       * Returns an estimate of the number of tasks, offset by a
1072       * function of number of idle workers.
1073       *
# Line 863 | Line 1119 | public class ForkJoinWorkerThread extend
1119      }
1120  
1121      /**
866     * Gets and removes a local task.
867     *
868     * @return a task, if available
869     */
870    final ForkJoinTask<?> pollLocalTask() {
871        while (base != sp) {
872            if (active || (active = pool.tryIncrementActiveCount()))
873                return locallyFifo? locallyDeqTask() : popTask();
874        }
875        return null;
876    }
877
878    /**
879     * Gets and removes a local or stolen task.
880     *
881     * @return a task, if available
882     */
883    final ForkJoinTask<?> pollTask() {
884        ForkJoinTask<?> t;
885        return (t = pollLocalTask()) != null ? t : scan();
886    }
887
888    /**
889     * Executes or processes other tasks awaiting the given task
890     * @return task completion status
891     */
892    final int execWhileJoining(ForkJoinTask<?> joinMe) {
893        int s;
894        while ((s = joinMe.status) >= 0) {
895            ForkJoinTask<?> t = base != sp?
896                popWhileJoining(joinMe) :
897                scanWhileJoining(joinMe);
898            if (t != null)
899                t.tryExec();
900        }
901        return s;
902    }
903
904    /**
905     * Returns or stolen task, if available, unless joinMe is done
906     *
907     * This method is intrinsically nonmodular. To maintain the
908     * property that tasks are never stolen if the awaited task is
909     * ready, we must interleave mechanics of scan with status
910     * checks. We rely here on the commit points of deq that allow us
911     * to cancel a steal even after CASing slot to null, but before
912     * adjusting base index: If, after the CAS, we see that joinMe is
913     * ready, we can back out by placing the task back into the slot,
914     * without adjusting index. The loop is otherwise a variant of the
915     * one in scan().
916     *
917     */
918    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
919        int r = seed;
920        ForkJoinPool p = pool;
921        ForkJoinWorkerThread[] ws;
922        int n;
923        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
924            int mask = n - 1;
925            int k = r;
926            boolean contended = false; // to retry loop if deq contends
927            for (int j = -n; j <= n; ++j) {
928                if (joinMe.status < 0)
929                    break outer;
930                int b;
931                ForkJoinTask<?>[] q;
932                ForkJoinWorkerThread v = ws[k & mask];
933                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
934                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
935                    int i = (q.length - 1) & b;
936                    ForkJoinTask<?> t = q[i];
937                    if (t != null && UNSAFE.compareAndSwapObject
938                        (q, (i << qShift) + qBase, t, null)) {
939                        if (joinMe.status >= 0) {
940                            v.base = b + 1;
941                            seed = r;
942                            ++stealCount;
943                            return t;
944                        }
945                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
946                        break outer; // back out
947                    }
948                    contended = true;
949                }
950                k = j < 0 ? r : (k + ((n >>> 1) | 1));
951            }
952            if (!contended && p.tryAwaitBusyJoin(joinMe))
953                break;
954        }
955        return null;
956    }
957
958    /**
959     * Version of popTask with join checks surrounding extraction.
960     * Uses the same backout strategy as helpJoinTask. Note that
961     * we ignore locallyFifo flag for local tasks here since helping
962     * joins only make sense in LIFO mode.
963     *
964     * @return a popped task, if available, unless joinMe is done
965     */
966    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
967        int s;
968        ForkJoinTask<?>[] q;
969        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
970            int i = (q.length - 1) & --s;
971            ForkJoinTask<?> t = q[i];
972            if (t != null && UNSAFE.compareAndSwapObject
973                (q, (i << qShift) + qBase, t, null)) {
974                if (joinMe.status >= 0) {
975                    sp = s;
976                    return t;
977                }
978                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
979                break;  // back out
980            }
981        }
982        return null;
983    }
984
985    /**
1122       * Runs tasks until {@code pool.isQuiescent()}.
1123       */
1124      final void helpQuiescePool() {
1125 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1126          for (;;) {
1127              ForkJoinTask<?> t = pollLocalTask();
1128              if (t != null || (t = scan()) != null)
1129 <                t.tryExec();
1129 >                t.quietlyExec();
1130              else {
1131                  ForkJoinPool p = pool;
1132 +                int a; // to inline CASes
1133                  if (active) {
1134 +                    if (!UNSAFE.compareAndSwapInt
1135 +                        (p, poolRunStateOffset, a = p.runState, a - 1))
1136 +                        continue;   // retry later
1137                      active = false; // inactivate
1138 <                    do {} while (!p.tryDecrementActiveCount());
1138 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1139                  }
1140                  if (p.isQuiescent()) {
1141                      active = true; // re-activate
1142 <                    do {} while (!p.tryIncrementActiveCount());
1142 >                    do {} while (!UNSAFE.compareAndSwapInt
1143 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1144                      return;
1145                  }
1146              }
# Line 1008 | Line 1150 | public class ForkJoinWorkerThread extend
1150      // Unsafe mechanics
1151  
1152      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1153 +    private static final long spOffset =
1154 +        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1155      private static final long runStateOffset =
1156          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1157 +    private static final long currentJoinOffset =
1158 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1159 +    private static final long currentStealOffset =
1160 +        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1161      private static final long qBase =
1162          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1163 +    private static final long poolRunStateOffset = // to inline CAS
1164 +        objectFieldOffset("runState", ForkJoinPool.class);
1165 +
1166      private static final int qShift;
1167  
1168      static {
# Line 1019 | Line 1170 | public class ForkJoinWorkerThread extend
1170          if ((s & (s-1)) != 0)
1171              throw new Error("data type scale not a power of two");
1172          qShift = 31 - Integer.numberOfLeadingZeros(s);
1173 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1174      }
1175  
1176      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines