root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.65
Committed: Thu Apr 14 01:17:58 2011 UTC by jsr166
Branch: MAIN
CVS Tags: jdk7-compat, release-1_7_0
Changes since 1.64: +3 -3 lines
Log Message:
coding style

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.64 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package jsr166y;
8 jsr166 1.18
9     import java.util.Collection;
10 dl 1.53 import java.util.concurrent.RejectedExecutionException;
11 dl 1.1
12     /**
13 dl 1.59 * A thread managed by a {@link ForkJoinPool}, which executes
14     * {@link ForkJoinTask}s.
15     * This class is subclassable solely for the sake of adding
16     * functionality -- there are no overridable methods dealing with
17     * scheduling or execution. However, you can override initialization
18     * and termination methods surrounding the main task processing loop.
19     * If you do create such a subclass, you will also need to supply a
20     * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
21     * in a {@code ForkJoinPool}.
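 *
 * <p>As an illustrative sketch only (the class and variable names
 * here are hypothetical, not part of this API), a subclass adding
 * per-thread setup and cleanup, together with the factory needed
 * to install it, might read:
 *
 * <pre> {@code
 * class MyWorkerThread extends ForkJoinWorkerThread {
 *   MyWorkerThread(ForkJoinPool pool) { super(pool); }
 *   protected void onStart() {
 *     super.onStart();                // must run first
 *     // ... acquire per-thread resources ...
 *   }
 *   protected void onTermination(Throwable exception) {
 *     // ... release per-thread resources ...
 *     super.onTermination(exception); // must run last
 *   }
 * }
 *
 * ForkJoinPool.ForkJoinWorkerThreadFactory factory =
 *   new ForkJoinPool.ForkJoinWorkerThreadFactory() {
 *     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *       return new MyWorkerThread(pool);
 *     }
 *   };
 * ForkJoinPool pool = new ForkJoinPool(
 *     Runtime.getRuntime().availableProcessors(), factory, null, false);
 * }</pre>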
22 jsr166 1.6 *
23 jsr166 1.13 * @since 1.7
24     * @author Doug Lea
25 dl 1.1 */
26     public class ForkJoinWorkerThread extends Thread {
27     /*
28 dl 1.31 * Overview:
29 dl 1.1 *
30 dl 1.31 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
31     * ForkJoinTasks. This class includes bookkeeping in support of
32     * worker activation, suspension, and lifecycle control described
33     * in more detail in the internal documentation of class
34     * ForkJoinPool. And as described further below, this class also
35     * includes special-cased support for some ForkJoinTask
36     * methods. But the main mechanics involve work-stealing:
37     *
38     * Work-stealing queues are special forms of Deques that support
39     * only three of the four possible end-operations -- push, pop,
40     * and deq (aka steal), under the further constraints that push
41     * and pop are called only from the owning thread, while deq may
42     * be called from other threads. (If you are unfamiliar with
43     * them, you probably want to read Herlihy and Shavit's book "The
44     * Art of Multiprocessor Programming", chapter 16, describing these
45     * in more detail before proceeding.) The main work-stealing
46     * queue design is roughly similar to those in the papers "Dynamic
47     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
48     * (http://research.sun.com/scalable/pubs/index.html) and
49     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
50     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
51     * The main differences ultimately stem from gc requirements that
52     * we null out taken slots as soon as we can, to maintain as small
53     * a footprint as possible even in programs generating huge
54     * numbers of tasks. To accomplish this, we shift the CAS
55     * arbitrating pop vs deq (steal) from being on the indices
56 dl 1.63 * ("queueBase" and "queueTop") to the slots themselves (mainly
57     * via method "casSlotNull()"). So, both a successful pop and deq
58     * mainly entail a CAS of a slot from non-null to null. Because
59     * we rely on CASes of references, we do not need tag bits on
60     * queueBase or queueTop. They are simple ints as used in any
61     * circular array-based queue (see for example ArrayDeque).
62     * Updates to the indices must still be ordered in a way that
63     * guarantees that queueTop == queueBase means the queue is empty,
64     * but otherwise may err on the side of possibly making the queue
65     * appear nonempty when a push, pop, or deq has not fully
66     * committed. Note that this means that the deq operation,
67     * considered individually, is not wait-free. One thief cannot
68     * successfully continue until another in-progress one (or, if
69     * previously empty, a push) completes. However, in the
70 dl 1.31 * aggregate, we ensure at least probabilistic non-blockingness.
71     * If an attempted steal fails, a thief always chooses a different
72     * random victim target to try next. So, in order for one thief to
73     * progress, it suffices for any in-progress deq or new push on
74 dl 1.63 * any empty queue to complete.
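 *
 * In rough outline (a sketch only; the actual code below operates
 * on raw Unsafe offsets rather than plain array indexing), the
 * three end-operations reduce to:
 *
 *   push(t):  q[queueTop & mask] = t;        // owner only
 *             ++queueTop;
 *   pop():    t = q[(queueTop - 1) & mask];  // owner only
 *             if (t != null && CAS(slot, t, null)) --queueTop;
 *   steal():  t = q[queueBase & mask];       // any thread
 *             if (t != null && CAS(slot, t, null)) ++queueBase;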
75 dl 1.1 *
76 dl 1.23 * This approach also enables support for "async mode" where local
77     * task processing is in FIFO, not LIFO order; simply by using a
78     * version of deq rather than pop when locallyFifo is true (as set
79     * by the ForkJoinPool). This allows use in message-passing
80 dl 1.63 * frameworks in which tasks are never joined. However, neither
81     * mode considers affinities, loads, cache localities, etc., so
82     * each rarely provides the best possible performance on a given
83     * machine, but both portably provide good throughput by averaging
84     * over these factors. (Further, even if we did try to use such
85     * information, we do not usually have a basis for exploiting
86     * it. For example, some sets of tasks profit from cache
87     * affinities, but others are harmed by cache pollution effects.)
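 *
 * For instance (an illustration, not a prescription), async mode
 * is requested via the four-argument ForkJoinPool constructor:
 *
 *   ForkJoinPool p = new ForkJoinPool(
 *       Runtime.getRuntime().availableProcessors(),
 *       ForkJoinPool.defaultForkJoinWorkerThreadFactory,
 *       null,   // default uncaught exception handler
 *       true);  // asyncMode: workers created with locallyFifo set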
88 dl 1.23 *
89 dl 1.35 * When a worker would otherwise be blocked waiting to join a
90     * task, it first tries a form of linear helping: Each worker
91 dl 1.36 * records (in field currentSteal) the most recent task it stole
92     * from some other worker. Plus, it records (in field currentJoin)
93     * the task it is currently actively joining. Method joinTask uses
94 dl 1.35 * these markers to try to find a worker to help (i.e., steal back
95     * a task from and execute it) that could hasten completion of the
96     * actively joined task. In essence, the joiner executes a task
97     * that would be on its own local deque had the to-be-joined task
98     * not been stolen. This may be seen as a conservative variant of
99     * the approach in Wagner & Calder "Leapfrogging: a portable
100     * technique for implementing efficient futures" SIGPLAN Notices,
101     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
102     * in that: (1) We only maintain dependency links across workers
103 dl 1.39 * upon steals, rather than use per-task bookkeeping. This may
104     * require a linear scan of workers array to locate stealers, but
105     * usually doesn't because stealers leave hints (that may become
106     * stale/wrong) of where to locate them. This isolates cost to
107     * when it is needed, rather than adding to per-task overhead.
108     * (2) It is "shallow", ignoring nesting and potentially cyclic
109     * mutual steals. (3) It is intentionally racy: field currentJoin
110     * is updated only while actively joining, which means that we
111     * miss links in the chain during long-lived tasks, GC stalls etc
112     * (which is OK since blocking in such cases is usually a good
113     * idea). (4) We bound the number of attempts to find work (see
114 dl 1.63 * MAX_HELP) and fall back to suspending the worker and if
115     * necessary replacing it with another.
116 dl 1.35 *
117 dl 1.36 * Efficient implementation of these algorithms currently relies
118     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
119 dl 1.63 * correct orderings, reads and writes of variable queueBase
120     * require volatile ordering. Variable queueTop need not be
121     * volatile because non-local reads always follow those of
122     * queueBase. Similarly, because they are protected by volatile
123     * queueBase reads, reads of the queue array and its slots by
124     * other threads do not need volatile load semantics, but writes
125     * (in push) require store order and CASes (in pop and deq)
126     * require (volatile) CAS semantics. (Michael, Saraswat, and
127     * Vechev's algorithm has similar properties, but without support
128     * for nulling slots.) Since these combinations aren't supported
129     * using ordinary volatiles, the only way to accomplish these
130     * efficiently is to use direct Unsafe calls. (Using external
131     * AtomicIntegers and AtomicReferenceArrays for the indices and
132     * array is significantly slower because of memory locality and
133     * indirection effects.)
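 *
 * For comparison (a sketch only), the portable equivalent of
 * casSlotNull using java.util.concurrent.atomic classes would be
 * along the lines of:
 *
 *   AtomicReferenceArray<ForkJoinTask<?>> aq = ...; // the queue
 *   boolean taken = aq.compareAndSet(i, t, null);
 *
 * which is semantically similar but measurably slower here.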
134 jsr166 1.29 *
135 dl 1.28 * Further, performance on most platforms is very sensitive to
136     * placement and sizing of the (resizable) queue array. Even
137     * though these queues don't usually become all that big, the
138     * initial size must be large enough to counteract cache
139 dl 1.1 * contention effects across multiple queues (especially in the
140     * presence of GC cardmarking). Also, to improve thread-locality,
141 dl 1.63 * queues are initialized after starting.
142 dl 1.1 */
143    
144     /**
145 dl 1.63 * Mask for pool indices encoded as shorts.
146 dl 1.31 */
147 dl 1.63 private static final int SMASK = 0xffff;
148 dl 1.36
149     /**
150 dl 1.1 * Capacity of work-stealing queue array upon initialization.
151 dl 1.34 * Must be a power of two. Initial size must be at least 4, but is
152 dl 1.1 * padded to minimize cache effects.
153     */
154     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
155    
156     /**
157 dl 1.63 * Maximum size for queue array. Must be a power of two
158     * less than or equal to 1 << (31 - width of array entry) to
159     * ensure lack of index wraparound, but is capped at a lower
160     * value to help users trap runaway computations.
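     *
     * (One way to read that bound, assuming 8-byte array entries:
     * the entry width is then 3 shift bits, so capacities up to
     * 1 << (31 - 3) = 1 << 28 avoid offset wraparound, and the
     * 1 << 24 cap sits comfortably below it.)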
161 dl 1.1 */
162 dl 1.63 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
163    
164     /**
165     * The work-stealing queue array. Size must be a power of two.
166     * Initialized when started (as opposed to when constructed), to
167     * improve memory locality.
168     */
169     ForkJoinTask<?>[] queue;
170 dl 1.1
171     /**
172 jsr166 1.16 * The pool this thread works in. Accessed directly by ForkJoinTask.
173 dl 1.1 */
174 dl 1.5 final ForkJoinPool pool;
175 dl 1.1
176     /**
177 dl 1.63 * Index (mod queue.length) of next queue slot to push to or pop
178     * from. It is written only by owner thread, and accessed by other
179     * threads only after reading (volatile) queueBase. Both queueTop
180     * and queueBase are allowed to wrap around on overflow, but
181     * (queueTop - queueBase) still estimates size.
182 dl 1.1 */
183 dl 1.63 int queueTop;
184 dl 1.37
185 dl 1.1 /**
186 dl 1.31 * Index (mod queue.length) of least valid queue slot, which is
187     * always the next position to steal from if nonempty.
188     */
189 dl 1.63 volatile int queueBase;
190 dl 1.1
191     /**
192 dl 1.36 * The index of most recent stealer, used as a hint to avoid
193     * traversal in method helpJoinTask. This is only a hint because a
194     * worker might have had multiple steals and this only holds one
195     * of them (usually the most current). Declared non-volatile,
196     * relying on other prevailing sync to keep reasonably current.
197     */
198 dl 1.63 int stealHint;
199 dl 1.36
200     /**
201 dl 1.63 * Index of this worker in pool array. Set once by pool before
202     * running, and accessed directly by pool to locate this worker in
203     * its workers array.
204 dl 1.1 */
205 dl 1.63 final int poolIndex;
206 dl 1.31
207 dl 1.63 /**
208     * Encoded record for pool task waits. Usages are always
209     * surrounded by volatile reads/writes
210     */
211     int nextWait;
212 dl 1.1
213     /**
214 dl 1.63 * Complement of poolIndex, offset by count of entries of task
215     * waits. Accessed by ForkJoinPool to manage event waiters.
216 dl 1.1 */
217 dl 1.63 volatile int eventCount;
218 dl 1.1
219     /**
220 dl 1.5 * Seed for random number generator for choosing steal victims.
221 dl 1.31 * Uses Marsaglia xorshift. Must be initialized as nonzero.
222 dl 1.1 */
223 dl 1.63 int seed;
224 dl 1.1
225     /**
226 dl 1.63 * Number of steals. Directly accessed (and reset) by pool when
227     * idle.
228 dl 1.31 */
229 dl 1.63 int stealCount;
230 dl 1.37
231 dl 1.1 /**
232 dl 1.63 * True if this worker should terminate or has terminated.
233 dl 1.1 */
234 dl 1.63 volatile boolean terminate;
235 dl 1.1
236     /**
237 dl 1.63 * Set to true before LockSupport.park; reset to false on return.
238 dl 1.1 */
239 dl 1.63 volatile boolean parked;
240 dl 1.1
241     /**
242 dl 1.63 * True if local polling uses FIFO, not the default LIFO, order.
243     * Shadows value from ForkJoinPool.
244 dl 1.7 */
245 dl 1.63 final boolean locallyFifo;
246 dl 1.7
247     /**
248 dl 1.63 * The task most recently stolen from another worker (or
249     * submission queue). All uses are surrounded by enough volatile
250     * reads/writes to maintain as non-volatile.
251 dl 1.40 */
252 dl 1.63 ForkJoinTask<?> currentSteal;
253 dl 1.40
254     /**
255 dl 1.36 * The task currently being joined, set only when actively trying
256 dl 1.63 * to help other stealers in helpJoinTask. All uses are surrounded
257     * by enough volatile reads/writes to maintain as non-volatile.
258 dl 1.36 */
259 dl 1.63 ForkJoinTask<?> currentJoin;
260 dl 1.36
261     /**
262 dl 1.1 * Creates a ForkJoinWorkerThread operating in the given pool.
263 jsr166 1.11 *
264 dl 1.1 * @param pool the pool this thread works in
265     * @throws NullPointerException if pool is null
266     */
267     protected ForkJoinWorkerThread(ForkJoinPool pool) {
268 dl 1.63 super(pool.nextWorkerName());
269 dl 1.1 this.pool = pool;
270 dl 1.63 int k = pool.registerWorker(this);
271     poolIndex = k;
272     eventCount = ~k & SMASK; // clear wait count
273     locallyFifo = pool.locallyFifo;
274     Thread.UncaughtExceptionHandler ueh = pool.ueh;
275 dl 1.31 if (ueh != null)
276     setUncaughtExceptionHandler(ueh);
277 dl 1.63 setDaemon(true);
278 dl 1.31 }
279    
280 dl 1.63 // Public methods
281 dl 1.2
282     /**
283 jsr166 1.11 * Returns the pool hosting this thread.
284     *
285 dl 1.2 * @return the pool
286     */
287 dl 1.4 public ForkJoinPool getPool() {
288     return pool;
289 dl 1.2 }
290    
291     /**
292 dl 1.4 * Returns the index number of this thread in its pool. The
293     * returned value ranges from zero to the maximum number of
294     * threads (minus one) that have ever been created in the pool.
295     * This method may be useful for applications that track status or
296 dl 1.5 * collect results per-worker rather than per-task.
297 jsr166 1.11 *
298     * @return the index number
299 dl 1.2 */
300 dl 1.4 public int getPoolIndex() {
301     return poolIndex;
302 dl 1.2 }
303    
304 dl 1.63 // Randomization
305    
306     /**
307     * Computes next value for random victim probes and backoffs.
308     * Scans don't require a very high quality generator, but also not
309     * a crummy one. Marsaglia xor-shift is cheap and works well
310     * enough. Note: This is manually inlined in FJP.scan() to avoid
311     * writes inside busy loops.
312     */
313     private int nextSeed() {
314     int r = seed;
315     r ^= r << 13;
316     r ^= r >>> 17;
317     r ^= r << 5;
318     return seed = r;
319     }
320    
321     // Run State management
322    
323 dl 1.7 /**
324 dl 1.31 * Initializes internal state after construction but before
325     * processing any tasks. If you override this method, you must
326 jsr166 1.58 * invoke {@code super.onStart()} at the beginning of the method.
327 dl 1.31 * Initialization requires care: Most fields must have legal
328     * default values, to ensure that attempted accesses from other
329     * threads work correctly even before this thread starts
330     * processing tasks.
331 dl 1.7 */
332 dl 1.31 protected void onStart() {
333     queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
334 dl 1.63 int r = pool.workerSeedGenerator.nextInt();
335 jsr166 1.65 seed = (r == 0) ? 1 : r; // must be nonzero
336 dl 1.31 }
337 dl 1.5
338     /**
339 dl 1.31 * Performs cleanup associated with termination of this worker
340     * thread. If you override this method, you must invoke
341     * {@code super.onTermination} at the end of the overridden method.
342 jsr166 1.21 *
343 dl 1.31 * @param exception the exception causing this thread to abort due
344     * to an unrecoverable error, or {@code null} if completed normally
345 dl 1.5 */
346 dl 1.31 protected void onTermination(Throwable exception) {
347     try {
348 dl 1.63 terminate = true;
349 dl 1.31 cancelTasks();
350 dl 1.63 pool.deregisterWorker(this, exception);
351 dl 1.31 } catch (Throwable ex) { // Shouldn't ever happen
352     if (exception == null) // but if so, at least rethrow it
353     exception = ex;
354     } finally {
355     if (exception != null)
356     UNSAFE.throwException(exception);
357 dl 1.5 }
358     }
359    
360     /**
361     * This method is required to be public, but should never be
362     * called explicitly. It performs the main run loop to execute
363 dl 1.59 * {@link ForkJoinTask}s.
364 dl 1.1 */
365 dl 1.5 public void run() {
366     Throwable exception = null;
367     try {
368     onStart();
369 dl 1.63 pool.work(this);
370 dl 1.5 } catch (Throwable ex) {
371     exception = ex;
372     } finally {
373     onTermination(exception);
374     }
375 dl 1.1 }
376    
377 dl 1.31 /*
378     * Intrinsics-based atomic writes for queue slots. These are
379 jsr166 1.48 * basically the same as methods in AtomicReferenceArray, but
380 dl 1.31 * specialized for (1) ForkJoinTask elements (2) requirement that
381     * nullness and bounds checks have already been performed by
382     * callers and (3) effective offsets are known not to overflow
383     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
384     * need corresponding version for reads: plain array reads are OK
385 jsr166 1.47 * because they are protected by other volatile reads and are
386 dl 1.31 * confirmed by CASes.
387     *
388 dl 1.63 * Most uses don't actually call these methods, but instead
389     * contain inlined forms that enable more predictable
390     * optimization. We don't define the version of write used in
391     * pushTask at all, but instead inline there a store-fenced array
392     * slot write.
393     *
394     * Also in most methods, as a performance (not correctness) issue,
395     * we'd like to encourage compilers not to arbitrarily postpone
396     * setting queueTop after writing slot. Currently there is no
397     * intrinsic for arranging this, but using Unsafe putOrderedInt
398     * may be a preferable strategy on some compilers even though its
399     * main effect is a pre-, not post- fence. To simplify possible
400     * changes, the option is left in comments next to the associated
401     * assignments.
402 dl 1.1 */
403    
404     /**
405 dl 1.31 * CASes slot i of array q from t to null. Caller must ensure q is
406     * non-null and index is in range.
407 dl 1.1 */
408 dl 1.31 private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
409     ForkJoinTask<?> t) {
410 dl 1.63 return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
411 dl 1.1 }
412    
413 dl 1.5 /**
414 dl 1.31 * Performs a volatile write of the given task at given slot of
415     * array q. Caller must ensure q is non-null and index is in
416     * range. This method is used only during resets and backouts.
417 dl 1.5 */
418 dl 1.31 private static final void writeSlot(ForkJoinTask<?>[] q, int i,
419 jsr166 1.45 ForkJoinTask<?> t) {
420 dl 1.63 UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
421 dl 1.5 }
422    
423 dl 1.31 // queue methods
424 dl 1.1
425     /**
426 dl 1.31 * Pushes a task. Call only from this thread.
427 jsr166 1.11 *
428 jsr166 1.10 * @param t the task. Caller must ensure non-null.
429 dl 1.1 */
430     final void pushTask(ForkJoinTask<?> t) {
431 dl 1.63 ForkJoinTask<?>[] q; int s, m;
432     if ((q = queue) != null) { // ignore if queue removed
433     long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
434     UNSAFE.putOrderedObject(q, u, t);
435     queueTop = s + 1; // or use putOrderedInt
436     if ((s -= queueBase) <= 2)
437     pool.signalWork();
438     else if (s == m)
439     growQueue();
440     }
441     }
442    
443     /**
444     * Creates or doubles queue array. Transfers elements by
445     * emulating steals (deqs) from old array and placing, oldest
446     * first, into new array.
447     */
448     private void growQueue() {
449     ForkJoinTask<?>[] oldQ = queue;
450     int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
451     if (size > MAXIMUM_QUEUE_CAPACITY)
452     throw new RejectedExecutionException("Queue capacity exceeded");
453     if (size < INITIAL_QUEUE_CAPACITY)
454     size = INITIAL_QUEUE_CAPACITY;
455     ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
456     int mask = size - 1;
457     int top = queueTop;
458     int oldMask;
459     if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
460     for (int b = queueBase; b != top; ++b) {
461     long u = ((b & oldMask) << ASHIFT) + ABASE;
462     Object x = UNSAFE.getObjectVolatile(oldQ, u);
463     if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
464     UNSAFE.putObjectVolatile
465     (q, ((b & mask) << ASHIFT) + ABASE, x);
466     }
467     }
468 dl 1.1 }
469    
470     /**
471     * Tries to take a task from the base of the queue, failing if
472 dl 1.31 * empty or contended. Note: Specializations of this code appear
473 dl 1.35 * in locallyDeqTask and elsewhere.
474 jsr166 1.11 *
475     * @return a task, or null if none or contended
476 dl 1.1 */
477 dl 1.7 final ForkJoinTask<?> deqTask() {
478 dl 1.63 ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
479     if (queueTop != (b = queueBase) &&
480 dl 1.1 (q = queue) != null && // must read q after b
481 dl 1.63 (i = (q.length - 1) & b) >= 0 &&
482     (t = q[i]) != null && queueBase == b &&
483     UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
484     queueBase = b + 1;
485 dl 1.1 return t;
486     }
487     return null;
488     }
489    
490     /**
491 dl 1.63 * Tries to take a task from the base of own queue. Called only
492     * by this thread.
493 dl 1.23 *
494     * @return a task, or null if none
495     */
496     final ForkJoinTask<?> locallyDeqTask() {
497 dl 1.63 ForkJoinTask<?> t; int m, b, i;
498 dl 1.31 ForkJoinTask<?>[] q = queue;
499 dl 1.63 if (q != null && (m = q.length - 1) >= 0) {
500     while (queueTop != (b = queueBase)) {
501     if ((t = q[i = m & b]) != null &&
502     queueBase == b &&
503     UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
504 dl 1.31 t, null)) {
505 dl 1.63 queueBase = b + 1;
506 dl 1.23 return t;
507     }
508     }
509     }
510     return null;
511     }
512    
513     /**
514 dl 1.63 * Returns a popped task, or null if empty.
515 dl 1.46 * Called only by this thread.
516 dl 1.1 */
517 dl 1.40 private ForkJoinTask<?> popTask() {
518 dl 1.63 int m;
519 dl 1.40 ForkJoinTask<?>[] q = queue;
520 dl 1.63 if (q != null && (m = q.length - 1) >= 0) {
521     for (int s; (s = queueTop) != queueBase;) {
522     int i = m & --s;
523     long u = (i << ASHIFT) + ABASE; // raw offset
524 dl 1.40 ForkJoinTask<?> t = q[i];
525     if (t == null) // lost to stealer
526     break;
527     if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
528 dl 1.63 queueTop = s; // or putOrderedInt
529 dl 1.40 return t;
530     }
531 dl 1.5 }
532 dl 1.1 }
533     return null;
534     }
535    
536     /**
537 dl 1.33 * Specialized version of popTask to pop only if topmost element
538 dl 1.63 * is the given task. Called only by this thread.
539 jsr166 1.11 *
540     * @param t the task. Caller must ensure non-null.
541 dl 1.1 */
542     final boolean unpushTask(ForkJoinTask<?> t) {
543 dl 1.63 ForkJoinTask<?>[] q;
544 dl 1.31 int s;
545 dl 1.63 if ((q = queue) != null && (s = queueTop) != queueBase &&
546 dl 1.33 UNSAFE.compareAndSwapObject
547 dl 1.63 (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
548     queueTop = s; // or putOrderedInt
549 dl 1.1 return true;
550     }
551     return false;
552     }
553    
554     /**
555 jsr166 1.45 * Returns next task, or null if empty or contended.
556 dl 1.1 */
557 dl 1.2 final ForkJoinTask<?> peekTask() {
558 dl 1.63 int m;
559 dl 1.1 ForkJoinTask<?>[] q = queue;
560 dl 1.63 if (q == null || (m = q.length - 1) < 0)
561 dl 1.7 return null;
562 dl 1.63 int i = locallyFifo ? queueBase : (queueTop - 1);
563     return q[i & m];
564 dl 1.1 }
565    
566 dl 1.63 // Support methods for ForkJoinPool
567 dl 1.50
568 dl 1.1 /**
569 dl 1.63 * Runs the given task, plus any local tasks, until the queue is empty.
570 dl 1.31 */
571 dl 1.63 final void execTask(ForkJoinTask<?> t) {
572     currentSteal = t;
573 dl 1.31 for (;;) {
574 dl 1.63 if (t != null)
575     t.doExec();
576     if (queueTop == queueBase)
577 dl 1.40 break;
578 dl 1.63 t = locallyFifo ? locallyDeqTask() : popTask();
579 dl 1.36 }
580 dl 1.63 ++stealCount;
581     currentSteal = null;
582 dl 1.1 }
583 dl 1.5
584 dl 1.31 /**
585 dl 1.5 * Removes and cancels all tasks in queue. Can be called from any
586     * thread.
587 dl 1.1 */
588 dl 1.5 final void cancelTasks() {
589 dl 1.40 ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
590 dl 1.63 if (cj != null && cj.status >= 0)
591 dl 1.36 cj.cancelIgnoringExceptions();
592     ForkJoinTask<?> cs = currentSteal;
593 dl 1.59 if (cs != null && cs.status >= 0)
594 dl 1.36 cs.cancelIgnoringExceptions();
595 dl 1.63 while (queueBase != queueTop) {
596 dl 1.31 ForkJoinTask<?> t = deqTask();
597     if (t != null)
598     t.cancelIgnoringExceptions();
599     }
600 dl 1.1 }
601    
602     /**
603 jsr166 1.11 * Drains tasks to given collection c.
604     *
605 dl 1.7 * @return the number of tasks drained
606     */
607 dl 1.22 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
608 dl 1.7 int n = 0;
609 dl 1.63 while (queueBase != queueTop) {
610 dl 1.31 ForkJoinTask<?> t = deqTask();
611     if (t != null) {
612     c.add(t);
613     ++n;
614     }
615 dl 1.7 }
616     return n;
617     }
618    
619 dl 1.31 // Support methods for ForkJoinTask
620    
621 dl 1.7 /**
622 dl 1.63 * Returns an estimate of the number of tasks in the queue.
623     */
624     final int getQueueSize() {
625     return queueTop - queueBase;
626     }
627    
628     /**
629 dl 1.36 * Gets and removes a local task.
630     *
631     * @return a task, if available
632     */
633     final ForkJoinTask<?> pollLocalTask() {
634 dl 1.63 return locallyFifo ? locallyDeqTask() : popTask();
635 dl 1.36 }
636    
637     /**
638     * Gets and removes a local or stolen task.
639     *
640     * @return a task, if available
641     */
642     final ForkJoinTask<?> pollTask() {
643 dl 1.63 ForkJoinWorkerThread[] ws;
644 dl 1.39 ForkJoinTask<?> t = pollLocalTask();
645 dl 1.63 if (t != null || (ws = pool.workers) == null)
646     return t;
647     int n = ws.length; // cheap version of FJP.scan
648     int steps = n << 1;
649     int r = nextSeed();
650     int i = 0;
651     while (i < steps) {
652     ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
653     if (w != null && w.queueBase != w.queueTop && w.queue != null) {
654     if ((t = w.deqTask()) != null)
655     return t;
656     i = 0;
657     }
658 dl 1.39 }
659 dl 1.63 return null;
660 dl 1.36 }
661    
662     /**
663 dl 1.63 * The maximum stolen->joining link depth allowed in helpJoinTask,
664     * as well as the maximum number of retries (allowing on average
665     * one staleness retry per level) per attempt to instead try
666     * compensation. Depths for legitimate chains are unbounded, but
667     * we use a fixed constant to avoid (otherwise unchecked) cycles
668     * and bound staleness of traversal parameters at the expense of
669     * sometimes blocking when we could be helping.
670     */
671     private static final int MAX_HELP = 16;
672    
673     /**
674     * Possibly runs some tasks and/or blocks, until joinMe is done.
675 dl 1.36 *
676     * @param joinMe the task to join
677 dl 1.63 * @return completion status on exit
678 dl 1.36 */
679 dl 1.63 final int joinTask(ForkJoinTask<?> joinMe) {
680 dl 1.36 ForkJoinTask<?> prevJoin = currentJoin;
681 dl 1.63 currentJoin = joinMe;
682     for (int s, retries = MAX_HELP;;) {
683     if ((s = joinMe.status) < 0) {
684     currentJoin = prevJoin;
685     return s;
686     }
687     if (retries > 0) {
688     if (queueTop != queueBase) {
689     if (!localHelpJoinTask(joinMe))
690     retries = 0; // cannot help
691     }
692     else if (retries == MAX_HELP >>> 1) {
693     --retries; // check uncommon case
694     if (tryDeqAndExec(joinMe) >= 0)
695     Thread.yield(); // for politeness
696     }
697     else
698 jsr166 1.65 retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
699 dl 1.63 }
700     else {
701     retries = MAX_HELP; // restart if not done
702     pool.tryAwaitJoin(joinMe);
703     }
704     }
705 dl 1.36 }
706    
707     /**
708 dl 1.63 * If present, pops and executes the given task, or any other
709     * cancelled task.
710     *
711     * @return false if any other non-cancelled task exists in local queue
712     */
713     private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
714     int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
715     if ((s = queueTop) != queueBase && (q = queue) != null &&
716     (i = (q.length - 1) & --s) >= 0 &&
717     (t = q[i]) != null) {
718     if (t != joinMe && t.status >= 0)
719     return false;
720     if (UNSAFE.compareAndSwapObject
721     (q, (i << ASHIFT) + ABASE, t, null)) {
722     queueTop = s; // or putOrderedInt
723     t.doExec();
724     }
725     }
726     return true;
727     }
728    
729     /**
730     * Tries to locate and execute tasks for a stealer of the given
731     * task, or in turn one of its stealers. Traces
732 dl 1.52 * currentSteal->currentJoin links looking for a thread working on
733     * a descendant of the given task and with a non-empty queue to
734 dl 1.63 * steal back and execute tasks from. The implementation is very
735     * branchy to cope with potential inconsistencies or loops
736     * encountering chains that are stale, unknown, or of length
737     * greater than MAX_HELP links. All of these cases are dealt with
738     * by having the caller simply retry.
739 dl 1.35 *
740     * @param joinMe the task to join
741 dl 1.63 * @return true if this call ran a task
743     */
744     private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
745     boolean helped = false;
746     int m = pool.scanGuard & SMASK;
747     ForkJoinWorkerThread[] ws = pool.workers;
748     if (ws != null && ws.length > m && joinMe.status >= 0) {
749     int levels = MAX_HELP; // remaining chain length
750     ForkJoinTask<?> task = joinMe; // base of chain
751     outer:for (ForkJoinWorkerThread thread = this;;) {
752 dl 1.59 // Try to find v, the stealer of task, by first using hint
753 dl 1.63 ForkJoinWorkerThread v = ws[thread.stealHint & m];
754 dl 1.59 if (v == null || v.currentSteal != task) {
755 dl 1.63 for (int j = 0; ;) { // search array
756     if ((v = ws[j]) != null && v.currentSteal == task) {
757     thread.stealHint = j;
758     break; // save hint for next time
759 dl 1.36 }
760 dl 1.63 if (++j > m)
761     break outer; // can't find stealer
762 dl 1.59 }
763     }
764     // Try to help v, using specialized form of deqTask
765     for (;;) {
766 dl 1.63 ForkJoinTask<?>[] q; int b, i;
767 dl 1.59 if (joinMe.status < 0)
768     break outer;
769 dl 1.63 if ((b = v.queueBase) == v.queueTop ||
770     (q = v.queue) == null ||
771     (i = (q.length-1) & b) < 0)
772     break; // empty
773     long u = (i << ASHIFT) + ABASE;
774 dl 1.59 ForkJoinTask<?> t = q[i];
775     if (task.status < 0)
776 dl 1.63 break outer; // stale
777     if (t != null && v.queueBase == b &&
778 dl 1.59 UNSAFE.compareAndSwapObject(q, u, t, null)) {
779 dl 1.63 v.queueBase = b + 1;
780     v.stealHint = poolIndex;
781     ForkJoinTask<?> ps = currentSteal;
782     currentSteal = t;
783     t.doExec();
784     currentSteal = ps;
785     helped = true;
786 dl 1.60 }
787 dl 1.35 }
788 dl 1.59 // Try to descend to find v's stealer
789     ForkJoinTask<?> next = v.currentJoin;
790 dl 1.63 if (--levels > 0 && task.status >= 0 &&
791     next != null && next != task) {
792     task = next;
793     thread = v;
794     }
795     else
796     break; // max levels, stale, dead-end, or cyclic
797     }
798     }
799     return helped;
800     }
801    
802     /**
803     * Performs an uncommon case for joinTask: If task t is at base of
804     * some worker's queue, steals and executes it.
805     *
806     * @param t the task
807     * @return t's status
808     */
809     private int tryDeqAndExec(ForkJoinTask<?> t) {
810     int m = pool.scanGuard & SMASK;
811     ForkJoinWorkerThread[] ws = pool.workers;
812     if (ws != null && ws.length > m && t.status >= 0) {
813     for (int j = 0; j <= m; ++j) {
814     ForkJoinTask<?>[] q; int b, i;
815     ForkJoinWorkerThread v = ws[j];
816     if (v != null &&
817     (b = v.queueBase) != v.queueTop &&
818     (q = v.queue) != null &&
819     (i = (q.length - 1) & b) >= 0 &&
820     q[i] == t) {
821     long u = (i << ASHIFT) + ABASE;
822     if (v.queueBase == b &&
823     UNSAFE.compareAndSwapObject(q, u, t, null)) {
824     v.queueBase = b + 1;
825     v.stealHint = poolIndex;
826     ForkJoinTask<?> ps = currentSteal;
827     currentSteal = t;
828     t.doExec();
829     currentSteal = ps;
830     }
831     break;
832     }
833 dl 1.35 }
834     }
835 dl 1.63 return t.status;
836 dl 1.35 }
837    
838     /**
839 dl 1.63 * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
840     * an estimate of the number of tasks, offset by a function of
841     * number of idle workers.
842 dl 1.31 *
843     * This method provides a cheap heuristic guide for task
844     * partitioning when programmers, frameworks, tools, or languages
845     * have little or no idea about task granularity. In essence by
846     * offering this method, we ask users only about tradeoffs in
847     * overhead vs expected throughput and its variance, rather than
848     * how finely to partition tasks.
849     *
850     * In a steady state strict (tree-structured) computation, each
851     * thread makes available for stealing enough tasks for other
852     * threads to remain active. Inductively, if all threads play by
853     * the same rules, each thread should make available only a
854     * constant number of tasks.
855     *
856     * The minimum useful constant is just 1. But using a value of 1
857     * would require immediate replenishment upon each steal to
858     * maintain enough tasks, which is infeasible. Further,
859     * partitionings/granularities of offered tasks should minimize
860     * steal rates, which in general means that threads nearer the top
861     * of computation tree should generate more than those nearer the
862     * bottom. In perfect steady state, each thread is at
863     * approximately the same level of computation tree. However,
864     * producing extra tasks amortizes the uncertainty of progress and
865     * diffusion assumptions.
866     *
867     * So, users will want to use values larger, but not much larger
868     * than 1 to both smooth over transient shortages and hedge
869     * against uneven progress, as traded off against the cost of
870     * extra task overhead. We leave the user to pick a threshold
871     * value to compare with the results of this call to guide
872     * decisions, but recommend values such as 3.
873     *
874     * When all threads are active, it is on average OK to estimate
875     * surplus strictly locally. In steady-state, if one thread is
876     * maintaining say 2 surplus tasks, then so are others. So we can
877 dl 1.63 * just use estimated queue length (although note that (queueTop -
878     * queueBase) can be an overestimate because of stealers lagging
879     * increments of queueBase). However, this strategy alone leads
880     * to serious mis-estimates in some non-steady-state conditions
881     * (ramp-up, ramp-down, other stalls). We can detect many of these
882     * by further considering the number of "idle" threads, that are
883 dl 1.31 * known to have zero queued tasks, so compensate by a factor of
884     * (#idle/#active) threads.
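     *
     * As a usage sketch (Subtask, count, and THRESHOLD are
     * hypothetical, not part of this class), callers typically
     * compare ForkJoinTask.getSurplusQueuedTaskCount() with a small
     * bound such as 3 when deciding whether to fork:
     *
     *   // inside a RecursiveAction.compute(), splitting while cheap:
     *   while (count > THRESHOLD &&
     *          ForkJoinTask.getSurplusQueuedTaskCount() <= 3) {
     *     int half = count >>> 1;
     *     new Subtask(half).fork();  // offer half for stealing
     *     count -= half;
     *   }
     *   // then process the remaining count directly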
885 dl 1.1 */
886 dl 1.31 final int getEstimatedSurplusTaskCount() {
887 dl 1.63 return queueTop - queueBase - pool.idlePerActive();
888 dl 1.5 }
889    
890     /**
891 dl 1.63 * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
892     * pool's active count ctl maintenance, but rather than blocking
893     * when tasks cannot be found, we rescan until all others cannot
894     * find tasks either. The bracketing by pool quiescerCounts
895     * updates suppresses pool auto-shutdown mechanics that could
896     * otherwise prematurely terminate the pool because all threads
897     * appear to be inactive.
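     *
     * (Reached, for example, via ForkJoinTask.helpQuiesce(), which
     * delegates here when called from a ForkJoinWorkerThread.)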
898 dl 1.1 */
899 dl 1.5 final void helpQuiescePool() {
900 dl 1.63 boolean active = true;
901 dl 1.42 ForkJoinTask<?> ps = currentSteal; // to restore below
902 dl 1.63 ForkJoinPool p = pool;
903     p.addQuiescerCount(1);
904 dl 1.5 for (;;) {
905 dl 1.63 ForkJoinWorkerThread[] ws = p.workers;
906     ForkJoinWorkerThread v = null;
907     int n;
908     if (queueTop != queueBase)
909     v = this;
910     else if (ws != null && (n = ws.length) > 1) {
911     ForkJoinWorkerThread w;
912     int r = nextSeed(); // cheap version of FJP.scan
913     int steps = n << 1;
914     for (int i = 0; i < steps; ++i) {
915     if ((w = ws[(i + r) & (n - 1)]) != null &&
916     w.queueBase != w.queueTop) {
917     v = w;
918     break;
919     }
920     }
921     }
922     if (v != null) {
923     ForkJoinTask<?> t;
924     if (!active) {
925     active = true;
926     p.addActiveCount(1);
927     }
928     if ((t = (v != this) ? v.deqTask() :
929 jsr166 1.65 locallyFifo ? locallyDeqTask() : popTask()) != null) {
930 dl 1.63 currentSteal = t;
931     t.doExec();
932     currentSteal = ps;
933     }
934     }
935 dl 1.31 else {
936     if (active) {
937 dl 1.63 active = false;
938     p.addActiveCount(-1);
939 dl 1.31 }
940     if (p.isQuiescent()) {
941 dl 1.63 p.addActiveCount(1);
942     p.addQuiescerCount(-1);
943     break;
944 dl 1.31 }
945     }
946 dl 1.5 }
947 dl 1.1 }
948    
949 jsr166 1.20 // Unsafe mechanics
950 dl 1.63 private static final sun.misc.Unsafe UNSAFE;
951     private static final long ABASE;
952     private static final int ASHIFT;
953 jsr166 1.20
954     static {
955 dl 1.63 int s;
956     try {
957     UNSAFE = getUnsafe();
958     Class<?> a = ForkJoinTask[].class;
959     ABASE = UNSAFE.arrayBaseOffset(a);
960     s = UNSAFE.arrayIndexScale(a);
961     } catch (Exception e) {
962     throw new Error(e);
963     }
964 jsr166 1.20 if ((s & (s-1)) != 0)
965     throw new Error("data type scale not a power of two");
966 dl 1.63 ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
967 jsr166 1.20 }
968    
969     /**
970     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
971     * Replace with a simple call to Unsafe.getUnsafe when integrating
972     * into a jdk.
973     *
974     * @return a sun.misc.Unsafe
975     */
976 jsr166 1.17 private static sun.misc.Unsafe getUnsafe() {
977 jsr166 1.6 try {
978 jsr166 1.17 return sun.misc.Unsafe.getUnsafe();
979 jsr166 1.6 } catch (SecurityException se) {
980     try {
981     return java.security.AccessController.doPrivileged
982 jsr166 1.20 (new java.security
983     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
984 jsr166 1.17 public sun.misc.Unsafe run() throws Exception {
985 jsr166 1.20 java.lang.reflect.Field f = sun.misc
986     .Unsafe.class.getDeclaredField("theUnsafe");
987     f.setAccessible(true);
988     return (sun.misc.Unsafe) f.get(null);
989 jsr166 1.6 }});
990     } catch (java.security.PrivilegedActionException e) {
991 jsr166 1.17 throw new RuntimeException("Could not initialize intrinsics",
992     e.getCause());
993 jsr166 1.6 }
994     }
995     }
996 dl 1.1 }