
Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.36 by dl, Fri Jul 23 13:07:43 2010 UTC vs.
Revision 1.68 by dl, Thu Jan 26 00:08:13 2012 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
11 import java.util.Random;
12 import java.util.Collection;
13 import java.util.concurrent.locks.LockSupport;
14
9   /**
10 < * A thread managed by a {@link ForkJoinPool}.  This class is
11 < * subclassable solely for the sake of adding functionality -- there
12 < * are no overridable methods dealing with scheduling or execution.
13 < * However, you can override initialization and termination methods
14 < * surrounding the main task processing loop.  If you do create such a
15 < * subclass, you will also need to supply a custom {@link
16 < * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
17 < * ForkJoinPool}.
10 > * A thread managed by a {@link ForkJoinPool}, which executes
11 > * {@link ForkJoinTask}s.
12 > * This class is subclassable solely for the sake of adding
13 > * functionality -- there are no overridable methods dealing with
14 > * scheduling or execution.  However, you can override initialization
15 > * and termination methods surrounding the main task processing loop.
16 > * If you do create such a subclass, you will also need to supply a
17 > * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
18 > * in a {@code ForkJoinPool}.
19   *
20   * @since 1.7
21   * @author Doug Lea
22   */
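As a concrete illustration of the subclass-plus-factory pattern described
above, here is a hypothetical sketch (the Timestamped* names are invented
and are not part of this file or of jsr166y):

import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinWorkerThread;

// Hypothetical subclass carrying extra per-worker state.
class TimestampedWorkerThread extends ForkJoinWorkerThread {
    volatile long startTime;

    protected TimestampedWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override protected void onStart() {
        super.onStart();                 // contract: invoke first
        startTime = System.nanoTime();   // per-thread setup goes here
    }

    @Override protected void onTermination(Throwable exception) {
        // per-thread cleanup goes here
        super.onTermination(exception);  // contract: invoke last
    }
}

// The matching factory, passed to a ForkJoinPool constructor so that the
// pool creates workers of the subclass.
class TimestampedWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new TimestampedWorkerThread(pool);
    }
}

A pool using these workers would then be constructed with a
factory-accepting constructor, e.g. new ForkJoinPool(n, new
TimestampedWorkerFactory(), null, false) in versions exposing the
four-argument form.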
23   public class ForkJoinWorkerThread extends Thread {
24      /*
30     * Overview:
31     *
25       * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
26 <     * ForkJoinTasks. This class includes bookkeeping in support of
27 <     * worker activation, suspension, and lifecycle control described
35 <     * in more detail in the internal documentation of class
36 <     * ForkJoinPool. And as described further below, this class also
37 <     * includes special-cased support for some ForkJoinTask
38 <     * methods. But the main mechanics involve work-stealing:
39 <     *
40 <     * Work-stealing queues are special forms of Deques that support
41 <     * only three of the four possible end-operations -- push, pop,
42 <     * and deq (aka steal), under the further constraints that push
43 <     * and pop are called only from the owning thread, while deq may
44 <     * be called from other threads.  (If you are unfamiliar with
45 <     * them, you probably want to read Herlihy and Shavit's book "The
46 <     * Art of Multiprocessor programming", chapter 16 describing these
47 <     * in more detail before proceeding.)  The main work-stealing
48 <     * queue design is roughly similar to those in the papers "Dynamic
49 <     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
50 <     * (http://research.sun.com/scalable/pubs/index.html) and
51 <     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
52 <     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
53 <     * The main differences ultimately stem from gc requirements that
54 <     * we null out taken slots as soon as we can, to maintain as small
55 <     * a footprint as possible even in programs generating huge
56 <     * numbers of tasks. To accomplish this, we shift the CAS
57 <     * arbitrating pop vs deq (steal) from being on the indices
58 <     * ("base" and "sp") to the slots themselves (mainly via method
59 <     * "casSlotNull()"). So, both a successful pop and deq mainly
60 <     * entail a CAS of a slot from non-null to null.  Because we rely
61 <     * on CASes of references, we do not need tag bits on base or sp.
62 <     * They are simple ints as used in any circular array-based queue
63 <     * (see for example ArrayDeque).  Updates to the indices must
64 <     * still be ordered in a way that guarantees that sp == base means
65 <     * the queue is empty, but otherwise may err on the side of
66 <     * possibly making the queue appear nonempty when a push, pop, or
67 <     * deq have not fully committed. Note that this means that the deq
68 <     * operation, considered individually, is not wait-free. One thief
69 <     * cannot successfully continue until another in-progress one (or,
70 <     * if previously empty, a push) completes.  However, in the
71 <     * aggregate, we ensure at least probabilistic non-blockingness.
72 <     * If an attempted steal fails, a thief always chooses a different
73 <     * random victim target to try next. So, in order for one thief to
74 <     * progress, it suffices for any in-progress deq or new push on
75 <     * any empty queue to complete. One reason this works well here is
76 <     * that apparently-nonempty often means soon-to-be-stealable,
77 <     * which gives threads a chance to set activation status if
78 <     * necessary before stealing.
79 <     *
80 <     * This approach also enables support for "async mode" where local
81 <     * task processing is in FIFO, not LIFO order, simply by using a
82 <     * version of deq rather than pop when locallyFifo is true (as set
83 <     * by the ForkJoinPool).  This allows use in message-passing
84 <     * frameworks in which tasks are never joined.
85 <     *
86 <     * When a worker would otherwise be blocked waiting to join a
87 <     * task, it first tries a form of linear helping: Each worker
88 <     * records (in field currentSteal) the most recent task it stole
89 <     * from some other worker. Plus, it records (in field currentJoin)
90 <     * the task it is currently actively joining. Method joinTask uses
91 <     * these markers to try to find a worker to help (i.e., steal back
92 <     * a task from and execute it) that could hasten completion of the
93 <     * actively joined task. In essence, the joiner executes a task
94 <     * that would be on its own local deque had the to-be-joined task
95 <     * not been stolen. This may be seen as a conservative variant of
96 <     * the approach in Wagner & Calder "Leapfrogging: a portable
97 <     * technique for implementing efficient futures" SIGPLAN Notices,
98 <     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 <     * in that: (1) We only maintain dependency links across workers
100 <     * upon steals, rather than maintain per-task bookkeeping.  This
101 <     * may require a linear scan of workers array to locate stealers,
102 <     * but usually doesn't because stealers leave hints (that may
103 <     * become stale/wrong) of where to locate the kathem. This
104 <     * isolates cost to when it is needed, rather than adding to
105 <     * per-task overhead.  (2) It is "shallow", ignoring nesting and
106 <     * potentially cyclic mutual steals.  (3) It is intentionally
107 <     * racy: field currentJoin is updated only while actively joining,
108 <     * which means that we could miss links in the chain during
109 <     * long-lived tasks, GC stalls etc.  (4) We bound the number of
110 <     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
111 <     * suspending the worker and if necessary replacing it with a
112 <     * spare (see ForkJoinPool.tryAwaitJoin).
113 <     *
114 <     * Efficient implementation of these algorithms currently relies
115 <     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
116 <     * correct orderings, reads and writes of variable base require
117 <     * volatile ordering.  Variable sp does not require volatile
118 <     * writes but still needs store-ordering, which we accomplish by
119 <     * pre-incrementing sp before filling the slot with an ordered
120 <     * store.  (Pre-incrementing also enables backouts used in
121 <     * joinTask.)  Because they are protected by volatile base reads,
122 <     * reads of the queue array and its slots by other threads do not
123 <     * need volatile load semantics, but writes (in push) require
124 <     * store order and CASes (in pop and deq) require (volatile) CAS
125 <     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
126 <     * similar properties, but without support for nulling slots.)
127 <     * Since these combinations aren't supported using ordinary
128 <     * volatiles, the only way to accomplish these efficiently is to
129 <     * use direct Unsafe calls. (Using external AtomicIntegers and
130 <     * AtomicReferenceArrays for the indices and array is
131 <     * significantly slower because of memory locality and indirection
132 <     * effects.)
133 <     *
134 <     * Further, performance on most platforms is very sensitive to
135 <     * placement and sizing of the (resizable) queue array.  Even
136 <     * though these queues don't usually become all that big, the
137 <     * initial size must be large enough to counteract cache
138 <     * contention effects across multiple queues (especially in the
139 <     * presence of GC cardmarking). Also, to improve thread-locality,
140 <     * queues are initialized after starting.  All together, these
141 <     * low-level implementation choices produce as much as a factor of
142 <     * 4 performance improvement compared to naive implementations,
143 <     * and enable the processing of billions of tasks per second,
144 <     * sometimes at the expense of ugliness.
145 <     */
146 <
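To make the slot-CAS discipline above concrete, here is an illustrative
model only -- not this file's code: it substitutes AtomicInteger and
AtomicReferenceArray for the Unsafe intrinsics (which, as the overview
notes, is significantly slower) and omits growth, signalling, and the
FIFO async mode. Both pop and steal commit by CASing a slot from
non-null to null, so base and sp can remain plain ints without tag bits:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class SlotCasDeque<E> {
    final AtomicReferenceArray<E> slots =
        new AtomicReferenceArray<E>(1 << 13);       // power-of-two capacity
    final AtomicInteger base = new AtomicInteger(); // next slot to steal from
    volatile int sp;                                // next push slot; owner writes only

    void push(E e) {                   // owner thread only; growth omitted
        slots.set(sp & (slots.length() - 1), e);    // publish the slot...
        sp = sp + 1;                                // ...then advance sp
    }

    E pop() {                          // owner thread only
        int s = sp;
        if (s == base.get())
            return null;               // sp == base means empty
        int i = (s - 1) & (slots.length() - 1);
        E e = slots.get(i);
        if (e != null && slots.compareAndSet(i, e, null)) { // CAS the slot
            sp = s - 1;
            return e;
        }
        return null;                   // lost the slot to a concurrent steal
    }

    E steal() {                        // called from other threads
        int b = base.get();
        if (b == sp)
            return null;
        int i = b & (slots.length() - 1);
        E e = slots.get(i);
        if (e != null && b == base.get()
            && slots.compareAndSet(i, e, null)) {   // CAS the slot
            base.set(b + 1);           // commit index only after winning
            return e;
        }
        return null;                   // contended: try a different victim
    }
}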
147 <    /**
148 <     * Generator for initial random seeds for random victim
149 <     * selection. This is used only to create initial seeds. Random
150 <     * steals use a cheaper xorshift generator per steal attempt. We
151 <     * expect only rare contention on seedGenerator, so just use a
152 <     * plain Random.
153 <     */
154 <    private static final Random seedGenerator = new Random();
155 <
156 <    /**
157 <     * The timeout value for suspending spares. Spare workers that
158 <     * remain unsignalled for more than this time may be trimmed
159 <     * (killed and removed from pool).  Since our goal is to avoid
160 <     * long-term thread buildup, the exact value of timeout does not
161 <     * matter too much so long as it avoids most false-alarm timeouts
162 <     * under GC stalls or momentarily high system load.
163 <     */
164 <    private static final long SPARE_KEEPALIVE_NANOS =
165 <        5L * 1000L * 1000L * 1000L; // 5 secs
166 <
167 <    /**
168 <     * The maximum stolen->joining link depth allowed in helpJoinTask.
169 <     * Depths for legitimate chains are unbounded, but we use a fixed
170 <     * constant to avoid (otherwise unchecked) cycles and bound
171 <     * staleness of traversal parameters at the expense of sometimes
172 <     * blocking when we could be helping.
173 <     */
174 <    private static final int MAX_HELP_DEPTH = 8;
175 <
176 <    /**
177 <     * Capacity of work-stealing queue array upon initialization.
178 <     * Must be a power of two. Initial size must be at least 4, but is
179 <     * padded to minimize cache effects.
180 <     */
181 <    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
182 <
183 <    /**
184 <     * Maximum work-stealing queue array size.  Must be less than or
185 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
186 <     * is less than usual bounds, because we need leftshift by 3
187 <     * to be in int range).
188 <     */
189 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
190 <
191 <    /**
192 <     * The pool this thread works in. Accessed directly by ForkJoinTask.
193 <     */
194 <    final ForkJoinPool pool;
195 <
196 <    /**
197 <     * The work-stealing queue array. Size must be a power of two.
198 <     * Initialized in onStart, to improve memory locality.
26 >     * ForkJoinTasks. For explanation, see the internal documentation
27 >     * of class ForkJoinPool.
28       */
200    private ForkJoinTask<?>[] queue;
201    
202    /**
203     * Index (mod queue.length) of least valid queue slot, which is
204     * always the next position to steal from if nonempty.
205     */
206    private volatile int base;
29  
30 <    /**
31 <     * Index (mod queue.length) of next queue slot to push to or pop
210 <     * from. It is written only by owner thread, and accessed by other
211 <     * threads only after reading (volatile) base.  Both sp and base
212 <     * are allowed to wrap around on overflow, but (sp - base) still
213 <     * estimates size.
214 <     */
215 <    private int sp;
216 <
217 <    /**
218 <     * The index of most recent stealer, used as a hint to avoid
219 <     * traversal in method helpJoinTask. This is only a hint because a
220 <     * worker might have had multiple steals and this only holds one
221 <     * of them (usually the most current). Declared non-volatile,
222 <     * relying on other prevailing sync to keep reasonably current.
223 <     */
224 <    private int stealHint;
225 <
226 <    /**
227 <     * Run state of this worker. In addition to the usual run levels,
228 <     * tracks if this worker is suspended as a spare, and if it was
229 <     * killed (trimmed) while suspended. However, "active" status is
230 <     * maintained separately.
231 <     */
232 <    private volatile int runState;
233 <
234 <    private static final int TERMINATING = 0x01;
235 <    private static final int TERMINATED  = 0x02;
236 <    private static final int SUSPENDED   = 0x04; // inactive spare
237 <    private static final int TRIMMED     = 0x08; // killed while suspended
238 <
239 <    /**
240 <     * Number of LockSupport.park calls to block this thread for
241 <     * suspension or event waits. Used for internal instrumentation;
242 <     * currently not exported but included because volatile write upon
243 <     * park also provides a workaround for a JVM bug.
244 <     */
245 <    volatile int parkCount;
246 <
247 <    /**
248 <     * Number of steals, transferred and reset in pool callbacks pool
249 <     * when idle Accessed directly by pool.
250 <     */
251 <    int stealCount;
252 <
253 <    /**
254 <     * Seed for random number generator for choosing steal victims.
255 <     * Uses Marsaglia xorshift. Must be initialized as nonzero.
256 <     */
257 <    private int seed;
258 <
259 <
260 <    /**
261 <     * Activity status. When true, this worker is considered active.
262 <     * Accessed directly by pool.  Must be false upon construction.
263 <     */
264 <    boolean active;
265 <
266 <    /**
267 <     * True if using local FIFO, not the default LIFO, for local polling.
268 <     * Shadows value from ForkJoinPool, which resets it if changed
269 <     * pool-wide.
270 <     */
271 <    private final boolean locallyFifo;
272 <    
273 <    /**
274 <     * Index of this worker in pool array. Set once by pool before
275 <     * running, and accessed directly by pool to locate this worker in
276 <     * its workers array.
277 <     */
278 <    int poolIndex;
279 <
280 <    /**
281 <     * The last pool event waited for. Accessed only by pool in
282 <     * callback methods invoked within this thread.
283 <     */
284 <    int lastEventCount;
285 <
286 <    /**
287 <     * Encoded index and event count of next event waiter. Used only
288 <     * by ForkJoinPool for managing event waiters.
289 <     */
290 <    volatile long nextWaiter;
291 <
292 <    /**
293 <     * The task currently being joined, set only when actively trying
294 <     * to helpStealer. Written only by current thread, but read by
295 <     * others.
296 <     */
297 <    private volatile ForkJoinTask<?> currentJoin;
298 <    
299 <    /**
300 <     * The task most recently stolen from another worker (or
301 <     * submission queue).  Not volatile because always read/written in
302 <     * presence of related volatiles in those cases where it matters.
303 <     */
304 <    private ForkJoinTask<?> currentSteal;
30 >    final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
31 >    final ForkJoinPool pool;                // the pool this thread works in
32  
33      /**
34       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 310 | Line 37 | public class ForkJoinWorkerThread extend
37       * @throws NullPointerException if pool is null
38       */
39      protected ForkJoinWorkerThread(ForkJoinPool pool) {
40 <        this.pool = pool;
41 <        this.locallyFifo = pool.locallyFifo;
42 <        // To avoid exposing construction details to subclasses,
316 <        // remaining initialization is in start() and onStart()
317 <    }
318 <
319 <    /**
320 <     * Performs additional initialization and starts this thread
321 <     */
322 <    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
323 <        this.poolIndex = poolIndex;
40 >        super(pool.nextWorkerName());
41 >        setDaemon(true);
42 >        Thread.UncaughtExceptionHandler ueh = pool.ueh;
43          if (ueh != null)
44              setUncaughtExceptionHandler(ueh);
45 <        setDaemon(true);
46 <        start();
45 >        this.pool = pool;
46 >        this.workQueue = new ForkJoinPool.WorkQueue(this, pool.localMode);
47 >        pool.registerWorker(this);
48      }
49  
330    // Public/protected methods
331
50      /**
51       * Returns the pool hosting this thread.
52       *
# Line 348 | Line 66 | public class ForkJoinWorkerThread extend
66       * @return the index number
67       */
68      public int getPoolIndex() {
69 <        return poolIndex;
69 >        return workQueue.poolIndex;
70      }
71  
72      /**
73       * Initializes internal state after construction but before
74       * processing any tasks. If you override this method, you must
75 <     * invoke super.onStart() at the beginning of the method.
75 >     * invoke {@code super.onStart()} at the beginning of the method.
76       * Initialization requires care: Most fields must have legal
77       * default values, to ensure that attempted accesses from other
78       * threads work correctly even before this thread starts
79       * processing tasks.
80       */
81      protected void onStart() {
364        int rs = seedGenerator.nextInt();
365        seed = rs == 0? 1 : rs; // seed must be nonzero
366
367        // Allocate name string and arrays in this thread
368        String pid = Integer.toString(pool.getPoolNumber());
369        String wid = Integer.toString(poolIndex);
370        setName("ForkJoinPool-" + pid + "-worker-" + wid);
371
372        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
82      }
83  
84      /**
# Line 381 | Line 90 | public class ForkJoinWorkerThread extend
90       * to an unrecoverable error, or {@code null} if completed normally
91       */
92      protected void onTermination(Throwable exception) {
384        try {
385            cancelTasks();
386            setTerminated();
387            pool.workerTerminated(this);
388        } catch (Throwable ex) {        // Shouldn't ever happen
389            if (exception == null)      // but if so, at least rethrown
390                exception = ex;
391        } finally {
392            if (exception != null)
393                UNSAFE.throwException(exception);
394        }
93      }
94  
95      /**
96       * This method is required to be public, but should never be
97       * called explicitly. It performs the main run loop to execute
98 <     * ForkJoinTasks.
98 >     * {@link ForkJoinTask}s.
99       */
100      public void run() {
101          Throwable exception = null;
102          try {
103              onStart();
104 <            mainLoop();
104 >            pool.runWorker(this);
105          } catch (Throwable ex) {
106              exception = ex;
107          } finally {
410            onTermination(exception);
411        }
412    }
413
414    // helpers for run()
415
416    /**
417     * Finds and executes tasks and checks status while running.
418     */
419    private void mainLoop() {
420        int emptyScans = 0; // consecutive times failed to find work
421        ForkJoinPool p = pool;
422        for (;;) {
423            p.preStep(this, emptyScans);
424            if (runState != 0)
425                return;
426            ForkJoinTask<?> t; // try to get and run stolen or submitted task
427            if ((t = scan()) != null || (t = pollSubmission()) != null) {
428                t.tryExec();
429                if (base != sp)
430                    runLocalTasks();
431                currentSteal = null;
432                emptyScans = 0;
433            }
434            else
435                ++emptyScans;
436        }
437    }
438
439    /**
440     * Runs local tasks until queue is empty or shut down.  Call only
441     * while active.
442     */
443    private void runLocalTasks() {
444        while (runState == 0) {
445            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
446            if (t != null)
447                t.tryExec();
448            else if (base == sp)
449                break;
450        }
451    }
452
453    /**
454     * If a submission exists, tries to activate and take it.
455     *
456     * @return a task, if available
457     */
458    private ForkJoinTask<?> pollSubmission() {
459        ForkJoinPool p = pool;
460        while (p.hasQueuedSubmissions()) {
461            if (active || (active = p.tryIncrementActiveCount())) {
462                ForkJoinTask<?> t = p.pollSubmission();
463                if (t != null) {
464                    currentSteal = t;
465                    return t;
466                }
467                return scan(); // if missed, rescan
468            }
469        }
470        return null;
471    }
472
473    /*
474     * Intrinsics-based atomic writes for queue slots. These are
475     * basically the same as methods in AtomicReferenceArray, but
476     * specialized for (1) ForkJoinTask elements (2) requirement that
477     * nullness and bounds checks have already been performed by
478     * callers and (3) effective offsets are known not to overflow
479     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
480     * need a corresponding version for reads: plain array reads are OK
481     * because they are protected by other volatile reads and are
482     * confirmed by CASes.
483     *
484     * Most uses don't actually call these methods, but instead contain
485     * inlined forms that enable more predictable optimization.  We
486     * don't define the version of write used in pushTask at all, but
487     * instead inline there a store-fenced array slot write.
488     */
489
490    /**
491     * CASes slot i of array q from t to null. Caller must ensure q is
492     * non-null and index is in range.
493     */
494    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
495                                             ForkJoinTask<?> t) {
496        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
497    }
498
499    /**
500     * Performs a volatile write of the given task at given slot of
501     * array q.  Caller must ensure q is non-null and index is in
502     * range. This method is used only during resets and backouts.
503     */
504    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
505                                              ForkJoinTask<?> t) {
506        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
507    }
508
509    // queue methods
510
511    /**
512     * Pushes a task. Call only from this thread.
513     *
514     * @param t the task. Caller must ensure non-null.
515     */
516    final void pushTask(ForkJoinTask<?> t) {
517        ForkJoinTask<?>[] q = queue;
518        int mask = q.length - 1; // implicit assert q != null
519        int s = sp++;            // ok to increment sp before slot write
520        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
521        if ((s -= base) == 0)
522            pool.signalWork();   // was empty
523        else if (s == mask)
524            growQueue();         // is full
525    }
526
527    /**
528     * Tries to take a task from the base of the queue, failing if
529     * empty or contended. Note: Specializations of this code appear
530     * in locallyDeqTask and elsewhere.
531     *
532     * @return a task, or null if none or contended
533     */
534    final ForkJoinTask<?> deqTask() {
535        ForkJoinTask<?> t;
536        ForkJoinTask<?>[] q;
537        int b, i;
538        if ((b = base) != sp &&
539            (q = queue) != null && // must read q after b
540            (t = q[i = (q.length - 1) & b]) != null && base == b &&
541            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
542            base = b + 1;
543            return t;
544        }
545        return null;
546    }
547
548    /**
549     * Tries to take a task from the base of its own queue. Assumes active
550     * status.  Called only by current thread.
551     *
552     * @return a task, or null if none
553     */
554    final ForkJoinTask<?> locallyDeqTask() {
555        ForkJoinTask<?>[] q = queue;
556        if (q != null) {
557            ForkJoinTask<?> t;
558            int b, i;
559            while (sp != (b = base)) {
560                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
561                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
562                                                t, null)) {
563                    base = b + 1;
564                    return t;
565                }
566            }
567        }
568        return null;
569    }
570
571    /**
572     * Returns a popped task, or null if empty. Assumes active status.
573     * Called only by current thread.
574     */
575    final ForkJoinTask<?> popTask() {
576        int s;
577        ForkJoinTask<?>[] q;
578        if (base != (s = sp) && (q = queue) != null) {
579            int i = (q.length - 1) & --s;
580            ForkJoinTask<?> t = q[i];
581            if (t != null && UNSAFE.compareAndSwapObject
582                (q, (i << qShift) + qBase, t, null)) {
583                sp = s;
584                return t;
585            }
586        }
587        return null;
588    }
589
590    /**
591     * Specialized version of popTask to pop only if topmost element
592     * is the given task. Called only by current thread while
593     * active.
594     *
595     * @param t the task. Caller must ensure non-null.
596     */
597    final boolean unpushTask(ForkJoinTask<?> t) {
598        int s;
599        ForkJoinTask<?>[] q;
600        if (base != (s = sp) && (q = queue) != null &&
601            UNSAFE.compareAndSwapObject
602            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
603            sp = s;
604            return true;
605        }
606        return false;
607    }
608
609    /**
610     * Returns next task, or null if empty or contended.
611     */
612    final ForkJoinTask<?> peekTask() {
613        ForkJoinTask<?>[] q = queue;
614        if (q == null)
615            return null;
616        int mask = q.length - 1;
617        int i = locallyFifo ? base : (sp - 1);
618        return q[i & mask];
619    }
620
621    /**
622     * Doubles queue array size. Transfers elements by emulating
623     * steals (deqs) from old array and placing, oldest first, into
624     * new array.
625     */
626    private void growQueue() {
627        ForkJoinTask<?>[] oldQ = queue;
628        int oldSize = oldQ.length;
629        int newSize = oldSize << 1;
630        if (newSize > MAXIMUM_QUEUE_CAPACITY)
631            throw new RejectedExecutionException("Queue capacity exceeded");
632        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
633
634        int b = base;
635        int bf = b + oldSize;
636        int oldMask = oldSize - 1;
637        int newMask = newSize - 1;
638        do {
639            int oldIndex = b & oldMask;
640            ForkJoinTask<?> t = oldQ[oldIndex];
641            if (t != null && !casSlotNull(oldQ, oldIndex, t))
642                t = null;
643            writeSlot(newQ, b & newMask, t);
644        } while (++b != bf);
645        pool.signalWork();
646    }
647
648    /**
649     * Computes next value for random victim probe in scan().  Scans
650     * don't require a very high quality generator, but also not a
651     * crummy one.  Marsaglia xor-shift is cheap and works well enough.
652     * Note: This is manually inlined in scan()
653     */
654    private static final int xorShift(int r) {
655        r ^= r << 13;
656        r ^= r >>> 17;
657        return r ^ (r << 5);
658    }
659
660    /**
661     * Tries to steal a task from another worker. Starts at a random
662     * index of workers array, and probes workers until finding one
663     * with non-empty queue or finding that all are empty.  It
664     * randomly selects the first n probes. If these are empty, it
665     * resorts to a circular sweep, which is necessary to accurately
666     * set active status. (The circular sweep uses steps of
667     * approximately half the array size plus 1, to avoid bias
668     * stemming from leftmost packing of the array in ForkJoinPool.)
669     *
670     * This method must be both fast and quiet -- usually avoiding
671     * memory accesses that could disrupt cache sharing, etc., other than
672     * those needed to check for and take tasks (or to activate if not
673     * already active). This accounts for, among other things,
674     * updating random seed in place without storing it until exit.
675     *
676     * @return a task, or null if none found
677     */
678    private ForkJoinTask<?> scan() {
679        ForkJoinPool p = pool;
680        ForkJoinWorkerThread[] ws;        // worker array
681        int n;                            // upper bound of #workers
682        if ((ws = p.workers) != null && (n = ws.length) > 1) {
683            boolean canSteal = active;    // shadow active status
684            int r = seed;                 // extract seed once
685            int mask = n - 1;
686            int j = -n;                   // loop counter
687            int k = r;                    // worker index, random if j < 0
688            for (;;) {
689                ForkJoinWorkerThread v = ws[k & mask];
690                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
691                if (v != null && v.base != v.sp) {
692                    if (canSteal ||       // ensure active status
693                        (canSteal = active = p.tryIncrementActiveCount())) {
694                        int b = v.base;   // inline specialized deqTask
695                        ForkJoinTask<?>[] q;
696                        if (b != v.sp && (q = v.queue) != null) {
697                            ForkJoinTask<?> t;
698                            int i = (q.length - 1) & b;
699                            long u = (i << qShift) + qBase; // raw offset
700                            if ((t = q[i]) != null && v.base == b &&
701                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
702                                currentSteal = t;
703                                v.stealHint = poolIndex;
704                                v.base = b + 1;
705                                seed = r;
706                                ++stealCount;
707                                return t;
708                            }
709                        }
710                    }
711                    j = -n;
712                    k = r;                // restart on contention
713                }
714                else if (++j <= 0)
715                    k = r;
716                else if (j <= n)
717                    k += (n >>> 1) | 1;
718                else
719                    break;
720            }
721        }
722        return null;
723    }
724
725    // Run State management
726
727    // status check methods used mainly by ForkJoinPool
728    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
729    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
730    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
731    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
732
733    /**
734     * Sets state to TERMINATING, also resuming if suspended.
735     */
736    final void shutdown() {
737        for (;;) {
738            int s = runState;
739            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
740                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
741                                             (s & ~SUSPENDED) |
742                                             (TRIMMED|TERMINATING))) {
743                    LockSupport.unpark(this);
744                    break;
745                }
746            }
747            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
748                                              s | TERMINATING))
749                break;
750        }
751    }
752
753    /**
754     * Sets state to TERMINATED. Called only by this thread.
755     */
756    private void setTerminated() {
757        int s;
758        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
759                                               s = runState,
760                                               s | (TERMINATING|TERMINATED)));
761    }
762
763    /**
764     * Instrumented version of park used by ForkJoinPool.awaitEvent.
765     */
766    final void doPark() {
767        ++parkCount;
768        LockSupport.park(this);
769    }
770
771    /**
772     * If suspended, tries to set status to unsuspended and unparks.
773     *
774     * @return true if successful
775     */
776    final boolean tryResumeSpare() {
777        int s = runState;
778        if ((s & SUSPENDED) != 0 &&
779            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
780                                     s & ~SUSPENDED)) {
781            LockSupport.unpark(this);
782            return true;
783        }
784        return false;
785    }
786
787    /**
788     * Sets suspended status and blocks as spare until resumed,
789     * shutdown, or timed out.
790     *
791     * @return false if trimmed
792     */
793    final boolean suspendAsSpare() {
794        for (;;) {               // set suspended unless terminating
795            int s = runState;
796            if ((s & TERMINATING) != 0) { // must kill
797                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
798                                             s | (TRIMMED | TERMINATING)))
799                    return false;
800            }
801            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
802                                              s | SUSPENDED))
803                break;
804        }
805        boolean timed;
806        long nanos;
807        long startTime;
808        if (poolIndex < pool.parallelism) {
809            timed = false;
810            nanos = 0L;
811            startTime = 0L;
812        }
813        else {
814            timed = true;
815            nanos = SPARE_KEEPALIVE_NANOS;
816            startTime = System.nanoTime();
817        }
818        pool.accumulateStealCount(this);
819        lastEventCount = 0;      // reset upon resume
820        interrupted();           // clear/ignore interrupts
821        while ((runState & SUSPENDED) != 0) {
822            ++parkCount;
823            if (!timed)
824                LockSupport.park(this);
825            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
826                LockSupport.parkNanos(this, nanos);
827            else { // try to trim on timeout
828                int s = runState;
829                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
830                                             (s & ~SUSPENDED) |
831                                             (TRIMMED|TERMINATING)))
832                    return false;
833            }
834        }
835        return true;
836    }
837
838    // Misc support methods for ForkJoinPool
839
840    /**
841     * Returns an estimate of the number of tasks in the queue.  Also
842     * used by ForkJoinTask.
843     */
844    final int getQueueSize() {
845        return -base + sp;
846    }
847
848    /**
849     * Removes and cancels all tasks in queue.  Can be called from any
850     * thread.
851     */
852    final void cancelTasks() {
853        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
854        if (cj != null) {
855            currentJoin = null;
856            cj.cancelIgnoringExceptions();
857        }
858        ForkJoinTask<?> cs = currentSteal;
859        if (cs != null) {
860            currentSteal = null;
861            cs.cancelIgnoringExceptions();
862        }
863        while (base != sp) {
864            ForkJoinTask<?> t = deqTask();
865            if (t != null)
866                t.cancelIgnoringExceptions();
867        }
868    }
869
870    /**
871     * Drains tasks to given collection c.
872     *
873     * @return the number of tasks drained
874     */
875    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
876        int n = 0;
877        while (base != sp) {
878            ForkJoinTask<?> t = deqTask();
879            if (t != null) {
880                c.add(t);
881                ++n;
882            }
883        }
884        return n;
885    }
886
887    // Support methods for ForkJoinTask
888
889    /**
890     * Gets and removes a local task.
891     *
892     * @return a task, if available
893     */
894    final ForkJoinTask<?> pollLocalTask() {
895        while (sp != base) {
896            if (active || (active = pool.tryIncrementActiveCount()))
897                return locallyFifo? locallyDeqTask() : popTask();
898        }
899        return null;
900    }
901
902    /**
903     * Gets and removes a local or stolen task.
904     *
905     * @return a task, if available
906     */
907    final ForkJoinTask<?> pollTask() {
908        ForkJoinTask<?> t;
909        return (t = pollLocalTask()) != null ? t : scan();
910    }
911
912    /**
913     * Possibly runs some tasks and/or blocks, until task is done.
914     * The main body is basically a big spinloop, alternating between
915     * calls to helpJoinTask and pool.tryAwaitJoin with increased
916     * patience parameters until either the task is done without
917     * waiting, or we have, if necessary, created or resumed a
918     * replacement for this thread while it blocks.
919     *
920     * @param joinMe the task to join
921     * @return task status on exit
922     */
923    final int joinTask(ForkJoinTask<?> joinMe) {
924        int stat;
925        ForkJoinTask<?> prevJoin = currentJoin;
926        currentJoin = joinMe;
927        if ((stat = joinMe.status) >= 0 &&
928            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
929            ForkJoinPool p = pool;
930            int helpRetries = 2;     // initial patience values
931            int awaitRetries = -1;   // -1 is sentinel for replace-check only
932            do {
933                helpJoinTask(joinMe, helpRetries);
934                if ((stat = joinMe.status) < 0)
935                    break;
936                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
937                if ((stat = joinMe.status) < 0)
938                    break;
939                if (awaitRetries == -1)
940                    awaitRetries = 0;
941                else if (busy)
942                    ++awaitRetries;
943                if (helpRetries < p.parallelism)
944                    helpRetries <<= 1;
945                Thread.yield(); // tame unbounded loop
946            } while (joinMe.status >= 0);
947        }
948        currentJoin = prevJoin;
949        return stat;
950    }
951
952    /**
953     * Run tasks in local queue until given task is done.
954     *
955     * @param joinMe the task to join
956     * @return task status on exit
957     */
958    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
959        int stat, s;
960        ForkJoinTask<?>[] q;
961        while ((stat = joinMe.status) >= 0 &&
962               base != (s = sp) && (q = queue) != null) {
963            ForkJoinTask<?> t;
964            int i = (q.length - 1) & --s;
965            long u = (i << qShift) + qBase; // raw offset
966            if ((t = q[i]) != null &&
967                UNSAFE.compareAndSwapObject(q, u, t, null)) {
968                /*
969                 * This recheck (and similarly in helpJoinTask)
970                 * handles cases where joinMe is independently
971                 * cancelled or forced even though there is other work
972                 * available. Back out of the pop by putting t back
973                 * into slot before we commit by writing sp.
974                 */
975                if ((stat = joinMe.status) < 0) {
976                    UNSAFE.putObjectVolatile(q, u, t);
977                    break;
978                }
979                sp = s;
980                t.tryExec();
981            }
982        }
983        return stat;
984    }
985
986    /**
987     * Tries to locate and help perform tasks for a stealer of the
988     * given task, or in turn one of its stealers.  Traces
989     * currentSteal->currentJoin links looking for a thread working on
990     * a descendant of the given task and with a non-empty queue to
991     * steal back and execute tasks from. Restarts search upon
992     * encountering chains that are stale, unknown, or of length
993     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
994     *
995     * The implementation is very branchy to cope with the restart
996     * cases.  Returns void, not task status (which must be reread by
997     * caller anyway) to slightly simplify control paths.
998     *
999     * @param joinMe the task to join
1000     */
1001    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
1002        ForkJoinWorkerThread[] ws = pool.workers;
1003        int n;
1004        if (ws == null || (n = ws.length) <= 1)
1005            return;                   // need at least 2 workers
1006
1007        restart:while (joinMe.status >= 0 && --retries >= 0) {
1008            ForkJoinTask<?> task = joinMe;        // base of chain
1009            ForkJoinWorkerThread thread = this;   // thread with stolen task
1010            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
1011                // Try to find v, the stealer of task, by first using hint
1012                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1013                if (v == null || v.currentSteal != task) {
1014                    for (int j = 0; ; ++j) {      // search array
1015                        if (task.status < 0 || j == n)
1016                            continue restart;     // stale or no stealer
1017                        if ((v = ws[j]) != null && v.currentSteal == task) {
1018                            thread.stealHint = j; // save for next time
1019                            break;
1020                        }
1021                    }
1022                }
1023                // Try to help v, using specialized form of deqTask
1024                int b;
1025                ForkJoinTask<?>[] q;
1026                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1027                    int i = (q.length - 1) & b;
1028                    long u = (i << qShift) + qBase;
1029                    ForkJoinTask<?> t = q[i];
1030                    if (task.status < 0)          // stale
1031                        continue restart;
1032                    if (v.base == b) {            // recheck after reading t
1033                        if (t == null)            // producer stalled
1034                            continue restart;     // retry via restart
1035                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036                            if (joinMe.status < 0) {
1037                                UNSAFE.putObjectVolatile(q, u, t);
1038                                return;           // back out on cancel
1039                            }
1040                            ForkJoinTask<?> prevSteal = currentSteal;
1041                            currentSteal = t;
1042                            v.stealHint = poolIndex;
1043                            v.base = b + 1;
1044                            t.tryExec();
1045                            currentSteal = prevSteal;
1046                        }
1047                    }
1048                    if (joinMe.status < 0)
1049                        return;
1050                }
1051                // Try to descend to find v's stealer
1052                ForkJoinTask<?> next = v.currentJoin;
1053                if (next == null || task.status < 0)
1054                    continue restart;             // no descendant or stale
1055                if (joinMe.status < 0)
1056                    return;
1057                task = next;
1058                thread = v;
1059            }
1060        }
1061    }
1062
1063    /**
1064     * Returns an estimate of the number of tasks, offset by a
1065     * function of the number of idle workers.
1066     *
1067     * This method provides a cheap heuristic guide for task
1068     * partitioning when programmers, frameworks, tools, or languages
1069     * have little or no idea about task granularity.  In essence, by
1070     * offering this method, we ask users only about tradeoffs in
1071     * overhead vs expected throughput and its variance, rather than
1072     * how finely to partition tasks.
1073     *
1074     * In a steady state strict (tree-structured) computation, each
1075     * thread makes available for stealing enough tasks for other
1076     * threads to remain active. Inductively, if all threads play by
1077     * the same rules, each thread should make available only a
1078     * constant number of tasks.
1079     *
1080     * The minimum useful constant is just 1. But using a value of 1
1081     * would require immediate replenishment upon each steal to
1082     * maintain enough tasks, which is infeasible.  Further,
1083     * partitionings/granularities of offered tasks should minimize
1084     * steal rates, which in general means that threads nearer the top
1085     * of computation tree should generate more than those nearer the
1086     * of the computation tree should generate more than those nearer the
1087     * approximately the same level of computation tree. However,
1088     * approximately the same level of the computation tree. However,
1089     * diffusion assumptions.
1090     *
1091     * So, users will want to use values larger, but not much larger
1092     * than 1 to both smooth over transient shortages and hedge
1093     * against uneven progress, as traded off against the cost of
1094     * extra task overhead. We leave the user to pick a threshold
1095     * value to compare with the results of this call to guide
1096     * decisions, but recommend values such as 3.
1097     *
1098     * When all threads are active, it is on average OK to estimate
1099     * surplus strictly locally. In steady-state, if one thread is
1100     * maintaining say 2 surplus tasks, then so are others. So we can
1101     * just use estimated queue length (although note that (sp - base)
1102     * can be an overestimate because of stealers lagging increments
1103     * of base).  However, this strategy alone leads to serious
1104     * mis-estimates in some non-steady-state conditions (ramp-up,
1105     * ramp-down, other stalls). We can detect many of these by
1106     * further considering the number of "idle" threads, that are
1107     * known to have zero queued tasks, so compensate by a factor of
1108     * (#idle/#active) threads.
1109     */
1110    final int getEstimatedSurplusTaskCount() {
1111        return sp - base - pool.idlePerActive();
1112    }
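For illustration, a hypothetical client-side sketch of how this estimate
is typically consulted -- via the public static wrapper
ForkJoinTask.getSurplusQueuedTaskCount(), which forwards here when called
from a pool thread -- using the recommended threshold of 3 (the SumAction
name and the 1024 leaf cutoff are invented):

import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;

class SumAction extends RecursiveAction {
    final long[] array; final int lo, hi;
    long sum;
    SumAction(long[] array, int lo, int hi) {
        this.array = array; this.lo = lo; this.hi = hi;
    }
    protected void compute() {
        if (hi - lo < 1024 ||
            ForkJoinTask.getSurplusQueuedTaskCount() > 3) {
            for (int i = lo; i < hi; ++i)  // enough surplus: stay sequential
                sum += array[i];
        } else {
            int mid = (lo + hi) >>> 1;     // otherwise split and offer work
            SumAction left = new SumAction(array, lo, mid);
            SumAction right = new SumAction(array, mid, hi);
            right.fork();                  // make the right half stealable
            left.compute();
            right.join();
            sum = left.sum + right.sum;
        }
    }
}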
1113
1114    /**
1115     * Runs tasks until {@code pool.isQuiescent()}.
1116     */
1117    final void helpQuiescePool() {
1118        for (;;) {
1119            ForkJoinTask<?> t = pollLocalTask();
1120            if (t != null || (t = scan()) != null) {
1121                t.tryExec();
1122                currentSteal = null;
1123            }
1124            else {
1125                ForkJoinPool p = pool;
1126                if (active) {
1127                    active = false; // inactivate
1128                    do {} while (!p.tryDecrementActiveCount());
1129                }
1130                if (p.isQuiescent()) {
1131                    active = true; // re-activate
1132                    do {} while (!p.tryIncrementActiveCount());
1133                    return;
1134                }
1135            }
1136        }
1137    }
1138
1139    // Unsafe mechanics
1140
1141    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1142    private static final long runStateOffset =
1143        objectFieldOffset("runState", ForkJoinWorkerThread.class);
1144    private static final long qBase =
1145        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1146    private static final long threadStatusOffset =
1147        objectFieldOffset("threadStatus", Thread.class);
1148    private static final int qShift;
1149
1150    static {
1151        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
1152        if ((s & (s-1)) != 0)
1153            throw new Error("data type scale not a power of two");
1154        qShift = 31 - Integer.numberOfLeadingZeros(s);
1155    }
1156
1157    private static long objectFieldOffset(String field, Class<?> klazz) {
1158        try {
1159            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1160        } catch (NoSuchFieldException e) {
1161            // Convert Exception to corresponding Error
1162            NoSuchFieldError error = new NoSuchFieldError(field);
1163            error.initCause(e);
1164            throw error;
1165        }
1166    }
1167
1168    /**
1169     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1170     * Replace with a simple call to Unsafe.getUnsafe when integrating
1171     * into a jdk.
1172     *
1173     * @return a sun.misc.Unsafe
1174     */
1175    private static sun.misc.Unsafe getUnsafe() {
1176        try {
1177            return sun.misc.Unsafe.getUnsafe();
1178        } catch (SecurityException se) {
108              try {
109 <                return java.security.AccessController.doPrivileged
110 <                    (new java.security
111 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
112 <                        public sun.misc.Unsafe run() throws Exception {
113 <                            java.lang.reflect.Field f = sun.misc
114 <                                .Unsafe.class.getDeclaredField("theUnsafe");
1186 <                            f.setAccessible(true);
1187 <                            return (sun.misc.Unsafe) f.get(null);
1188 <                        }});
1189 <            } catch (java.security.PrivilegedActionException e) {
1190 <                throw new RuntimeException("Could not initialize intrinsics",
1191 <                                           e.getCause());
109 >                onTermination(exception);
110 >            } catch (Throwable ex) {
111 >                if (exception == null)
112 >                    exception = ex;
113 >            } finally {
114 >                pool.deregisterWorker(this, exception);
115              }
116          }
117      }
118   }
119 +

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines