root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.22
Committed: Wed Jul 29 12:05:55 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.21: +1 -1 lines
Log Message:
Improve drainTasksTo signature; update example

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package jsr166y;

import java.util.concurrent.*;

import java.util.Collection;

/**
 * A thread managed by a {@link ForkJoinPool}. This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or
 * execution. However, you can override initialization and termination
 * methods surrounding the main task processing loop. If you do
 * create such a subclass, you will also need to supply a custom
 * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Algorithm overview:
     *
     * 1. Work-Stealing: Work-stealing queues are special forms of
     * Deques that support only three of the four possible
     * end-operations -- push, pop, and deq (aka steal), and only do
     * so under the constraints that push and pop are called only from
     * the owning thread, while deq may be called from other threads.
     * (If you are unfamiliar with them, you probably want to read
     * Herlihy and Shavit's book "The Art of Multiprocessor
     * Programming", chapter 16 describing these in more detail before
     * proceeding.) The main work-stealing queue design is roughly
     * similar to "Dynamic Circular Work-Stealing Deque" by David
     * Chase and Yossi Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html). The main
     * difference ultimately stems from GC requirements that we null
     * out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks. To accomplish this, we shift the CAS arbitrating pop
     * vs deq (steal) from being on the indices ("base" and "sp") to
     * the slots themselves (mainly via method "casSlotNull()"). So,
     * both a successful pop and deq mainly entail CAS'ing a non-null
     * slot to null. Because we rely on CASes of references, we do
     * not need tag bits on base or sp. They are simple ints as used
     * in any circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that sp == base means the queue is empty, but
     * otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq has not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness. If
     * an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to activate if necessary before
     * stealing (see below).
     *
     * Efficient implementation of this approach currently relies on
     * an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering. Variable sp does not require volatile writes
     * but needs the cheaper store-ordering on writes. Because they are
     * protected by volatile base reads, reads of the queue array and
     * its slots do not need volatile load semantics, but writes (in
     * push) require store order and CASes (in pop and deq) require
     * (volatile) CAS semantics. Since these combinations aren't
     * supported using ordinary volatiles, the only way to accomplish
     * these efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.) Further, performance on most platforms is
     * very sensitive to placement and sizing of the (resizable) queue
     * array. Even though these queues don't usually become all that
     * big, the initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are currently initialized immediately after the thread
     * gets the initial signal to start processing tasks. However,
     * all queue-related methods except pushTask are written in a way
     * that allows queues to instead be lazily allocated and/or
     * disposed of when empty. Altogether, these low-level
     * implementation choices produce as much as a factor of 4
     * performance improvement compared to naive implementations, and
     * enable the processing of billions of tasks per second,
     * sometimes at the expense of ugliness.
     *
     * 2. Run control: The primary run control is based on a global
     * counter (activeCount) held by the pool. It uses an algorithm
     * similar to that in Herlihy and Shavit section 17.6 to cause
     * threads to eventually block when all threads declare they are
     * inactive. (See variable "scans".) For this to work, threads
     * must be declared active when executing tasks, and before
     * stealing a task. They must be inactive before blocking on the
     * Pool Barrier (awaiting a new submission or other Pool
     * event). In between, there is some free play which we take
     * advantage of to avoid contention and rapid flickering of the
     * global activeCount: If inactive, we activate only if a victim
     * queue appears to be nonempty (see above). Similarly, a thread
     * tries to inactivate only after a full scan of other threads.
     * The net effect is that contention on activeCount is rarely a
     * measurable performance issue. (There are also a few other cases
     * where we scan for work rather than retry/block upon
     * contention.)
     *
     * 3. Selection control: We maintain the policy of always choosing
     * to run local tasks rather than stealing, and always trying to
     * steal tasks before trying to run a new submission. All steals
     * are currently performed in randomly-chosen deq-order. It may be
     * worthwhile to bias these with locality / anti-locality
     * information, but doing this well probably requires more
     * lower-level information from JVMs than currently provided.
     */

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 2, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size. Must be less than or
     * equal to 1 << 28 to ensure lack of index wraparound. (This
     * is less than usual bounds, because an index left-shifted by 3
     * must still be in int range.)
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when thread starts, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, via ordered store.
     * Both sp and base are allowed to wrap around on overflow, but
     * (sp - base) still estimates size.
     */
    private volatile int sp;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Activity status. When true, this worker is considered active.
     * Must be false upon construction. It must be true when executing
     * tasks, and BEFORE stealing a task. It must be false before
     * calling pool.sync.
     */
    private boolean active;

    /**
     * Run state of this worker. Supports simple versions of the usual
     * shutdown/shutdownNow control.
     */
    private volatile int runState;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
     */
    private int seed;

    /**
     * Number of steals, transferred to pool when idle.
     */
    private int stealCount;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool during cleanup etc.
     */
    int poolIndex;

    /**
     * The last barrier event waited for. Accessed in pool callback
     * methods, but only by current thread.
     */
    long lastEventCount;

    /**
     * True if using local FIFO, rather than the default LIFO, for
     * local polling.
     */
    private boolean locallyFifo;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        if (pool == null) throw new NullPointerException();
        this.pool = pool;
        // Note: poolIndex is set by pool during construction
        // Remaining initialization is deferred to onStart
    }

206     /**
207 dl 1.1 * Creates a ForkJoinWorkerThread operating in the given pool.
208 jsr166 1.11 *
209 dl 1.1 * @param pool the pool this thread works in
210     * @throws NullPointerException if pool is null
211     */
212     protected ForkJoinWorkerThread(ForkJoinPool pool) {
213     if (pool == null) throw new NullPointerException();
214     this.pool = pool;
215 dl 1.5 // Note: poolIndex is set by pool during construction
216     // Remaining initialization is deferred to onStart
217 dl 1.1 }
218    
219 jsr166 1.6 // Public access methods
220 dl 1.2
221     /**
222 jsr166 1.11 * Returns the pool hosting this thread.
223     *
224 dl 1.2 * @return the pool
225     */
226 dl 1.4 public ForkJoinPool getPool() {
227     return pool;
228 dl 1.2 }
229    
230     /**
231 dl 1.4 * Returns the index number of this thread in its pool. The
232     * returned value ranges from zero to the maximum number of
233     * threads (minus one) that have ever been created in the pool.
234     * This method may be useful for applications that track status or
235 dl 1.5 * collect results per-worker rather than per-task.
236 jsr166 1.11 *
237     * @return the index number
238 dl 1.2 */
239 dl 1.4 public int getPoolIndex() {
240     return poolIndex;
241 dl 1.2 }
242    
243 dl 1.7 /**
244     * Establishes local first-in-first-out scheduling mode for forked
245 jsr166 1.8 * tasks that are never joined.
246 jsr166 1.11 *
247 dl 1.7 * @param async if true, use locally FIFO scheduling
248     */
249     void setAsyncMode(boolean async) {
250     locallyFifo = async;
251     }
252 dl 1.5
253     // Runstate management
254    
255     // Runstate values. Order matters
256     private static final int RUNNING = 0;
257     private static final int SHUTDOWN = 1;
258     private static final int TERMINATING = 2;
259     private static final int TERMINATED = 3;
260    
261     final boolean isShutdown() { return runState >= SHUTDOWN; }
262     final boolean isTerminating() { return runState >= TERMINATING; }
263     final boolean isTerminated() { return runState == TERMINATED; }
264     final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
265     final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
266    
267     /**
268 jsr166 1.21 * Transitions to at least the given state.
269     *
270     * @return {@code true} if not already at least at given state
271 dl 1.5 */
272     private boolean transitionRunStateTo(int state) {
273     for (;;) {
274     int s = runState;
275     if (s >= state)
276     return false;
277 jsr166 1.12 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
278 dl 1.5 return true;
279     }
280     }
281    
282     /**
283 jsr166 1.11 * Tries to set status to active; fails on contention.
284 dl 1.5 */
285     private boolean tryActivate() {
286     if (!active) {
287     if (!pool.tryIncrementActiveCount())
288     return false;
289     active = true;
290     }
291     return true;
292     }
293    
294     /**
295 jsr166 1.16 * Tries to set status to inactive; fails on contention.
296 dl 1.5 */
297     private boolean tryInactivate() {
298     if (active) {
299     if (!pool.tryDecrementActiveCount())
300     return false;
301     active = false;
302     }
303     return true;
304     }
305    
306     /**
307 jsr166 1.11 * Computes next value for random victim probe. Scans don't
308 dl 1.5 * require a very high quality generator, but also not a crummy
309 jsr166 1.11 * one. Marsaglia xor-shift is cheap and works well.
310 dl 1.5 */
311     private static int xorShift(int r) {
312     r ^= r << 1;
313     r ^= r >>> 3;
314     r ^= r << 10;
315     return r;
316     }
317    
318     // Lifecycle methods
319 dl 1.1
320     /**
321 dl 1.5 * This method is required to be public, but should never be
322     * called explicitly. It performs the main run loop to execute
323     * ForkJoinTasks.
324 dl 1.1 */
325 dl 1.5 public void run() {
326     Throwable exception = null;
327     try {
328     onStart();
329     pool.sync(this); // await first pool event
330     mainLoop();
331     } catch (Throwable ex) {
332     exception = ex;
333     } finally {
334     onTermination(exception);
335     }
336 dl 1.1 }
337    
338     /**
339 jsr166 1.11 * Executes tasks until shut down.
340 dl 1.1 */
341 dl 1.5 private void mainLoop() {
342     while (!isShutdown()) {
343     ForkJoinTask<?> t = pollTask();
344 jsr166 1.6 if (t != null || (t = pollSubmission()) != null)
345 dl 1.5 t.quietlyExec();
346     else if (tryInactivate())
347     pool.sync(this);
348     }
349 dl 1.1 }
350    
351 dl 1.5 /**
352     * Initializes internal state after construction but before
353     * processing any tasks. If you override this method, you must
354     * invoke super.onStart() at the beginning of the method.
355     * Initialization requires care: Most fields must have legal
356     * default values, to ensure that attempted accesses from other
357     * threads work correctly even before this thread starts
358     * processing tasks.
359     */
360     protected void onStart() {
361     // Allocate while starting to improve chances of thread-local
362     // isolation
363     queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
364     // Initial value of seed need not be especially random but
365     // should differ across workers and must be nonzero
366     int p = poolIndex + 1;
367     seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
368     }
369 dl 1.1
370     /**
371 jsr166 1.11 * Performs cleanup associated with termination of this worker
372 dl 1.5 * thread. If you override this method, you must invoke
373 jsr166 1.16 * {@code super.onTermination} at the end of the overridden method.
374 dl 1.5 *
375     * @param exception the exception causing this thread to abort due
376 jsr166 1.21 * to an unrecoverable error, or {@code null} if completed normally
377 dl 1.1 */
378 dl 1.5 protected void onTermination(Throwable exception) {
379     // Execute remaining local tasks unless aborting or terminating
380     while (exception == null && !pool.isTerminating() && base != sp) {
381     try {
382     ForkJoinTask<?> t = popTask();
383     if (t != null)
384     t.quietlyExec();
385 jsr166 1.16 } catch (Throwable ex) {
386 dl 1.5 exception = ex;
387     }
388     }
389     // Cancel other tasks, transition status, notify pool, and
390     // propagate exception to uncaught exception handler
391     try {
392 jsr166 1.16 do {} while (!tryInactivate()); // ensure inactive
393 jsr166 1.6 cancelTasks();
394 dl 1.5 runState = TERMINATED;
395     pool.workerTerminated(this);
396     } catch (Throwable ex) { // Shouldn't ever happen
397     if (exception == null) // but if so, at least rethrown
398     exception = ex;
399     } finally {
400     if (exception != null)
401     ForkJoinTask.rethrowException(exception);
402     }
403 dl 1.1 }
404    
405 jsr166 1.6 // Intrinsics-based support for queue operations.
406 dl 1.5
407 dl 1.1 /**
408 jsr166 1.10 * Adds in store-order the given task at given slot of q to null.
409     * Caller must ensure q is non-null and index is in range.
410 dl 1.1 */
411     private static void setSlot(ForkJoinTask<?>[] q, int i,
412 jsr166 1.14 ForkJoinTask<?> t) {
413 jsr166 1.12 UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
414 dl 1.1 }
415    
416     /**
417 jsr166 1.10 * CAS given slot of q to null. Caller must ensure q is non-null
418 dl 1.1 * and index is in range.
419     */
420     private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
421     ForkJoinTask<?> t) {
422 jsr166 1.12 return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
423 dl 1.1 }
424    
425 dl 1.5 /**
426     * Sets sp in store-order.
427     */
428     private void storeSp(int s) {
429 jsr166 1.12 UNSAFE.putOrderedInt(this, spOffset, s);
430 dl 1.5 }
431    
432 dl 1.1 // Main queue methods
433    
434     /**
435     * Pushes a task. Called only by current thread.
436 jsr166 1.11 *
437 jsr166 1.10 * @param t the task. Caller must ensure non-null.
438 dl 1.1 */
439     final void pushTask(ForkJoinTask<?> t) {
440     ForkJoinTask<?>[] q = queue;
441     int mask = q.length - 1;
442     int s = sp;
443 dl 1.5 setSlot(q, s & mask, t);
444     storeSp(++s);
445 dl 1.1 if ((s -= base) == 1)
446 dl 1.5 pool.signalWork();
447 dl 1.1 else if (s >= mask)
448     growQueue();
449     }
450    
451     /**
452     * Tries to take a task from the base of the queue, failing if
453     * either empty or contended.
454 jsr166 1.11 *
455     * @return a task, or null if none or contended
456 dl 1.1 */
457 dl 1.7 final ForkJoinTask<?> deqTask() {
458 dl 1.5 ForkJoinTask<?> t;
459 dl 1.1 ForkJoinTask<?>[] q;
460     int i;
461     int b;
462     if (sp != (b = base) &&
463     (q = queue) != null && // must read q after b
464     (t = q[i = (q.length - 1) & b]) != null &&
465 dl 1.5 casSlotNull(q, i, t)) {
466 dl 1.1 base = b + 1;
467     return t;
468     }
469     return null;
470     }
471    
472     /**
473 dl 1.5 * Returns a popped task, or null if empty. Ensures active status
474 jsr166 1.10 * if non-null. Called only by current thread.
475 dl 1.1 */
476     final ForkJoinTask<?> popTask() {
477     int s = sp;
478 dl 1.5 while (s != base) {
479     if (tryActivate()) {
480     ForkJoinTask<?>[] q = queue;
481     int mask = q.length - 1;
482     int i = (s - 1) & mask;
483     ForkJoinTask<?> t = q[i];
484     if (t == null || !casSlotNull(q, i, t))
485     break;
486     storeSp(s - 1);
487     return t;
488     }
489 dl 1.1 }
490     return null;
491     }
492    
493     /**
494     * Specialized version of popTask to pop only if
495     * topmost element is the given task. Called only
496 dl 1.5 * by current thread while active.
497 jsr166 1.11 *
498     * @param t the task. Caller must ensure non-null.
499 dl 1.1 */
500     final boolean unpushTask(ForkJoinTask<?> t) {
501     ForkJoinTask<?>[] q = queue;
502     int mask = q.length - 1;
503     int s = sp - 1;
504 dl 1.5 if (casSlotNull(q, s & mask, t)) {
505     storeSp(s);
506 dl 1.1 return true;
507     }
508     return false;
509     }
510    
511     /**
512 dl 1.7 * Returns next task.
513 dl 1.1 */
514 dl 1.2 final ForkJoinTask<?> peekTask() {
515 dl 1.1 ForkJoinTask<?>[] q = queue;
516 dl 1.7 if (q == null)
517     return null;
518     int mask = q.length - 1;
519 jsr166 1.15 int i = locallyFifo ? base : (sp - 1);
520 dl 1.7 return q[i & mask];
521 dl 1.1 }
522    
523     /**
524     * Doubles queue array size. Transfers elements by emulating
525     * steals (deqs) from old array and placing, oldest first, into
526     * new array.
527     */
528     private void growQueue() {
529     ForkJoinTask<?>[] oldQ = queue;
530     int oldSize = oldQ.length;
531     int newSize = oldSize << 1;
532     if (newSize > MAXIMUM_QUEUE_CAPACITY)
533     throw new RejectedExecutionException("Queue capacity exceeded");
534     ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
535    
536     int b = base;
537     int bf = b + oldSize;
538     int oldMask = oldSize - 1;
539     int newMask = newSize - 1;
540     do {
541     int oldIndex = b & oldMask;
542     ForkJoinTask<?> t = oldQ[oldIndex];
543     if (t != null && !casSlotNull(oldQ, oldIndex, t))
544     t = null;
545     setSlot(newQ, b & newMask, t);
546     } while (++b != bf);
547 dl 1.5 pool.signalWork();
548 dl 1.1 }
549    
550     /**
551 dl 1.5 * Tries to steal a task from another worker. Starts at a random
552     * index of workers array, and probes workers until finding one
553     * with non-empty queue or finding that all are empty. It
554     * randomly selects the first n probes. If these are empty, it
555     * resorts to a full circular traversal, which is necessary to
556     * accurately set active status by caller. Also restarts if pool
557     * events occurred since last scan, which forces refresh of
558     * workers array, in case barrier was associated with resize.
559 dl 1.1 *
560     * This method must be both fast and quiet -- usually avoiding
561     * memory accesses that could disrupt cache sharing etc other than
562     * those needed to check for and take tasks. This accounts for,
563     * among other things, updating random seed in place without
564 dl 1.5 * storing it until exit.
565 dl 1.1 *
566     * @return a task, or null if none found
567     */
568 dl 1.5 private ForkJoinTask<?> scan() {
569     ForkJoinTask<?> t = null;
570     int r = seed; // extract once to keep scan quiet
571     ForkJoinWorkerThread[] ws; // refreshed on outer loop
572     int mask; // must be power 2 minus 1 and > 0
573     outer:do {
574     if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
575 dl 1.1 int idx = r;
576 dl 1.5 int probes = ~mask; // use random index while negative
577 dl 1.1 for (;;) {
578 dl 1.5 r = xorShift(r); // update random seed
579     ForkJoinWorkerThread v = ws[mask & idx];
580     if (v == null || v.sp == v.base) {
581     if (probes <= mask)
582 jsr166 1.15 idx = (probes++ < 0) ? r : (idx + 1);
583 dl 1.5 else
584     break;
585 dl 1.1 }
586 dl 1.5 else if (!tryActivate() || (t = v.deqTask()) == null)
587     continue outer; // restart on contention
588 dl 1.1 else
589 dl 1.5 break outer;
590 dl 1.1 }
591     }
592 dl 1.5 } while (pool.hasNewSyncEvent(this)); // retry on pool events
593     seed = r;
594     return t;
595 dl 1.1 }
596    
597     /**
598 jsr166 1.11 * Gets and removes a local or stolen task.
599     *
600 dl 1.5 * @return a task, if available
601     */
602     final ForkJoinTask<?> pollTask() {
603 jsr166 1.15 ForkJoinTask<?> t = locallyFifo ? deqTask() : popTask();
604 dl 1.5 if (t == null && (t = scan()) != null)
605     ++stealCount;
606     return t;
607 dl 1.1 }
608    
609     /**
610 jsr166 1.11 * Gets a local task.
611     *
612 dl 1.7 * @return a task, if available
613     */
614     final ForkJoinTask<?> pollLocalTask() {
615 jsr166 1.15 return locallyFifo ? deqTask() : popTask();
616 dl 1.7 }
617    
618     /**
619 dl 1.5 * Returns a pool submission, if one exists, activating first.
620 jsr166 1.11 *
621 dl 1.5 * @return a submission, if available
622 dl 1.1 */
623 dl 1.5 private ForkJoinTask<?> pollSubmission() {
624     ForkJoinPool p = pool;
625     while (p.hasQueuedSubmissions()) {
626     ForkJoinTask<?> t;
627     if (tryActivate() && (t = p.pollSubmission()) != null)
628     return t;
629 dl 1.1 }
630 dl 1.5 return null;
631 dl 1.1 }
632 dl 1.5
633     // Methods accessed only by Pool
634    
635 dl 1.1 /**
636 dl 1.5 * Removes and cancels all tasks in queue. Can be called from any
637     * thread.
638 dl 1.1 */
639 dl 1.5 final void cancelTasks() {
640 dl 1.3 ForkJoinTask<?> t;
641 dl 1.5 while (base != sp && (t = deqTask()) != null)
642     t.cancelIgnoringExceptions();
643 dl 1.1 }
644    
645     /**
646 jsr166 1.11 * Drains tasks to given collection c.
647     *
648 dl 1.7 * @return the number of tasks drained
649     */
650 dl 1.22 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
651 dl 1.7 int n = 0;
652     ForkJoinTask<?> t;
653     while (base != sp && (t = deqTask()) != null) {
654     c.add(t);
655     ++n;
656     }
657     return n;
658     }
659    
660     /**
661 jsr166 1.11 * Gets and clears steal count for accumulation by pool. Called
662 dl 1.5 * only when known to be idle (in pool.sync and termination).
663 dl 1.1 */
664 dl 1.5 final int getAndClearStealCount() {
665     int sc = stealCount;
666     stealCount = 0;
667     return sc;
668     }
669    
670     /**
671 jsr166 1.21 * Returns {@code true} if at least one worker in the given array
672     * appears to have at least one queued task.
673 jsr166 1.16 *
674 dl 1.5 * @param ws array of workers
675     */
676     static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
677     if (ws != null) {
678     int len = ws.length;
679     for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
680     for (int i = 0; i < len; ++i) {
681     ForkJoinWorkerThread w = ws[i];
682     if (w != null && w.sp != w.base)
683     return true;
684 dl 1.1 }
685     }
686     }
687 dl 1.5 return false;
688 dl 1.1 }
689    
690 dl 1.5 // Support methods for ForkJoinTask
691    
692 dl 1.1 /**
693 dl 1.2 * Returns an estimate of the number of tasks in the queue.
694     */
695     final int getQueueSize() {
696 jsr166 1.15 // suppress momentarily negative values
697     return Math.max(0, sp - base);
698 dl 1.2 }
699    
700     /**
701 dl 1.1 * Returns an estimate of the number of tasks, offset by a
702     * function of number of idle workers.
703     */
704     final int getEstimatedSurplusTaskCount() {
705 dl 1.3 // The halving approximates weighting idle vs non-idle workers
706 dl 1.1 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
707     }
708    
709     /**
710 jsr166 1.16 * Scans, returning early if joinMe done.
711 dl 1.1 */
712 dl 1.5 final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
713     ForkJoinTask<?> t = pollTask();
714     if (t != null && joinMe.status < 0 && sp == base) {
715     pushTask(t); // unsteal if done and this task would be stealable
716     t = null;
717     }
718     return t;
719 dl 1.1 }
720 jsr166 1.6
721 dl 1.1 /**
722 jsr166 1.16 * Runs tasks until {@code pool.isQuiescent()}.
723 dl 1.1 */
724 dl 1.5 final void helpQuiescePool() {
725     for (;;) {
726     ForkJoinTask<?> t = pollTask();
727 jsr166 1.6 if (t != null)
728 dl 1.5 t.quietlyExec();
729     else if (tryInactivate() && pool.isQuiescent())
730     break;
731     }
732 jsr166 1.16 do {} while (!tryActivate()); // re-activate on exit
733 dl 1.1 }
734    
735 jsr166 1.20 // Unsafe mechanics
736    
737     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
738     private static final long spOffset =
739     objectFieldOffset("sp", ForkJoinWorkerThread.class);
740     private static final long runStateOffset =
741     objectFieldOffset("runState", ForkJoinWorkerThread.class);
742     private static final long qBase;
743     private static final int qShift;
744    
745     static {
746     qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
747     int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
748     if ((s & (s-1)) != 0)
749     throw new Error("data type scale not a power of two");
750     qShift = 31 - Integer.numberOfLeadingZeros(s);
751     }
752    
753     private static long objectFieldOffset(String field, Class<?> klazz) {
754     try {
755     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
756     } catch (NoSuchFieldException e) {
757     // Convert Exception to corresponding Error
758     NoSuchFieldError error = new NoSuchFieldError(field);
759     error.initCause(e);
760     throw error;
761     }
762     }
763    
764     /**
765     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
766     * Replace with a simple call to Unsafe.getUnsafe when integrating
767     * into a jdk.
768     *
769     * @return a sun.misc.Unsafe
770     */
771 jsr166 1.17 private static sun.misc.Unsafe getUnsafe() {
772 jsr166 1.6 try {
773 jsr166 1.17 return sun.misc.Unsafe.getUnsafe();
774 jsr166 1.6 } catch (SecurityException se) {
775     try {
776     return java.security.AccessController.doPrivileged
777 jsr166 1.20 (new java.security
778     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
779 jsr166 1.17 public sun.misc.Unsafe run() throws Exception {
780 jsr166 1.20 java.lang.reflect.Field f = sun.misc
781     .Unsafe.class.getDeclaredField("theUnsafe");
782     f.setAccessible(true);
783     return (sun.misc.Unsafe) f.get(null);
784 jsr166 1.6 }});
785     } catch (java.security.PrivilegedActionException e) {
786 jsr166 1.17 throw new RuntimeException("Could not initialize intrinsics",
787     e.getCause());
788 jsr166 1.6 }
789     }
790     }
791 dl 1.1 }