ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.67 by jsr166, Wed Jun 8 05:12:25 2011 UTC vs.
Revision 1.68 by dl, Thu Jan 26 00:08:13 2012 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.Collection;
10 import java.util.concurrent.RejectedExecutionException;
11
9   /**
10   * A thread managed by a {@link ForkJoinPool}, which executes
11   * {@link ForkJoinTask}s.
# Line 25 | Line 22 | import java.util.concurrent.RejectedExec
22   */
23   public class ForkJoinWorkerThread extends Thread {
24      /*
28     * Overview:
29     *
25       * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
26 <     * ForkJoinTasks. This class includes bookkeeping in support of
27 <     * worker activation, suspension, and lifecycle control described
33 <     * in more detail in the internal documentation of class
34 <     * ForkJoinPool. And as described further below, this class also
35 <     * includes special-cased support for some ForkJoinTask
36 <     * methods. But the main mechanics involve work-stealing:
37 <     *
38 <     * Work-stealing queues are special forms of Deques that support
39 <     * only three of the four possible end-operations -- push, pop,
40 <     * and deq (aka steal), under the further constraints that push
41 <     * and pop are called only from the owning thread, while deq may
42 <     * be called from other threads.  (If you are unfamiliar with
43 <     * them, you probably want to read Herlihy and Shavit's book "The
44 <     * Art of Multiprocessor programming", chapter 16 describing these
45 <     * in more detail before proceeding.)  The main work-stealing
46 <     * queue design is roughly similar to those in the papers "Dynamic
47 <     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
48 <     * (http://research.sun.com/scalable/pubs/index.html) and
49 <     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
50 <     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
51 <     * The main differences ultimately stem from gc requirements that
52 <     * we null out taken slots as soon as we can, to maintain as small
53 <     * a footprint as possible even in programs generating huge
54 <     * numbers of tasks. To accomplish this, we shift the CAS
55 <     * arbitrating pop vs deq (steal) from being on the indices
56 <     * ("queueBase" and "queueTop") to the slots themselves (mainly
57 <     * via method "casSlotNull()"). So, both a successful pop and deq
58 <     * mainly entail a CAS of a slot from non-null to null.  Because
59 <     * we rely on CASes of references, we do not need tag bits on
60 <     * queueBase or queueTop.  They are simple ints as used in any
61 <     * circular array-based queue (see for example ArrayDeque).
62 <     * Updates to the indices must still be ordered in a way that
63 <     * guarantees that queueTop == queueBase means the queue is empty,
64 <     * but otherwise may err on the side of possibly making the queue
65 <     * appear nonempty when a push, pop, or deq have not fully
66 <     * committed. Note that this means that the deq operation,
67 <     * considered individually, is not wait-free. One thief cannot
68 <     * successfully continue until another in-progress one (or, if
69 <     * previously empty, a push) completes.  However, in the
70 <     * aggregate, we ensure at least probabilistic non-blockingness.
71 <     * If an attempted steal fails, a thief always chooses a different
72 <     * random victim target to try next. So, in order for one thief to
73 <     * progress, it suffices for any in-progress deq or new push on
74 <     * any empty queue to complete.
75 <     *
76 <     * This approach also enables support for "async mode" where local
77 <     * task processing is in FIFO, not LIFO order; simply by using a
78 <     * version of deq rather than pop when locallyFifo is true (as set
79 <     * by the ForkJoinPool).  This allows use in message-passing
80 <     * frameworks in which tasks are never joined.  However neither
81 <     * mode considers affinities, loads, cache localities, etc, so
82 <     * rarely provide the best possible performance on a given
83 <     * machine, but portably provide good throughput by averaging over
84 <     * these factors.  (Further, even if we did try to use such
85 <     * information, we do not usually have a basis for exploiting
86 <     * it. For example, some sets of tasks profit from cache
87 <     * affinities, but others are harmed by cache pollution effects.)
88 <     *
89 <     * When a worker would otherwise be blocked waiting to join a
90 <     * task, it first tries a form of linear helping: Each worker
91 <     * records (in field currentSteal) the most recent task it stole
92 <     * from some other worker. Plus, it records (in field currentJoin)
93 <     * the task it is currently actively joining. Method joinTask uses
94 <     * these markers to try to find a worker to help (i.e., steal back
95 <     * a task from and execute it) that could hasten completion of the
96 <     * actively joined task. In essence, the joiner executes a task
97 <     * that would be on its own local deque had the to-be-joined task
98 <     * not been stolen. This may be seen as a conservative variant of
99 <     * the approach in Wagner & Calder "Leapfrogging: a portable
100 <     * technique for implementing efficient futures" SIGPLAN Notices,
101 <     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
102 <     * in that: (1) We only maintain dependency links across workers
103 <     * upon steals, rather than use per-task bookkeeping.  This may
104 <     * require a linear scan of workers array to locate stealers, but
105 <     * usually doesn't because stealers leave hints (that may become
106 <     * stale/wrong) of where to locate them. This isolates cost to
107 <     * when it is needed, rather than adding to per-task overhead.
108 <     * (2) It is "shallow", ignoring nesting and potentially cyclic
109 <     * mutual steals.  (3) It is intentionally racy: field currentJoin
110 <     * is updated only while actively joining, which means that we
111 <     * miss links in the chain during long-lived tasks, GC stalls etc
112 <     * (which is OK since blocking in such cases is usually a good
113 <     * idea).  (4) We bound the number of attempts to find work (see
114 <     * MAX_HELP) and fall back to suspending the worker and if
115 <     * necessary replacing it with another.
116 <     *
117 <     * Efficient implementation of these algorithms currently relies
118 <     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
119 <     * correct orderings, reads and writes of variable queueBase
120 <     * require volatile ordering.  Variable queueTop need not be
121 <     * volatile because non-local reads always follow those of
122 <     * queueBase.  Similarly, because they are protected by volatile
123 <     * queueBase reads, reads of the queue array and its slots by
124 <     * other threads do not need volatile load semantics, but writes
125 <     * (in push) require store order and CASes (in pop and deq)
126 <     * require (volatile) CAS semantics.  (Michael, Saraswat, and
127 <     * Vechev's algorithm has similar properties, but without support
128 <     * for nulling slots.)  Since these combinations aren't supported
129 <     * using ordinary volatiles, the only way to accomplish these
130 <     * efficiently is to use direct Unsafe calls. (Using external
131 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
132 <     * array is significantly slower because of memory locality and
133 <     * indirection effects.)
134 <     *
135 <     * Further, performance on most platforms is very sensitive to
136 <     * placement and sizing of the (resizable) queue array.  Even
137 <     * though these queues don't usually become all that big, the
138 <     * initial size must be large enough to counteract cache
139 <     * contention effects across multiple queues (especially in the
140 <     * presence of GC cardmarking). Also, to improve thread-locality,
141 <     * queues are initialized after starting.
142 <     */
143 <
144 <    /**
145 <     * Mask for pool indices encoded as shorts
146 <     */
147 <    private static final int  SMASK  = 0xffff;
148 <
149 <    /**
150 <     * Capacity of work-stealing queue array upon initialization.
151 <     * Must be a power of two. Initial size must be at least 4, but is
152 <     * padded to minimize cache effects.
153 <     */
154 <    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
155 <
156 <    /**
157 <     * Maximum size for queue array. Must be a power of two
158 <     * less than or equal to 1 << (31 - width of array entry) to
159 <     * ensure lack of index wraparound, but is capped at a lower
160 <     * value to help users trap runaway computations.
161 <     */
162 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
163 <
164 <    /**
165 <     * The work-stealing queue array. Size must be a power of two.
166 <     * Initialized when started (as opposed to when constructed), to
167 <     * improve memory locality.
168 <     */
169 <    ForkJoinTask<?>[] queue;
170 <
171 <    /**
172 <     * The pool this thread works in. Accessed directly by ForkJoinTask.
173 <     */
174 <    final ForkJoinPool pool;
175 <
176 <    /**
177 <     * Index (mod queue.length) of next queue slot to push to or pop
178 <     * from. It is written only by owner thread, and accessed by other
179 <     * threads only after reading (volatile) queueBase.  Both queueTop
180 <     * and queueBase are allowed to wrap around on overflow, but
181 <     * (queueTop - queueBase) still estimates size.
182 <     */
183 <    int queueTop;
184 <
185 <    /**
186 <     * Index (mod queue.length) of least valid queue slot, which is
187 <     * always the next position to steal from if nonempty.
188 <     */
189 <    volatile int queueBase;
190 <
191 <    /**
192 <     * The index of most recent stealer, used as a hint to avoid
193 <     * traversal in method helpJoinTask. This is only a hint because a
194 <     * worker might have had multiple steals and this only holds one
195 <     * of them (usually the most current). Declared non-volatile,
196 <     * relying on other prevailing sync to keep reasonably current.
197 <     */
198 <    int stealHint;
199 <
200 <    /**
201 <     * Index of this worker in pool array. Set once by pool before
202 <     * running, and accessed directly by pool to locate this worker in
203 <     * its workers array.
204 <     */
205 <    final int poolIndex;
206 <
207 <    /**
208 <     * Encoded record for pool task waits. Usages are always
209 <     * surrounded by volatile reads/writes
210 <     */
211 <    int nextWait;
212 <
213 <    /**
214 <     * Complement of poolIndex, offset by count of entries of task
215 <     * waits. Accessed by ForkJoinPool to manage event waiters.
216 <     */
217 <    volatile int eventCount;
218 <
219 <    /**
220 <     * Seed for random number generator for choosing steal victims.
221 <     * Uses Marsaglia xorshift. Must be initialized as nonzero.
222 <     */
223 <    int seed;
224 <
225 <    /**
226 <     * Number of steals. Directly accessed (and reset) by pool when
227 <     * idle.
228 <     */
229 <    int stealCount;
230 <
231 <    /**
232 <     * True if this worker should or did terminate
233 <     */
234 <    volatile boolean terminate;
235 <
236 <    /**
237 <     * Set to true before LockSupport.park; false on return
238 <     */
239 <    volatile boolean parked;
240 <
241 <    /**
242 <     * True if use local fifo, not default lifo, for local polling.
243 <     * Shadows value from ForkJoinPool.
244 <     */
245 <    final boolean locallyFifo;
246 <
247 <    /**
248 <     * The task most recently stolen from another worker (or
249 <     * submission queue).  All uses are surrounded by enough volatile
250 <     * reads/writes to maintain as non-volatile.
26 >     * ForkJoinTasks. For explanation, see the internal documentation
27 >     * of class ForkJoinPool.
28       */
252    ForkJoinTask<?> currentSteal;
29  
30 <    /**
31 <     * The task currently being joined, set only when actively trying
256 <     * to help other stealers in helpJoinTask. All uses are surrounded
257 <     * by enough volatile reads/writes to maintain as non-volatile.
258 <     */
259 <    ForkJoinTask<?> currentJoin;
30 >    final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
31 >    final ForkJoinPool pool;                // the pool this thread works in
32  
33      /**
34       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 266 | Line 38 | public class ForkJoinWorkerThread extend
38       */
39      protected ForkJoinWorkerThread(ForkJoinPool pool) {
40          super(pool.nextWorkerName());
41 <        this.pool = pool;
270 <        int k = pool.registerWorker(this);
271 <        poolIndex = k;
272 <        eventCount = ~k & SMASK; // clear wait count
273 <        locallyFifo = pool.locallyFifo;
41 >        setDaemon(true);
42          Thread.UncaughtExceptionHandler ueh = pool.ueh;
43          if (ueh != null)
44              setUncaughtExceptionHandler(ueh);
45 <        setDaemon(true);
45 >        this.pool = pool;
46 >        this.workQueue = new ForkJoinPool.WorkQueue(this, pool.localMode);
47 >        pool.registerWorker(this);
48      }
49  
280    // Public methods
281
50      /**
51       * Returns the pool hosting this thread.
52       *
# Line 298 | Line 66 | public class ForkJoinWorkerThread extend
66       * @return the index number
67       */
68      public int getPoolIndex() {
69 <        return poolIndex;
69 >        return workQueue.poolIndex;
70      }
71  
304    // Randomization
305
306    /**
307     * Computes next value for random victim probes and backoffs.
308     * Scans don't require a very high quality generator, but also not
309     * a crummy one.  Marsaglia xor-shift is cheap and works well
310     * enough.  Note: This is manually inlined in FJP.scan() to avoid
311     * writes inside busy loops.
312     */
313    private int nextSeed() {
314        int r = seed;
315        r ^= r << 13;
316        r ^= r >>> 17;
317        r ^= r << 5;
318        return seed = r;
319    }
320
321    // Run State management
322
72      /**
73       * Initializes internal state after construction but before
74       * processing any tasks. If you override this method, you must
# Line 330 | Line 79 | public class ForkJoinWorkerThread extend
79       * processing tasks.
80       */
81      protected void onStart() {
333        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
334        int r = ForkJoinPool.workerSeedGenerator.nextInt();
335        seed = (r == 0) ? 1 : r; //  must be nonzero
82      }
83  
84      /**
# Line 344 | Line 90 | public class ForkJoinWorkerThread extend
90       * to an unrecoverable error, or {@code null} if completed normally
91       */
92      protected void onTermination(Throwable exception) {
347        try {
348            terminate = true;
349            cancelTasks();
350            pool.deregisterWorker(this, exception);
351        } catch (Throwable ex) {        // Shouldn't ever happen
352            if (exception == null)      // but if so, at least rethrown
353                exception = ex;
354        } finally {
355            if (exception != null)
356                UNSAFE.throwException(exception);
357        }
93      }
94  
95      /**
# Line 366 | Line 101 | public class ForkJoinWorkerThread extend
101          Throwable exception = null;
102          try {
103              onStart();
104 <            pool.work(this);
104 >            pool.runWorker(this);
105          } catch (Throwable ex) {
106              exception = ex;
107          } finally {
373            onTermination(exception);
374        }
375    }
376
377    /*
378     * Intrinsics-based atomic writes for queue slots. These are
379     * basically the same as methods in AtomicReferenceArray, but
380     * specialized for (1) ForkJoinTask elements (2) requirement that
381     * nullness and bounds checks have already been performed by
382     * callers and (3) effective offsets are known not to overflow
383     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
384     * need corresponding version for reads: plain array reads are OK
385     * because they are protected by other volatile reads and are
386     * confirmed by CASes.
387     *
388     * Most uses don't actually call these methods, but instead
389     * contain inlined forms that enable more predictable
390     * optimization.  We don't define the version of write used in
391     * pushTask at all, but instead inline there a store-fenced array
392     * slot write.
393     *
394     * Also in most methods, as a performance (not correctness) issue,
395     * we'd like to encourage compilers not to arbitrarily postpone
396     * setting queueTop after writing slot.  Currently there is no
397     * intrinsic for arranging this, but using Unsafe putOrderedInt
398     * may be a preferable strategy on some compilers even though its
399     * main effect is a pre-, not post- fence. To simplify possible
400     * changes, the option is left in comments next to the associated
401     * assignments.
402     */
403
404    /**
405     * CASes slot i of array q from t to null. Caller must ensure q is
406     * non-null and index is in range.
407     */
408    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
409                                             ForkJoinTask<?> t) {
410        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
411    }
412
413    /**
414     * Performs a volatile write of the given task at given slot of
415     * array q.  Caller must ensure q is non-null and index is in
416     * range. This method is used only during resets and backouts.
417     */
418    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
419                                        ForkJoinTask<?> t) {
420        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
421    }
422
423    // queue methods
424
425    /**
426     * Pushes a task. Call only from this thread.
427     *
428     * @param t the task. Caller must ensure non-null.
429     */
430    final void pushTask(ForkJoinTask<?> t) {
431        ForkJoinTask<?>[] q; int s, m;
432        if ((q = queue) != null) {    // ignore if queue removed
433            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
434            UNSAFE.putOrderedObject(q, u, t);
435            queueTop = s + 1;         // or use putOrderedInt
436            if ((s -= queueBase) <= 2)
437                pool.signalWork();
438            else if (s == m)
439                growQueue();
440        }
441    }
442
443    /**
444     * Creates or doubles queue array.  Transfers elements by
445     * emulating steals (deqs) from old array and placing, oldest
446     * first, into new array.
447     */
448    private void growQueue() {
449        ForkJoinTask<?>[] oldQ = queue;
450        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
451        if (size > MAXIMUM_QUEUE_CAPACITY)
452            throw new RejectedExecutionException("Queue capacity exceeded");
453        if (size < INITIAL_QUEUE_CAPACITY)
454            size = INITIAL_QUEUE_CAPACITY;
455        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
456        int mask = size - 1;
457        int top = queueTop;
458        int oldMask;
459        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
460            for (int b = queueBase; b != top; ++b) {
461                long u = ((b & oldMask) << ASHIFT) + ABASE;
462                Object x = UNSAFE.getObjectVolatile(oldQ, u);
463                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
464                    UNSAFE.putObjectVolatile
465                        (q, ((b & mask) << ASHIFT) + ABASE, x);
466            }
467        }
468    }
469
470    /**
471     * Tries to take a task from the base of the queue, failing if
472     * empty or contended. Note: Specializations of this code appear
473     * in locallyDeqTask and elsewhere.
474     *
475     * @return a task, or null if none or contended
476     */
477    final ForkJoinTask<?> deqTask() {
478        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
479        if (queueTop != (b = queueBase) &&
480            (q = queue) != null && // must read q after b
481            (i = (q.length - 1) & b) >= 0 &&
482            (t = q[i]) != null && queueBase == b &&
483            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
484            queueBase = b + 1;
485            return t;
486        }
487        return null;
488    }
489
490    /**
491     * Tries to take a task from the base of own queue.  Called only
492     * by this thread.
493     *
494     * @return a task, or null if none
495     */
496    final ForkJoinTask<?> locallyDeqTask() {
497        ForkJoinTask<?> t; int m, b, i;
498        ForkJoinTask<?>[] q = queue;
499        if (q != null && (m = q.length - 1) >= 0) {
500            while (queueTop != (b = queueBase)) {
501                if ((t = q[i = m & b]) != null &&
502                    queueBase == b &&
503                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
504                                                t, null)) {
505                    queueBase = b + 1;
506                    return t;
507                }
508            }
509        }
510        return null;
511    }
512
513    /**
514     * Returns a popped task, or null if empty.
515     * Called only by this thread.
516     */
517    private ForkJoinTask<?> popTask() {
518        int m;
519        ForkJoinTask<?>[] q = queue;
520        if (q != null && (m = q.length - 1) >= 0) {
521            for (int s; (s = queueTop) != queueBase;) {
522                int i = m & --s;
523                long u = (i << ASHIFT) + ABASE; // raw offset
524                ForkJoinTask<?> t = q[i];
525                if (t == null)   // lost to stealer
526                    break;
527                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
528                    queueTop = s; // or putOrderedInt
529                    return t;
530                }
531            }
532        }
533        return null;
534    }
535
536    /**
537     * Specialized version of popTask to pop only if topmost element
538     * is the given task. Called only by this thread.
539     *
540     * @param t the task. Caller must ensure non-null.
541     */
542    final boolean unpushTask(ForkJoinTask<?> t) {
543        ForkJoinTask<?>[] q;
544        int s;
545        if ((q = queue) != null && (s = queueTop) != queueBase &&
546            UNSAFE.compareAndSwapObject
547            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
548            queueTop = s; // or putOrderedInt
549            return true;
550        }
551        return false;
552    }
553
554    /**
555     * Returns next task, or null if empty or contended.
556     */
557    final ForkJoinTask<?> peekTask() {
558        int m;
559        ForkJoinTask<?>[] q = queue;
560        if (q == null || (m = q.length - 1) < 0)
561            return null;
562        int i = locallyFifo ? queueBase : (queueTop - 1);
563        return q[i & m];
564    }
565
566    // Support methods for ForkJoinPool
567
568    /**
569     * Runs the given task, plus any local tasks until queue is empty
570     */
571    final void execTask(ForkJoinTask<?> t) {
572        currentSteal = t;
573        for (;;) {
574            if (t != null)
575                t.doExec();
576            if (queueTop == queueBase)
577                break;
578            t = locallyFifo ? locallyDeqTask() : popTask();
579        }
580        ++stealCount;
581        currentSteal = null;
582    }
583
584    /**
585     * Removes and cancels all tasks in queue.  Can be called from any
586     * thread.
587     */
588    final void cancelTasks() {
589        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
590        if (cj != null && cj.status >= 0)
591            cj.cancelIgnoringExceptions();
592        ForkJoinTask<?> cs = currentSteal;
593        if (cs != null && cs.status >= 0)
594            cs.cancelIgnoringExceptions();
595        while (queueBase != queueTop) {
596            ForkJoinTask<?> t = deqTask();
597            if (t != null)
598                t.cancelIgnoringExceptions();
599        }
600    }
601
602    /**
603     * Drains tasks to given collection c.
604     *
605     * @return the number of tasks drained
606     */
607    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
608        int n = 0;
609        while (queueBase != queueTop) {
610            ForkJoinTask<?> t = deqTask();
611            if (t != null) {
612                c.add(t);
613                ++n;
614            }
615        }
616        return n;
617    }
618
619    // Support methods for ForkJoinTask
620
621    /**
622     * Returns an estimate of the number of tasks in the queue.
623     */
624    final int getQueueSize() {
625        return queueTop - queueBase;
626    }
627
628    /**
629     * Gets and removes a local task.
630     *
631     * @return a task, if available
632     */
633    final ForkJoinTask<?> pollLocalTask() {
634        return locallyFifo ? locallyDeqTask() : popTask();
635    }
636
637    /**
638     * Gets and removes a local or stolen task.
639     *
640     * @return a task, if available
641     */
642    final ForkJoinTask<?> pollTask() {
643        ForkJoinWorkerThread[] ws;
644        ForkJoinTask<?> t = pollLocalTask();
645        if (t != null || (ws = pool.workers) == null)
646            return t;
647        int n = ws.length; // cheap version of FJP.scan
648        int steps = n << 1;
649        int r = nextSeed();
650        int i = 0;
651        while (i < steps) {
652            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
653            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
654                if ((t = w.deqTask()) != null)
655                    return t;
656                i = 0;
657            }
658        }
659        return null;
660    }
661
662    /**
663     * The maximum stolen->joining link depth allowed in helpJoinTask,
664     * as well as the maximum number of retries (allowing on average
665     * one staleness retry per level) per attempt to instead try
666     * compensation.  Depths for legitimate chains are unbounded, but
667     * we use a fixed constant to avoid (otherwise unchecked) cycles
668     * and bound staleness of traversal parameters at the expense of
669     * sometimes blocking when we could be helping.
670     */
671    private static final int MAX_HELP = 16;
672
673    /**
674     * Possibly runs some tasks and/or blocks, until joinMe is done.
675     *
676     * @param joinMe the task to join
677     * @return completion status on exit
678     */
679    final int joinTask(ForkJoinTask<?> joinMe) {
680        ForkJoinTask<?> prevJoin = currentJoin;
681        currentJoin = joinMe;
682        for (int s, retries = MAX_HELP;;) {
683            if ((s = joinMe.status) < 0) {
684                currentJoin = prevJoin;
685                return s;
686            }
687            if (retries > 0) {
688                if (queueTop != queueBase) {
689                    if (!localHelpJoinTask(joinMe))
690                        retries = 0;           // cannot help
691                }
692                else if (retries == MAX_HELP >>> 1) {
693                    --retries;                 // check uncommon case
694                    if (tryDeqAndExec(joinMe) >= 0)
695                        Thread.yield();        // for politeness
696                }
697                else
698                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
699            }
700            else {
701                retries = MAX_HELP;           // restart if not done
702                pool.tryAwaitJoin(joinMe);
703            }
704        }
705    }
706
707    /**
708     * If present, pops and executes the given task, or any other
709     * cancelled task
710     *
711     * @return false if any other non-cancelled task exists in local queue
712     */
713    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
714        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
715        if ((s = queueTop) != queueBase && (q = queue) != null &&
716            (i = (q.length - 1) & --s) >= 0 &&
717            (t = q[i]) != null) {
718            if (t != joinMe && t.status >= 0)
719                return false;
720            if (UNSAFE.compareAndSwapObject
721                (q, (i << ASHIFT) + ABASE, t, null)) {
722                queueTop = s;           // or putOrderedInt
723                t.doExec();
724            }
725        }
726        return true;
727    }
728
729    /**
730     * Tries to locate and execute tasks for a stealer of the given
731     * task, or in turn one of its stealers, Traces
732     * currentSteal->currentJoin links looking for a thread working on
733     * a descendant of the given task and with a non-empty queue to
734     * steal back and execute tasks from.  The implementation is very
735     * branchy to cope with potential inconsistencies or loops
736     * encountering chains that are stale, unknown, or of length
737     * greater than MAX_HELP links.  All of these cases are dealt with
738     * by just retrying by caller.
739     *
740     * @param joinMe the task to join
741     * @param canSteal true if local queue is empty
742     * @return true if ran a task
743     */
744    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
745        boolean helped = false;
746        int m = pool.scanGuard & SMASK;
747        ForkJoinWorkerThread[] ws = pool.workers;
748        if (ws != null && ws.length > m && joinMe.status >= 0) {
749            int levels = MAX_HELP;              // remaining chain length
750            ForkJoinTask<?> task = joinMe;      // base of chain
751            outer:for (ForkJoinWorkerThread thread = this;;) {
752                // Try to find v, the stealer of task, by first using hint
753                ForkJoinWorkerThread v = ws[thread.stealHint & m];
754                if (v == null || v.currentSteal != task) {
755                    for (int j = 0; ;) {        // search array
756                        if ((v = ws[j]) != null && v.currentSteal == task) {
757                            thread.stealHint = j;
758                            break;              // save hint for next time
759                        }
760                        if (++j > m)
761                            break outer;        // can't find stealer
762                    }
763                }
764                // Try to help v, using specialized form of deqTask
765                for (;;) {
766                    ForkJoinTask<?>[] q; int b, i;
767                    if (joinMe.status < 0)
768                        break outer;
769                    if ((b = v.queueBase) == v.queueTop ||
770                        (q = v.queue) == null ||
771                        (i = (q.length-1) & b) < 0)
772                        break;                  // empty
773                    long u = (i << ASHIFT) + ABASE;
774                    ForkJoinTask<?> t = q[i];
775                    if (task.status < 0)
776                        break outer;            // stale
777                    if (t != null && v.queueBase == b &&
778                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
779                        v.queueBase = b + 1;
780                        v.stealHint = poolIndex;
781                        ForkJoinTask<?> ps = currentSteal;
782                        currentSteal = t;
783                        t.doExec();
784                        currentSteal = ps;
785                        helped = true;
786                    }
787                }
788                // Try to descend to find v's stealer
789                ForkJoinTask<?> next = v.currentJoin;
790                if (--levels > 0 && task.status >= 0 &&
791                    next != null && next != task) {
792                    task = next;
793                    thread = v;
794                }
795                else
796                    break;  // max levels, stale, dead-end, or cyclic
797            }
798        }
799        return helped;
800    }
801
802    /**
803     * Performs an uncommon case for joinTask: If task t is at base of
804     * some workers queue, steals and executes it.
805     *
806     * @param t the task
807     * @return t's status
808     */
809    private int tryDeqAndExec(ForkJoinTask<?> t) {
810        int m = pool.scanGuard & SMASK;
811        ForkJoinWorkerThread[] ws = pool.workers;
812        if (ws != null && ws.length > m && t.status >= 0) {
813            for (int j = 0; j <= m; ++j) {
814                ForkJoinTask<?>[] q; int b, i;
815                ForkJoinWorkerThread v = ws[j];
816                if (v != null &&
817                    (b = v.queueBase) != v.queueTop &&
818                    (q = v.queue) != null &&
819                    (i = (q.length - 1) & b) >= 0 &&
820                    q[i] ==  t) {
821                    long u = (i << ASHIFT) + ABASE;
822                    if (v.queueBase == b &&
823                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
824                        v.queueBase = b + 1;
825                        v.stealHint = poolIndex;
826                        ForkJoinTask<?> ps = currentSteal;
827                        currentSteal = t;
828                        t.doExec();
829                        currentSteal = ps;
830                    }
831                    break;
832                }
833            }
834        }
835        return t.status;
836    }
837
838    /**
839     * Implements ForkJoinTask.getSurplusQueuedTaskCount().  Returns
840     * an estimate of the number of tasks, offset by a function of
841     * number of idle workers.
842     *
843     * This method provides a cheap heuristic guide for task
844     * partitioning when programmers, frameworks, tools, or languages
845     * have little or no idea about task granularity.  In essence by
846     * offering this method, we ask users only about tradeoffs in
847     * overhead vs expected throughput and its variance, rather than
848     * how finely to partition tasks.
849     *
850     * In a steady state strict (tree-structured) computation, each
851     * thread makes available for stealing enough tasks for other
852     * threads to remain active. Inductively, if all threads play by
853     * the same rules, each thread should make available only a
854     * constant number of tasks.
855     *
856     * The minimum useful constant is just 1. But using a value of 1
857     * would require immediate replenishment upon each steal to
858     * maintain enough tasks, which is infeasible.  Further,
859     * partitionings/granularities of offered tasks should minimize
860     * steal rates, which in general means that threads nearer the top
861     * of computation tree should generate more than those nearer the
862     * bottom. In perfect steady state, each thread is at
863     * approximately the same level of computation tree. However,
864     * producing extra tasks amortizes the uncertainty of progress and
865     * diffusion assumptions.
866     *
867     * So, users will want to use values larger, but not much larger
868     * than 1 to both smooth over transient shortages and hedge
869     * against uneven progress; as traded off against the cost of
870     * extra task overhead. We leave the user to pick a threshold
871     * value to compare with the results of this call to guide
872     * decisions, but recommend values such as 3.
873     *
874     * When all threads are active, it is on average OK to estimate
875     * surplus strictly locally. In steady-state, if one thread is
876     * maintaining say 2 surplus tasks, then so are others. So we can
877     * just use estimated queue length (although note that (queueTop -
878     * queueBase) can be an overestimate because of stealers lagging
879     * increments of queueBase).  However, this strategy alone leads
880     * to serious mis-estimates in some non-steady-state conditions
881     * (ramp-up, ramp-down, other stalls). We can detect many of these
882     * by further considering the number of "idle" threads, that are
883     * known to have zero queued tasks, so compensate by a factor of
884     * (#idle/#active) threads.
885     */
886    final int getEstimatedSurplusTaskCount() {
887        return queueTop - queueBase - pool.idlePerActive();
888    }
889
890    /**
891     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
892     * pool's active count ctl maintenance, but rather than blocking
893     * when tasks cannot be found, we rescan until all others cannot
894     * find tasks either. The bracketing by pool quiescerCounts
895     * updates suppresses pool auto-shutdown mechanics that could
896     * otherwise prematurely terminate the pool because all threads
897     * appear to be inactive.
898     */
899    final void helpQuiescePool() {
900        boolean active = true;
901        ForkJoinTask<?> ps = currentSteal; // to restore below
902        ForkJoinPool p = pool;
903        p.addQuiescerCount(1);
904        for (;;) {
905            ForkJoinWorkerThread[] ws = p.workers;
906            ForkJoinWorkerThread v = null;
907            int n;
908            if (queueTop != queueBase)
909                v = this;
910            else if (ws != null && (n = ws.length) > 1) {
911                ForkJoinWorkerThread w;
912                int r = nextSeed(); // cheap version of FJP.scan
913                int steps = n << 1;
914                for (int i = 0; i < steps; ++i) {
915                    if ((w = ws[(i + r) & (n - 1)]) != null &&
916                        w.queueBase != w.queueTop) {
917                        v = w;
918                        break;
919                    }
920                }
921            }
922            if (v != null) {
923                ForkJoinTask<?> t;
924                if (!active) {
925                    active = true;
926                    p.addActiveCount(1);
927                }
928                if ((t = (v != this) ? v.deqTask() :
929                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
930                    currentSteal = t;
931                    t.doExec();
932                    currentSteal = ps;
933                }
934            }
935            else {
936                if (active) {
937                    active = false;
938                    p.addActiveCount(-1);
939                }
940                if (p.isQuiescent()) {
941                    p.addActiveCount(1);
942                    p.addQuiescerCount(-1);
943                    break;
944                }
945            }
946        }
947    }
948
949    // Unsafe mechanics
950    private static final sun.misc.Unsafe UNSAFE;
951    private static final long ABASE;
952    private static final int ASHIFT;
953
954    static {
955        int s;
956        try {
957            UNSAFE = getUnsafe();
958            Class<?> a = ForkJoinTask[].class;
959            ABASE = UNSAFE.arrayBaseOffset(a);
960            s = UNSAFE.arrayIndexScale(a);
961        } catch (Exception e) {
962            throw new Error(e);
963        }
964        if ((s & (s-1)) != 0)
965            throw new Error("data type scale not a power of two");
966        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
967    }
968
969    /**
970     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
971     * Replace with a simple call to Unsafe.getUnsafe when integrating
972     * into a jdk.
973     *
974     * @return a sun.misc.Unsafe
975     */
976    private static sun.misc.Unsafe getUnsafe() {
977        try {
978            return sun.misc.Unsafe.getUnsafe();
979        } catch (SecurityException se) {
108              try {
109 <                return java.security.AccessController.doPrivileged
110 <                    (new java.security
111 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
112 <                        public sun.misc.Unsafe run() throws Exception {
113 <                            java.lang.reflect.Field f = sun.misc
114 <                                .Unsafe.class.getDeclaredField("theUnsafe");
987 <                            f.setAccessible(true);
988 <                            return (sun.misc.Unsafe) f.get(null);
989 <                        }});
990 <            } catch (java.security.PrivilegedActionException e) {
991 <                throw new RuntimeException("Could not initialize intrinsics",
992 <                                           e.getCause());
109 >                onTermination(exception);
110 >            } catch (Throwable ex) {
111 >                if (exception == null)
112 >                    exception = ex;
113 >            } finally {
114 >                pool.deregisterWorker(this, exception);
115              }
116          }
117      }
118   }
119 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines