root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.26
Committed: Sun Aug 2 11:54:31 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.25: +2 -1 lines
Log Message:
Signature and documentation improvements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.18
9 dl 1.1 import java.util.concurrent.*;
10 jsr166 1.18
11     import java.util.Collection;
12 dl 1.1
13     /**
14 dl 1.2 * A thread managed by a {@link ForkJoinPool}. This class is
15     * subclassable solely for the sake of adding functionality -- there
16 jsr166 1.25 * are no overridable methods dealing with scheduling or execution.
17     * However, you can override initialization and termination methods
18     * surrounding the main task processing loop. If you do create such a
19     * subclass, you will also need to supply a custom {@link
20 dl 1.26 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
21     * ForkJoinPool}.
22 jsr166 1.6 *
23 jsr166 1.13 * @since 1.7
24     * @author Doug Lea
25 dl 1.1 */
26     public class ForkJoinWorkerThread extends Thread {
27     /*
28     * Algorithm overview:
29     *
30     * 1. Work-Stealing: Work-stealing queues are special forms of
31     * Deques that support only three of the four possible
32     * end-operations -- push, pop, and deq (aka steal), and only do
33     * so under the constraints that push and pop are called only from
34     * the owning thread, while deq may be called from other threads.
35     * (If you are unfamiliar with them, you probably want to read
36     * Herlihy and Shavit's book "The Art of Multiprocessor
37     * Programming", chapter 16 describing these in more detail before
38     * proceeding.) The main work-stealing queue design is roughly
39     * similar to "Dynamic Circular Work-Stealing Deque" by David
40     * Chase and Yossi Lev, SPAA 2005
41     * (http://research.sun.com/scalable/pubs/index.html). The main
42     * difference ultimately stems from gc requirements that we null
43     * out taken slots as soon as we can, to maintain as small a
44     * footprint as possible even in programs generating huge numbers
45     * of tasks. To accomplish this, we shift the CAS arbitrating pop
46     * vs deq (steal) from being on the indices ("base" and "sp") to
47     * the slots themselves (mainly via method "casSlotNull()"). So,
48 jsr166 1.10 * both a successful pop and deq mainly entail CAS'ing a non-null
49 dl 1.1 * slot to null. Because we rely on CASes of references, we do
50     * not need tag bits on base or sp. They are simple ints as used
51     * in any circular array-based queue (see for example ArrayDeque).
52     * Updates to the indices must still be ordered in a way that
53     * guarantees that (sp == base) means the queue is empty, but
54     * otherwise may err on the side of possibly making the queue
55     * appear nonempty when a push, pop, or deq has not fully
56     * committed. Note that this means that the deq operation,
57     * considered individually, is not wait-free. One thief cannot
58     * successfully continue until another in-progress one (or, if
59     * previously empty, a push) completes. However, in the
60 jsr166 1.9 * aggregate, we ensure at least probabilistic non-blockingness. If
61 dl 1.1 * an attempted steal fails, a thief always chooses a different
62     * random victim target to try next. So, in order for one thief to
63     * progress, it suffices for any in-progress deq or new push on
64     * any empty queue to complete. One reason this works well here is
65     * that apparently-nonempty often means soon-to-be-stealable,
66     * which gives threads a chance to activate if necessary before
67     * stealing (see below).
68     *
69 dl 1.23 * This approach also enables support for "async mode" where local
70     * task processing is in FIFO, not LIFO order, simply by using a
71     * version of deq rather than pop when locallyFifo is true (as set
72     * by the ForkJoinPool). This allows use in message-passing
73     * frameworks in which tasks are never joined.
74     *
75 dl 1.1 * Efficient implementation of this approach currently relies on
76     * an uncomfortable amount of "Unsafe" mechanics. To maintain
77     * correct orderings, reads and writes of variable base require
78     * volatile ordering. Variable sp does not require volatile write
79     * but needs cheaper store-ordering on writes. Because they are
80     * protected by volatile base reads, reads of the queue array and
81     * its slots do not need volatile load semantics, but writes (in
82     * push) require store order and CASes (in pop and deq) require
83     * (volatile) CAS semantics. Since these combinations aren't
84     * supported using ordinary volatiles, the only way to accomplish
85 jsr166 1.9 * these efficiently is to use direct Unsafe calls. (Using external
86 dl 1.1 * AtomicIntegers and AtomicReferenceArrays for the indices and
87     * array is significantly slower because of memory locality and
88     * indirection effects.) Further, performance on most platforms is
89     * very sensitive to placement and sizing of the (resizable) queue
90     * array. Even though these queues don't usually become all that
91     * big, the initial size must be large enough to counteract cache
92     * contention effects across multiple queues (especially in the
93     * presence of GC cardmarking). Also, to improve thread-locality,
94     * queues are currently initialized immediately after the thread
95     * gets the initial signal to start processing tasks. However,
96     * all queue-related methods except pushTask are written in a way
97     * that allows them to instead be lazily allocated and/or disposed
98     * of when empty. Altogether, these low-level implementation
99     * choices produce as much as a factor of 4 performance
100     * improvement compared to naive implementations, and enable the
101     * processing of billions of tasks per second, sometimes at the
102     * expense of ugliness.
103     *
104     * 2. Run control: The primary run control is based on a global
105     * counter (activeCount) held by the pool. It uses an algorithm
106     * similar to that in Herlihy and Shavit section 17.6 to cause
107     * threads to eventually block when all threads declare they are
108     * inactive. (See variable "scans".) For this to work, threads
109     * must be declared active when executing tasks, and before
110     * stealing a task. They must be inactive before blocking on the
111     * Pool Barrier (awaiting a new submission or other Pool
112     * event). In between, there is some free play which we take
113     * advantage of to avoid contention and rapid flickering of the
114     * global activeCount: If inactive, we activate only if a victim
115     * queue appears to be nonempty (see above). Similarly, a thread
116     * tries to inactivate only after a full scan of other threads.
117     * The net effect is that contention on activeCount is rarely a
118     * measurable performance issue. (There are also a few other cases
119     * where we scan for work rather than retry/block upon
120     * contention.)
121     *
122     * 3. Selection control. We maintain the policy of always choosing to
123     * run local tasks rather than stealing, and always trying to
124     * steal tasks before trying to run a new submission. All steals
125     * are currently performed in randomly-chosen deq-order. It may be
126     * worthwhile to bias these with locality / anti-locality
127     * information, but doing this well probably requires more
128     * lower-level information from JVMs than currently provided.
129     */
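    // ------------------------------------------------------------------
    // Editor's illustrative sketch (not part of the original class): a
    // stripped-down, portable rendering of the deque protocol described
    // in point 1 above, using java.util.concurrent.atomic instead of
    // Unsafe and omitting resizing, activation, and signalling. The name
    // MiniWorkStealingDeque and its members are hypothetical. As the
    // overview notes, this style is measurably slower than the
    // Unsafe-based fields and methods below, but the arbitration rule is
    // the same: pop and deq (steal) race by CASing a non-null slot to
    // null, and only the winner then publishes the new index.
    static final class MiniWorkStealingDeque<T> {
        final java.util.concurrent.atomic.AtomicReferenceArray<T> slots;
        final int mask;            // capacity is a power of two
        volatile int base;         // next slot to steal from
        volatile int sp;           // next slot to push to; written only by owner

        MiniWorkStealingDeque(int capacity) {
            slots = new java.util.concurrent.atomic.AtomicReferenceArray<T>(capacity);
            mask = capacity - 1;
        }

        // Owner only: write the slot first, then publish by bumping sp.
        void push(T t) {
            int s = sp;
            slots.set(s & mask, t);
            sp = s + 1;
        }

        // Owner only: LIFO take; claims the top slot by CASing it to null.
        T pop() {
            int s = sp;
            if (s != base) {
                int i = (s - 1) & mask;
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    sp = s - 1;
                    return t;
                }
            }
            return null;           // empty, or lost the last slot to a thief
        }

        // Any thread: FIFO take (steal); the same slot CAS arbitrates with pop.
        T deq() {
            int b = base;
            if (b != sp) {
                int i = b & mask;
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    base = b + 1;
                    return t;
                }
            }
            return null;           // empty or contended
        }
    }
    // ------------------------------------------------------------------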
130    
131     /**
132     * Capacity of work-stealing queue array upon initialization.
133     * Must be a power of two. Initial size must be at least 2, but is
134     * padded to minimize cache effects.
135     */
136     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
137    
138     /**
139     * Maximum work-stealing queue array size. Must be less than or
140 dl 1.5 * equal to 1 << 28 to ensure lack of index wraparound. (This
141     * is less than the usual bounds, because we need the index left-shifted by 3
142     * to be in int range).
143 dl 1.1 */
144 dl 1.5 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
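    // Editor's note on the bound above: queue indices stay below 1 << 28,
    // so the raw offset term (i << qShift) used by the Unsafe accessors is
    // at most ((1 << 28) - 1) << 3 = 2^31 - 8 when the array index scale
    // is 8 bytes (qShift == 3), which still fits in a non-negative int.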
145 dl 1.1
146     /**
147 jsr166 1.16 * The pool this thread works in. Accessed directly by ForkJoinTask.
148 dl 1.1 */
149 dl 1.5 final ForkJoinPool pool;
150 dl 1.1
151     /**
152     * The work-stealing queue array. Size must be a power of two.
153 dl 1.5 * Initialized when thread starts, to improve memory locality.
154 dl 1.1 */
155     private ForkJoinTask<?>[] queue;
156    
157     /**
158     * Index (mod queue.length) of next queue slot to push to or pop
159     * from. It is written only by owner thread, via ordered store.
160     * Both sp and base are allowed to wrap around on overflow, but
161     * (sp - base) still estimates size.
162     */
163     private volatile int sp;
164    
165     /**
166     * Index (mod queue.length) of least valid queue slot, which is
167     * always the next position to steal from if nonempty.
168     */
169     private volatile int base;
170    
171     /**
172 dl 1.5 * Activity status. When true, this worker is considered active.
173     * Must be false upon construction. It must be true when executing
174     * tasks, and BEFORE stealing a task. It must be false before
175 jsr166 1.16 * calling pool.sync.
176 dl 1.1 */
177 dl 1.5 private boolean active;
178 dl 1.1
179     /**
180     * Run state of this worker. Supports simple versions of the usual
181     * shutdown/shutdownNow control.
182     */
183     private volatile int runState;
184    
185     /**
186 dl 1.5 * Seed for random number generator for choosing steal victims.
187     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
188 dl 1.1 */
189 dl 1.5 private int seed;
190 dl 1.1
191     /**
192     * Number of steals, transferred to pool when idle
193     */
194     private int stealCount;
195    
196     /**
197 dl 1.5 * Index of this worker in pool array. Set once by pool before
198 jsr166 1.16 * running, and accessed directly by pool during cleanup etc.
199 dl 1.1 */
200 dl 1.5 int poolIndex;
201 dl 1.1
202     /**
203 dl 1.5 * The last barrier event waited for. Accessed in pool callback
204     * methods, but only by current thread.
205 dl 1.1 */
206 dl 1.5 long lastEventCount;
207 dl 1.1
208     /**
209     * True if using local FIFO, rather than the default LIFO, for local polling.
210     */
211     private boolean locallyFifo;
212    
213     /**
214 dl 1.1 * Creates a ForkJoinWorkerThread operating in the given pool.
215 jsr166 1.11 *
216 dl 1.1 * @param pool the pool this thread works in
217     * @throws NullPointerException if pool is null
218     */
219     protected ForkJoinWorkerThread(ForkJoinPool pool) {
220     if (pool == null) throw new NullPointerException();
221     this.pool = pool;
222 dl 1.5 // Note: poolIndex is set by pool during construction
223     // Remaining initialization is deferred to onStart
224 dl 1.1 }
225    
226 jsr166 1.6 // Public access methods
227 dl 1.2
228     /**
229 jsr166 1.11 * Returns the pool hosting this thread.
230     *
231 dl 1.2 * @return the pool
232     */
233 dl 1.4 public ForkJoinPool getPool() {
234     return pool;
235 dl 1.2 }
236    
237     /**
238 dl 1.4 * Returns the index number of this thread in its pool. The
239     * returned value ranges from zero to the maximum number of
240     * threads (minus one) that have ever been created in the pool.
241     * This method may be useful for applications that track status or
242 dl 1.5 * collect results per-worker rather than per-task.
243 jsr166 1.11 *
244     * @return the index number
245 dl 1.2 */
246 dl 1.4 public int getPoolIndex() {
247     return poolIndex;
248 dl 1.2 }
249    
250 dl 1.7 /**
251     * Establishes local first-in-first-out scheduling mode for forked
252 jsr166 1.8 * tasks that are never joined.
253 jsr166 1.11 *
254 dl 1.7 * @param async if true, use locally FIFO scheduling
255     */
256     void setAsyncMode(boolean async) {
257     locallyFifo = async;
258     }
259 dl 1.5
260     // Runstate management
261    
262     // Runstate values. Order matters
263     private static final int RUNNING = 0;
264     private static final int SHUTDOWN = 1;
265     private static final int TERMINATING = 2;
266     private static final int TERMINATED = 3;
267    
268     final boolean isShutdown() { return runState >= SHUTDOWN; }
269     final boolean isTerminating() { return runState >= TERMINATING; }
270     final boolean isTerminated() { return runState == TERMINATED; }
271     final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
272     final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
273    
274     /**
275 jsr166 1.21 * Transitions to at least the given state.
276     *
277     * @return {@code true} if not already at least at given state
278 dl 1.5 */
279     private boolean transitionRunStateTo(int state) {
280     for (;;) {
281     int s = runState;
282     if (s >= state)
283     return false;
284 jsr166 1.12 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
285 dl 1.5 return true;
286     }
287     }
288    
289     /**
290 jsr166 1.11 * Tries to set status to active; fails on contention.
291 dl 1.5 */
292     private boolean tryActivate() {
293     if (!active) {
294     if (!pool.tryIncrementActiveCount())
295     return false;
296     active = true;
297     }
298     return true;
299     }
300    
301     /**
302 jsr166 1.16 * Tries to set status to inactive; fails on contention.
303 dl 1.5 */
304     private boolean tryInactivate() {
305     if (active) {
306     if (!pool.tryDecrementActiveCount())
307     return false;
308     active = false;
309     }
310     return true;
311     }
312    
313     /**
314 jsr166 1.11 * Computes next value for random victim probe. Scans don't
315 dl 1.5 * require a very high quality generator, but also not a crummy
316 jsr166 1.11 * one. Marsaglia xor-shift is cheap and works well.
317 dl 1.5 */
318     private static int xorShift(int r) {
319 dl 1.23 r ^= (r << 13);
320     r ^= (r >>> 17);
321 jsr166 1.24 return r ^ (r << 5);
322 dl 1.5 }
323    
324     // Lifecycle methods
325 dl 1.1
326     /**
327 dl 1.5 * This method is required to be public, but should never be
328     * called explicitly. It performs the main run loop to execute
329     * ForkJoinTasks.
330 dl 1.1 */
331 dl 1.5 public void run() {
332     Throwable exception = null;
333     try {
334     onStart();
335     pool.sync(this); // await first pool event
336     mainLoop();
337     } catch (Throwable ex) {
338     exception = ex;
339     } finally {
340     onTermination(exception);
341     }
342 dl 1.1 }
343    
344     /**
345 jsr166 1.11 * Executes tasks until shut down.
346 dl 1.1 */
347 dl 1.5 private void mainLoop() {
348     while (!isShutdown()) {
349     ForkJoinTask<?> t = pollTask();
350 jsr166 1.6 if (t != null || (t = pollSubmission()) != null)
351 dl 1.5 t.quietlyExec();
352     else if (tryInactivate())
353     pool.sync(this);
354     }
355 dl 1.1 }
356    
357 dl 1.5 /**
358     * Initializes internal state after construction but before
359     * processing any tasks. If you override this method, you must
360     * invoke super.onStart() at the beginning of the method.
361     * Initialization requires care: Most fields must have legal
362     * default values, to ensure that attempted accesses from other
363     * threads work correctly even before this thread starts
364     * processing tasks.
365     */
366     protected void onStart() {
367     // Allocate while starting to improve chances of thread-local
368     // isolation
369     queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
370     // Initial value of seed need not be especially random but
371     // should differ across workers and must be nonzero
372     int p = poolIndex + 1;
373     seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
374     }
375 dl 1.1
376     /**
377 jsr166 1.11 * Performs cleanup associated with termination of this worker
378 dl 1.5 * thread. If you override this method, you must invoke
379 jsr166 1.16 * {@code super.onTermination} at the end of the overridden method.
380 dl 1.5 *
381     * @param exception the exception causing this thread to abort due
382 jsr166 1.21 * to an unrecoverable error, or {@code null} if completed normally
383 dl 1.1 */
384 dl 1.5 protected void onTermination(Throwable exception) {
385     // Execute remaining local tasks unless aborting or terminating
386     while (exception == null && !pool.isTerminating() && base != sp) {
387     try {
388     ForkJoinTask<?> t = popTask();
389     if (t != null)
390     t.quietlyExec();
391 jsr166 1.16 } catch (Throwable ex) {
392 dl 1.5 exception = ex;
393     }
394     }
395     // Cancel other tasks, transition status, notify pool, and
396     // propagate exception to uncaught exception handler
397     try {
398 jsr166 1.16 do {} while (!tryInactivate()); // ensure inactive
399 jsr166 1.6 cancelTasks();
400 dl 1.5 runState = TERMINATED;
401     pool.workerTerminated(this);
402     } catch (Throwable ex) { // Shouldn't ever happen
403     if (exception == null) // but if so, at least rethrown
404     exception = ex;
405     } finally {
406     if (exception != null)
407     ForkJoinTask.rethrowException(exception);
408     }
409 dl 1.1 }
410    
411 jsr166 1.6 // Intrinsics-based support for queue operations.
412 dl 1.5
413 dl 1.1 /**
414 jsr166 1.10 * Writes, in store order, the given task into the given slot of q.
415     * Caller must ensure q is non-null and index is in range.
416 dl 1.1 */
417     private static void setSlot(ForkJoinTask<?>[] q, int i,
418 jsr166 1.14 ForkJoinTask<?> t) {
419 jsr166 1.12 UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
420 dl 1.1 }
421    
422     /**
423 jsr166 1.10 * CASes the given slot of q to null. Caller must ensure q is non-null
424 dl 1.1 * and index is in range.
425     */
426     private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
427     ForkJoinTask<?> t) {
428 jsr166 1.12 return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
429 dl 1.1 }
430    
431 dl 1.5 /**
432     * Sets sp in store-order.
433     */
434     private void storeSp(int s) {
435 jsr166 1.12 UNSAFE.putOrderedInt(this, spOffset, s);
436 dl 1.5 }
437    
438 dl 1.1 // Main queue methods
439    
440     /**
441     * Pushes a task. Called only by current thread.
442 jsr166 1.11 *
443 jsr166 1.10 * @param t the task. Caller must ensure non-null.
444 dl 1.1 */
445     final void pushTask(ForkJoinTask<?> t) {
446     ForkJoinTask<?>[] q = queue;
447     int mask = q.length - 1;
448     int s = sp;
449 dl 1.5 setSlot(q, s & mask, t);
450     storeSp(++s);
451 dl 1.1 if ((s -= base) == 1)
452 dl 1.5 pool.signalWork();
453 dl 1.1 else if (s >= mask)
454     growQueue();
455     }
456    
457     /**
458     * Tries to take a task from the base of the queue, failing if
459     * either empty or contended.
460 jsr166 1.11 *
461     * @return a task, or null if none or contended
462 dl 1.1 */
463 dl 1.7 final ForkJoinTask<?> deqTask() {
464 dl 1.5 ForkJoinTask<?> t;
465 dl 1.1 ForkJoinTask<?>[] q;
466     int i;
467     int b;
468     if (sp != (b = base) &&
469     (q = queue) != null && // must read q after b
470     (t = q[i = (q.length - 1) & b]) != null &&
471 dl 1.5 casSlotNull(q, i, t)) {
472 dl 1.1 base = b + 1;
473     return t;
474     }
475     return null;
476     }
477    
478     /**
479 dl 1.23 * Tries to take a task from the base of own queue, activating if
480     * necessary, failing only if empty. Called only by current thread.
481     *
482     * @return a task, or null if none
483     */
484     final ForkJoinTask<?> locallyDeqTask() {
485     int b;
486     while (sp != (b = base)) {
487     if (tryActivate()) {
488     ForkJoinTask<?>[] q = queue;
489     int i = (q.length - 1) & b;
490     ForkJoinTask<?> t = q[i];
491     if (t != null && casSlotNull(q, i, t)) {
492     base = b + 1;
493     return t;
494     }
495     }
496     }
497     return null;
498     }
499    
500     /**
501 dl 1.5 * Returns a popped task, or null if empty. Ensures active status
502 jsr166 1.10 * if non-null. Called only by current thread.
503 dl 1.1 */
504     final ForkJoinTask<?> popTask() {
505     int s = sp;
506 dl 1.5 while (s != base) {
507     if (tryActivate()) {
508     ForkJoinTask<?>[] q = queue;
509     int mask = q.length - 1;
510     int i = (s - 1) & mask;
511     ForkJoinTask<?> t = q[i];
512     if (t == null || !casSlotNull(q, i, t))
513     break;
514     storeSp(s - 1);
515     return t;
516     }
517 dl 1.1 }
518     return null;
519     }
520    
521     /**
522     * Specialized version of popTask to pop only if
523     * topmost element is the given task. Called only
524 dl 1.5 * by current thread while active.
525 jsr166 1.11 *
526     * @param t the task. Caller must ensure non-null.
527 dl 1.1 */
528     final boolean unpushTask(ForkJoinTask<?> t) {
529     ForkJoinTask<?>[] q = queue;
530     int mask = q.length - 1;
531     int s = sp - 1;
532 dl 1.5 if (casSlotNull(q, s & mask, t)) {
533     storeSp(s);
534 dl 1.1 return true;
535     }
536     return false;
537     }
538    
539     /**
540 dl 1.23 * Returns the next task, or null if empty or contended.
541 dl 1.1 */
542 dl 1.2 final ForkJoinTask<?> peekTask() {
543 dl 1.1 ForkJoinTask<?>[] q = queue;
544 dl 1.7 if (q == null)
545     return null;
546     int mask = q.length - 1;
547 jsr166 1.15 int i = locallyFifo ? base : (sp - 1);
548 dl 1.7 return q[i & mask];
549 dl 1.1 }
550    
551     /**
552     * Doubles queue array size. Transfers elements by emulating
553     * steals (deqs) from old array and placing, oldest first, into
554     * new array.
555     */
556     private void growQueue() {
557     ForkJoinTask<?>[] oldQ = queue;
558     int oldSize = oldQ.length;
559     int newSize = oldSize << 1;
560     if (newSize > MAXIMUM_QUEUE_CAPACITY)
561     throw new RejectedExecutionException("Queue capacity exceeded");
562     ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
563    
564     int b = base;
565     int bf = b + oldSize;
566     int oldMask = oldSize - 1;
567     int newMask = newSize - 1;
568     do {
569     int oldIndex = b & oldMask;
570     ForkJoinTask<?> t = oldQ[oldIndex];
571     if (t != null && !casSlotNull(oldQ, oldIndex, t))
572     t = null;
573     setSlot(newQ, b & newMask, t);
574     } while (++b != bf);
575 dl 1.5 pool.signalWork();
576 dl 1.1 }
577    
578     /**
579 dl 1.5 * Tries to steal a task from another worker. Starts at a random
580     * index of workers array, and probes workers until finding one
581     * with non-empty queue or finding that all are empty. It
582     * randomly selects the first n probes. If these are empty, it
583     * resorts to a full circular traversal, which is necessary to
584     * accurately set active status by caller. Also restarts if pool
585     * events occurred since last scan, which forces refresh of
586     * workers array, in case barrier was associated with resize.
587 dl 1.1 *
588     * This method must be both fast and quiet -- usually avoiding
589     * memory accesses that could disrupt cache sharing, etc., other than
590     * those needed to check for and take tasks. This accounts for,
591     * among other things, updating random seed in place without
592 dl 1.5 * storing it until exit.
593 dl 1.1 *
594     * @return a task, or null if none found
595     */
596 dl 1.5 private ForkJoinTask<?> scan() {
597     ForkJoinTask<?> t = null;
598     int r = seed; // extract once to keep scan quiet
599     ForkJoinWorkerThread[] ws; // refreshed on outer loop
600     int mask; // must be power 2 minus 1 and > 0
601     outer:do {
602     if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
603 dl 1.1 int idx = r;
604 dl 1.5 int probes = ~mask; // use random index while negative
605 dl 1.1 for (;;) {
606 dl 1.5 r = xorShift(r); // update random seed
607     ForkJoinWorkerThread v = ws[mask & idx];
608     if (v == null || v.sp == v.base) {
609     if (probes <= mask)
610 jsr166 1.15 idx = (probes++ < 0) ? r : (idx + 1);
611 dl 1.5 else
612     break;
613 dl 1.1 }
614 dl 1.5 else if (!tryActivate() || (t = v.deqTask()) == null)
615     continue outer; // restart on contention
616 dl 1.1 else
617 dl 1.5 break outer;
618 dl 1.1 }
619     }
620 dl 1.5 } while (pool.hasNewSyncEvent(this)); // retry on pool events
621     seed = r;
622     return t;
623 dl 1.1 }
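    // Editor's illustrative sketch (not part of the original class),
    // approximating the victim-selection order used by scan() above as a
    // standalone helper; the name probeOrder is hypothetical. For a
    // workers array of length mask+1 (a power of two), roughly the first
    // mask+1 probes use fresh xorshift values (while the probes counter,
    // which starts at ~mask, is still negative), and the remaining mask+1
    // probes sweep indices sequentially so that every worker is examined
    // at least once before a scan gives up.
    private static int[] probeOrder(int mask, int seed) {
        int[] order = new int[2 * (mask + 1)];
        int r = seed;
        int idx = r;
        int probes = ~mask;              // negative during the random phase
        for (int k = 0; k < order.length; ++k) {
            r = xorShift(r);             // same generator scan() uses
            order[k] = mask & idx;       // index probed on this step
            idx = (probes++ < 0) ? r : (idx + 1);
        }
        return order;
    }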
624    
625     /**
626 jsr166 1.11 * Gets and removes a local or stolen task.
627     *
628 dl 1.5 * @return a task, if available
629     */
630     final ForkJoinTask<?> pollTask() {
631 dl 1.23 ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
632 dl 1.5 if (t == null && (t = scan()) != null)
633     ++stealCount;
634     return t;
635 dl 1.1 }
636    
637     /**
638 jsr166 1.11 * Gets a local task.
639     *
640 dl 1.7 * @return a task, if available
641     */
642     final ForkJoinTask<?> pollLocalTask() {
643 dl 1.23 return locallyFifo ? locallyDeqTask() : popTask();
644 dl 1.7 }
645    
646     /**
647 dl 1.5 * Returns a pool submission, if one exists, activating first.
648 jsr166 1.11 *
649 dl 1.5 * @return a submission, if available
650 dl 1.1 */
651 dl 1.5 private ForkJoinTask<?> pollSubmission() {
652     ForkJoinPool p = pool;
653     while (p.hasQueuedSubmissions()) {
654     ForkJoinTask<?> t;
655     if (tryActivate() && (t = p.pollSubmission()) != null)
656     return t;
657 dl 1.1 }
658 dl 1.5 return null;
659 dl 1.1 }
660 dl 1.5
661     // Methods accessed only by Pool
662    
663 dl 1.1 /**
664 dl 1.5 * Removes and cancels all tasks in queue. Can be called from any
665     * thread.
666 dl 1.1 */
667 dl 1.5 final void cancelTasks() {
668 dl 1.3 ForkJoinTask<?> t;
669 dl 1.5 while (base != sp && (t = deqTask()) != null)
670     t.cancelIgnoringExceptions();
671 dl 1.1 }
672    
673     /**
674 jsr166 1.11 * Drains tasks to given collection c.
675     *
676 dl 1.7 * @return the number of tasks drained
677     */
678 dl 1.22 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
679 dl 1.7 int n = 0;
680     ForkJoinTask<?> t;
681     while (base != sp && (t = deqTask()) != null) {
682     c.add(t);
683     ++n;
684     }
685     return n;
686     }
687    
688     /**
689 jsr166 1.11 * Gets and clears steal count for accumulation by pool. Called
690 dl 1.5 * only when known to be idle (in pool.sync and termination).
691 dl 1.1 */
692 dl 1.5 final int getAndClearStealCount() {
693     int sc = stealCount;
694     stealCount = 0;
695     return sc;
696     }
697    
698     /**
699 jsr166 1.21 * Returns {@code true} if at least one worker in the given array
700     * appears to have at least one queued task.
701 jsr166 1.16 *
702 dl 1.5 * @param ws array of workers
703     */
704     static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
705     if (ws != null) {
706     int len = ws.length;
707     for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
708     for (int i = 0; i < len; ++i) {
709     ForkJoinWorkerThread w = ws[i];
710     if (w != null && w.sp != w.base)
711     return true;
712 dl 1.1 }
713     }
714     }
715 dl 1.5 return false;
716 dl 1.1 }
717    
718 dl 1.5 // Support methods for ForkJoinTask
719    
720 dl 1.1 /**
721 dl 1.2 * Returns an estimate of the number of tasks in the queue.
722     */
723     final int getQueueSize() {
724 jsr166 1.15 // suppress momentarily negative values
725     return Math.max(0, sp - base);
726 dl 1.2 }
727    
728     /**
729 dl 1.1 * Returns an estimate of the number of tasks, offset by a
730     * function of the number of idle workers.
731     */
732     final int getEstimatedSurplusTaskCount() {
733 dl 1.3 // The halving approximates weighting idle vs non-idle workers
734 dl 1.1 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
735     }
736    
737     /**
738 jsr166 1.16 * Scans for a task, returning early if joinMe is done.
739 dl 1.1 */
740 dl 1.5 final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
741     ForkJoinTask<?> t = pollTask();
742     if (t != null && joinMe.status < 0 && sp == base) {
743     pushTask(t); // unsteal if done and this task would be stealable
744     t = null;
745     }
746     return t;
747 dl 1.1 }
748 jsr166 1.6
749 dl 1.1 /**
750 jsr166 1.16 * Runs tasks until {@code pool.isQuiescent()}.
751 dl 1.1 */
752 dl 1.5 final void helpQuiescePool() {
753     for (;;) {
754     ForkJoinTask<?> t = pollTask();
755 jsr166 1.6 if (t != null)
756 dl 1.5 t.quietlyExec();
757     else if (tryInactivate() && pool.isQuiescent())
758     break;
759     }
760 jsr166 1.16 do {} while (!tryActivate()); // re-activate on exit
761 dl 1.1 }
762    
763 jsr166 1.20 // Unsafe mechanics
764    
765     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
766     private static final long spOffset =
767     objectFieldOffset("sp", ForkJoinWorkerThread.class);
768     private static final long runStateOffset =
769     objectFieldOffset("runState", ForkJoinWorkerThread.class);
770     private static final long qBase;
771     private static final int qShift;
772    
773     static {
774     qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
775     int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
776     if ((s & (s-1)) != 0)
777     throw new Error("data type scale not a power of two");
778     qShift = 31 - Integer.numberOfLeadingZeros(s);
779     }
780    
781     private static long objectFieldOffset(String field, Class<?> klazz) {
782     try {
783     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
784     } catch (NoSuchFieldException e) {
785     // Convert Exception to corresponding Error
786     NoSuchFieldError error = new NoSuchFieldError(field);
787     error.initCause(e);
788     throw error;
789     }
790     }
791    
792     /**
793     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
794     * Replace with a simple call to Unsafe.getUnsafe when integrating
795     * into a jdk.
796     *
797     * @return a sun.misc.Unsafe
798     */
799 jsr166 1.17 private static sun.misc.Unsafe getUnsafe() {
800 jsr166 1.6 try {
801 jsr166 1.17 return sun.misc.Unsafe.getUnsafe();
802 jsr166 1.6 } catch (SecurityException se) {
803     try {
804     return java.security.AccessController.doPrivileged
805 jsr166 1.20 (new java.security
806     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
807 jsr166 1.17 public sun.misc.Unsafe run() throws Exception {
808 jsr166 1.20 java.lang.reflect.Field f = sun.misc
809     .Unsafe.class.getDeclaredField("theUnsafe");
810     f.setAccessible(true);
811     return (sun.misc.Unsafe) f.get(null);
812 jsr166 1.6 }});
813     } catch (java.security.PrivilegedActionException e) {
814 jsr166 1.17 throw new RuntimeException("Could not initialize intrinsics",
815     e.getCause());
816 jsr166 1.6 }
817     }
818     }
819 dl 1.1 }
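
// ---------------------------------------------------------------------
// Editor's illustrative sketch (not part of the original file): the kind
// of subclass and custom ForkJoinPool.ForkJoinWorkerThreadFactory that
// the class-level javadoc describes. The names LoggingWorkerThread and
// LoggingWorkerFactory are hypothetical.
// ---------------------------------------------------------------------

class LoggingWorkerThread extends ForkJoinWorkerThread {
    LoggingWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        super.onStart();                 // per onStart docs: call super first
        System.out.println("worker " + getPoolIndex() + " starting");
    }

    @Override
    protected void onTermination(Throwable exception) {
        System.out.println("worker " + getPoolIndex() + " terminating"
                           + (exception == null ? "" : ": " + exception));
        super.onTermination(exception);  // per onTermination docs: call super last
    }
}

class LoggingWorkerFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new LoggingWorkerThread(pool);
    }
}

// Hypothetical usage, assuming a ForkJoinPool constructor that accepts a
// factory is available in this version of jsr166y:
//     ForkJoinPool pool = new ForkJoinPool(4, new LoggingWorkerFactory());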