ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.2 by dl, Wed Jan 7 16:07:37 2009 UTC vs.
Revision 1.27 by dl, Mon Aug 3 13:01:15 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 < import java.util.*;
8 >
9   import java.util.concurrent.*;
10 < import java.util.concurrent.atomic.*;
11 < import java.util.concurrent.locks.*;
12 < import sun.misc.Unsafe;
13 < import java.lang.reflect.*;
10 >
11 > import java.util.Collection;
12  
13   /**
14   * A thread managed by a {@link ForkJoinPool}.  This class is
15   * subclassable solely for the sake of adding functionality -- there
16 < * are no overridable methods dealing with scheduling or
17 < * execution. However, you can override initialization and termination
18 < * cleanup methods surrounding the main task processing loop.  If you
19 < * do create such a subclass, you will also need to supply a custom
20 < * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
21 < *
22 < * <p>This class also provides methods for generating per-thread
23 < * random numbers, with the same properties as {@link
24 < * java.util.Random} but with each generator isolated from those of
27 < * other threads.
16 > * are no overridable methods dealing with scheduling or execution.
17 > * However, you can override initialization and termination methods
18 > * surrounding the main task processing loop.  If you do create such a
19 > * subclass, you will also need to supply a custom {@link
20 > * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
21 > * ForkJoinPool}.
22 > *
23 > * @since 1.7
24 > * @author Doug Lea
25   */
26   public class ForkJoinWorkerThread extends Thread {
27      /*
# Line 48 | Line 45 | public class ForkJoinWorkerThread extend
45       * of tasks. To accomplish this, we shift the CAS arbitrating pop
46       * vs deq (steal) from being on the indices ("base" and "sp") to
47       * the slots themselves (mainly via method "casSlotNull()"). So,
48 <     * both a successful pop and deq mainly entail CAS'ing a nonnull
48 >     * both a successful pop and deq mainly entail CAS'ing a non-null
49       * slot to null.  Because we rely on CASes of references, we do
50       * not need tag bits on base or sp.  They are simple ints as used
51       * in any circular array-based queue (see for example ArrayDeque).
# Line 60 | Line 57 | public class ForkJoinWorkerThread extend
57       * considered individually, is not wait-free. One thief cannot
58       * successfully continue until another in-progress one (or, if
59       * previously empty, a push) completes.  However, in the
60 <     * aggregate, we ensure at least probablistic non-blockingness. If
60 >     * aggregate, we ensure at least probabilistic non-blockingness. If
61       * an attempted steal fails, a thief always chooses a different
62       * random victim target to try next. So, in order for one thief to
63       * progress, it suffices for any in-progress deq or new push on
# Line 69 | Line 66 | public class ForkJoinWorkerThread extend
66       * which gives threads a chance to activate if necessary before
67       * stealing (see below).
68       *
69 +     * This approach also enables support for "async mode" where local
70 +     * task processing is in FIFO, not LIFO order, simply by using a
71 +     * version of deq rather than pop when locallyFifo is true (as set
72 +     * by the ForkJoinPool).  This allows use in message-passing
73 +     * frameworks in which tasks are never joined.
74 +     *
75       * Efficient implementation of this approach currently relies on
76       * an uncomfortable amount of "Unsafe" mechanics. To maintain
77       * correct orderings, reads and writes of variable base require
# Line 79 | Line 82 | public class ForkJoinWorkerThread extend
82       * push) require store order and CASes (in pop and deq) require
83       * (volatile) CAS semantics. Since these combinations aren't
84       * supported using ordinary volatiles, the only way to accomplish
85 <     * these effciently is to use direct Unsafe calls. (Using external
85 >     * these efficiently is to use direct Unsafe calls. (Using external
86       * AtomicIntegers and AtomicReferenceArrays for the indices and
87       * array is significantly slower because of memory locality and
88       * indirection effects.) Further, performance on most platforms is
# Line 134 | Line 137 | public class ForkJoinWorkerThread extend
137  
138      /**
139       * Maximum work-stealing queue array size.  Must be less than or
140 <     * equal to 1 << 30 to ensure lack of index wraparound.
140 >     * equal to 1 << 28 to ensure lack of index wraparound. (This
141 >     * is less than usual bounds, because we need left shift by 3
142 >     * to be in int range).
143       */
144 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;
144 >    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
145  
146      /**
147 <     * Generator of seeds for per-thread random numbers.
147 >     * The pool this thread works in. Accessed directly by ForkJoinTask.
148       */
149 <    private static final Random randomSeedGenerator = new Random();
149 >    final ForkJoinPool pool;
150  
151      /**
152       * The work-stealing queue array. Size must be a power of two.
153 +     * Initialized when thread starts, to improve memory locality.
154       */
155      private ForkJoinTask<?>[] queue;
156  
# Line 163 | Line 169 | public class ForkJoinWorkerThread extend
169      private volatile int base;
170  
171      /**
172 <     * The pool this thread works in.
173 <     */
174 <    final ForkJoinPool pool;
175 <
170 <    /**
171 <     * Index of this worker in pool array. Set once by pool before
172 <     * running, and accessed directly by pool during cleanup etc
172 >     * Activity status. When true, this worker is considered active.
173 >     * Must be false upon construction. It must be true when executing
174 >     * tasks, and BEFORE stealing a task. It must be false before
175 >     * calling pool.sync.
176       */
177 <    int poolIndex;
177 >    private boolean active;
178  
179      /**
180       * Run state of this worker. Supports simple versions of the usual
# Line 179 | Line 182 | public class ForkJoinWorkerThread extend
182       */
183      private volatile int runState;
184  
182    // Runstate values. Order matters
183    private static final int RUNNING     = 0;
184    private static final int SHUTDOWN    = 1;
185    private static final int TERMINATING = 2;
186    private static final int TERMINATED  = 3;
187
185      /**
186 <     * Activity status. When true, this worker is considered active.
187 <     * Must be false upon construction. It must be true when executing
191 <     * tasks, and BEFORE stealing a task. It must be false before
192 <     * blocking on the Pool Barrier.
186 >     * Seed for random number generator for choosing steal victims.
187 >     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
188       */
189 <    private boolean active;
189 >    private int seed;
190  
191      /**
192       * Number of steals, transferred to pool when idle
# Line 199 | Line 194 | public class ForkJoinWorkerThread extend
194      private int stealCount;
195  
196      /**
197 <     * Seed for random number generator for choosing steal victims
197 >     * Index of this worker in pool array. Set once by pool before
198 >     * running, and accessed directly by pool during cleanup etc.
199       */
200 <    private int randomVictimSeed;
200 >    int poolIndex;
201  
202      /**
203 <     * Seed for embedded Jurandom
203 >     * The last barrier event waited for. Accessed in pool callback
204 >     * methods, but only by current thread.
205       */
206 <    private long juRandomSeed;
206 >    long lastEventCount;
207  
208      /**
209 <     * The last barrier event waited for
209 >     * True if using local FIFO, not default LIFO, for local polling.
210       */
211 <    private long eventCount;
211 >    private boolean locallyFifo;
212  
213      /**
214       * Creates a ForkJoinWorkerThread operating in the given pool.
215 +     *
216       * @param pool the pool this thread works in
217       * @throws NullPointerException if pool is null
218       */
219      protected ForkJoinWorkerThread(ForkJoinPool pool) {
220          if (pool == null) throw new NullPointerException();
221          this.pool = pool;
222 <        // remaining initialization deferred to onStart
222 >        // Note: poolIndex is set by pool during construction
223 >        // Remaining initialization is deferred to onStart
224      }
225  
226 <    // public access methods
226 >    // Public access methods
227  
228      /**
229 <     * Returns the pool hosting the current task execution.
229 >     * Returns the pool hosting this thread.
230 >     *
231       * @return the pool
232       */
233 <    public static ForkJoinPool getPool() {
234 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
233 >    public ForkJoinPool getPool() {
234 >        return pool;
235 >    }
236 >
237 >    /**
238 >     * Returns the index number of this thread in its pool.  The
239 >     * returned value ranges from zero to the maximum number of
240 >     * threads (minus one) that have ever been created in the pool.
241 >     * This method may be useful for applications that track status or
242 >     * collect results per-worker rather than per-task.
243 >     *
244 >     * @return the index number
245 >     */
246 >    public int getPoolIndex() {
247 >        return poolIndex;
248      }
249  
250      /**
251 <     * Returns the index number of the current worker thread in its
252 <     * pool.  The returned value ranges from zero to the maximum
253 <     * number of threads (minus one) that have ever been created in
254 <     * the pool.  This method may be useful for applications that
255 <     * track status or collect results on a per-worker basis.
256 <     * @return the index number.
251 >     * Establishes local first-in-first-out scheduling mode for forked
252 >     * tasks that are never joined.
253 >     *
254 >     * @param async if true, use locally FIFO scheduling
255 >     */
256 >    void setAsyncMode(boolean async) {
257 >        locallyFifo = async;
258 >    }
259 >
260 >    // Runstate management
261 >
262 >    // Runstate values. Order matters
263 >    private static final int RUNNING     = 0;
264 >    private static final int SHUTDOWN    = 1;
265 >    private static final int TERMINATING = 2;
266 >    private static final int TERMINATED  = 3;
267 >
268 >    final boolean isShutdown()    { return runState >= SHUTDOWN;  }
269 >    final boolean isTerminating() { return runState >= TERMINATING;  }
270 >    final boolean isTerminated()  { return runState == TERMINATED; }
271 >    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
272 >    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
273 >
274 >    /**
275 >     * Transitions to at least the given state.
276 >     *
277 >     * @return {@code true} if not already at least at given state
278       */
279 <    public static int getPoolIndex() {
280 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
279 >    private boolean transitionRunStateTo(int state) {
280 >        for (;;) {
281 >            int s = runState;
282 >            if (s >= state)
283 >                return false;
284 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
285 >                return true;
286 >        }
287      }
288  
289 <    //  Access methods used by Pool
289 >    /**
290 >     * Tries to set status to active; fails on contention.
291 >     */
292 >    private boolean tryActivate() {
293 >        if (!active) {
294 >            if (!pool.tryIncrementActiveCount())
295 >                return false;
296 >            active = true;
297 >        }
298 >        return true;
299 >    }
300  
301      /**
302 <     * Get and clear steal count for accumulation by pool.  Called
253 <     * only when known to be idle (in pool.sync and termination).
302 >     * Tries to set status to inactive; fails on contention.
303       */
304 <    final int getAndClearStealCount() {
305 <        int sc = stealCount;
306 <        stealCount = 0;
307 <        return sc;
304 >    private boolean tryInactivate() {
305 >        if (active) {
306 >            if (!pool.tryDecrementActiveCount())
307 >                return false;
308 >            active = false;
309 >        }
310 >        return true;
311      }
312  
313      /**
314 <     * Returns estimate of the number of tasks in the queue, without
315 <     * correcting for transient negative values
314 >     * Computes next value for random victim probe.  Scans don't
315 >     * require a very high quality generator, but also not a crummy
316 >     * one.  Marsaglia xor-shift is cheap and works well.
317       */
318 <    final int getRawQueueSize() {
319 <        return sp - base;
318 >    private static int xorShift(int r) {
319 >        r ^= (r << 13);
320 >        r ^= (r >>> 17);
321 >        return r ^ (r << 5);
322      }
323  
324 <    // Intrinsics-based support for queue operations.
270 <    // Currently these three (setSp, setSlot, casSlotNull) are
271 <    // usually manually inlined to improve performance
324 >    // Lifecycle methods
325  
326      /**
327 <     * Sets sp in store-order.
327 >     * This method is required to be public, but should never be
328 >     * called explicitly. It performs the main run loop to execute
329 >     * ForkJoinTasks.
330 >     */
331 >    public void run() {
332 >        Throwable exception = null;
333 >        try {
334 >            onStart();
335 >            pool.sync(this); // await first pool event
336 >            mainLoop();
337 >        } catch (Throwable ex) {
338 >            exception = ex;
339 >        } finally {
340 >            onTermination(exception);
341 >        }
342 >    }
343 >
344 >    /**
345 >     * Executes tasks until shut down.
346 >     */
347 >    private void mainLoop() {
348 >        while (!isShutdown()) {
349 >            ForkJoinTask<?> t = pollTask();
350 >            if (t != null || (t = pollSubmission()) != null)
351 >                t.quietlyExec();
352 >            else if (tryInactivate())
353 >                pool.sync(this);
354 >        }
355 >    }
356 >
357 >    /**
358 >     * Initializes internal state after construction but before
359 >     * processing any tasks. If you override this method, you must
360 >     * invoke super.onStart() at the beginning of the method.
361 >     * Initialization requires care: Most fields must have legal
362 >     * default values, to ensure that attempted accesses from other
363 >     * threads work correctly even before this thread starts
364 >     * processing tasks.
365       */
366 <    private void setSp(int s) {
367 <        _unsafe.putOrderedInt(this, spOffset, s);
366 >    protected void onStart() {
367 >        // Allocate while starting to improve chances of thread-local
368 >        // isolation
369 >        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
370 >        // Initial value of seed need not be especially random but
371 >        // should differ across workers and must be nonzero
372 >        int p = poolIndex + 1;
373 >        seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
374 >    }
375 >
376 >    /**
377 >     * Performs cleanup associated with termination of this worker
378 >     * thread.  If you override this method, you must invoke
379 >     * {@code super.onTermination} at the end of the overridden method.
380 >     *
381 >     * @param exception the exception causing this thread to abort due
382 >     * to an unrecoverable error, or {@code null} if completed normally
383 >     */
384 >    protected void onTermination(Throwable exception) {
385 >        // Execute remaining local tasks unless aborting or terminating
386 >        while (exception == null && pool.isProcessingTasks() && base != sp) {
387 >            try {
388 >                ForkJoinTask<?> t = popTask();
389 >                if (t != null)
390 >                    t.quietlyExec();
391 >            } catch (Throwable ex) {
392 >                exception = ex;
393 >            }
394 >        }
395 >        // Cancel other tasks, transition status, notify pool, and
396 >        // propagate exception to uncaught exception handler
397 >        try {
398 >            do {} while (!tryInactivate()); // ensure inactive
399 >            cancelTasks();
400 >            runState = TERMINATED;
401 >            pool.workerTerminated(this);
402 >        } catch (Throwable ex) {        // Shouldn't ever happen
403 >             if (exception == null)      // but if so, at least rethrow
404 >                exception = ex;
405 >        } finally {
406 >            if (exception != null)
407 >                ForkJoinTask.rethrowException(exception);
408 >        }
409      }
410  
411 +    // Intrinsics-based support for queue operations.
412 +
413      /**
414 <     * Add in store-order the given task at given slot of q to
415 <     * null. Caller must ensure q is nonnull and index is in range.
414 >     * Sets in store-order the given slot of q to the given task.
415 >     * Caller must ensure q is non-null and index is in range.
416       */
417      private static void setSlot(ForkJoinTask<?>[] q, int i,
418 <                                ForkJoinTask<?> t){
419 <        _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
418 >                                ForkJoinTask<?> t) {
419 >        UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
420      }
421  
422      /**
423 <     * CAS given slot of q to null. Caller must ensure q is nonnull
423 >     * CAS given slot of q to null. Caller must ensure q is non-null
424       * and index is in range.
425       */
426      private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
427                                         ForkJoinTask<?> t) {
428 <        return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
428 >        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
429 >    }
430 >
431 >    /**
432 >     * Sets sp in store-order.
433 >     */
434 >    private void storeSp(int s) {
435 >        UNSAFE.putOrderedInt(this, spOffset, s);
436      }
437  
438      // Main queue methods
439  
440      /**
441       * Pushes a task. Called only by current thread.
442 <     * @param t the task. Caller must ensure nonnull
442 >     *
443 >     * @param t the task. Caller must ensure non-null.
444       */
445      final void pushTask(ForkJoinTask<?> t) {
446          ForkJoinTask<?>[] q = queue;
447          int mask = q.length - 1;
448          int s = sp;
449 <        _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
450 <        _unsafe.putOrderedInt(this, spOffset, ++s);
449 >        setSlot(q, s & mask, t);
450 >        storeSp(++s);
451          if ((s -= base) == 1)
452 <            pool.signalNonEmptyWorkerQueue();
452 >            pool.signalWork();
453          else if (s >= mask)
454              growQueue();
455      }
# Line 316 | Line 457 | public class ForkJoinWorkerThread extend
457      /**
458       * Tries to take a task from the base of the queue, failing if
459       * either empty or contended.
460 <     * @return a task, or null if none or contended.
460 >     *
461 >     * @return a task, or null if none or contended
462       */
463 <    private ForkJoinTask<?> deqTask() {
322 <        ForkJoinTask<?>[] q;
463 >    final ForkJoinTask<?> deqTask() {
464          ForkJoinTask<?> t;
465 +        ForkJoinTask<?>[] q;
466          int i;
467          int b;
468          if (sp != (b = base) &&
469              (q = queue) != null && // must read q after b
470              (t = q[i = (q.length - 1) & b]) != null &&
471 <            _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
471 >            casSlotNull(q, i, t)) {
472              base = b + 1;
473              return t;
474          }
# Line 334 | Line 476 | public class ForkJoinWorkerThread extend
476      }
477  
478      /**
479 <     * Returns a popped task, or null if empty.  Called only by
480 <     * current thread.
479 >     * Tries to take a task from the base of own queue, activating if
480 >     * necessary, failing only if empty. Called only by current thread.
481 >     *
482 >     * @return a task, or null if none
483 >     */
484 >    final ForkJoinTask<?> locallyDeqTask() {
485 >        int b;
486 >        while (sp != (b = base)) {
487 >            if (tryActivate()) {
488 >                ForkJoinTask<?>[] q = queue;
489 >                int i = (q.length - 1) & b;
490 >                ForkJoinTask<?> t = q[i];
491 >                if (t != null && casSlotNull(q, i, t)) {
492 >                    base = b + 1;
493 >                    return t;
494 >                }
495 >            }
496 >        }
497 >        return null;
498 >    }
499 >
500 >    /**
501 >     * Returns a popped task, or null if empty. Ensures active status
502 >     * if non-null. Called only by current thread.
503       */
504      final ForkJoinTask<?> popTask() {
341        ForkJoinTask<?> t;
342        int i;
343        ForkJoinTask<?>[] q = queue;
344        int mask = q.length - 1;
505          int s = sp;
506 <        if (s != base &&
507 <            (t = q[i = (s - 1) & mask]) != null &&
508 <            _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
509 <            _unsafe.putOrderedInt(this, spOffset, s - 1);
510 <            return t;
506 >        while (s != base) {
507 >            if (tryActivate()) {
508 >                ForkJoinTask<?>[] q = queue;
509 >                int mask = q.length - 1;
510 >                int i = (s - 1) & mask;
511 >                ForkJoinTask<?> t = q[i];
512 >                if (t == null || !casSlotNull(q, i, t))
513 >                    break;
514 >                storeSp(s - 1);
515 >                return t;
516 >            }
517          }
518          return null;
519      }
# Line 355 | Line 521 | public class ForkJoinWorkerThread extend
521      /**
522       * Specialized version of popTask to pop only if
523       * topmost element is the given task. Called only
524 <     * by current thread.
525 <     * @param t the task. Caller must ensure nonnull
524 >     * by current thread while active.
525 >     *
526 >     * @param t the task. Caller must ensure non-null.
527       */
528      final boolean unpushTask(ForkJoinTask<?> t) {
529          ForkJoinTask<?>[] q = queue;
530          int mask = q.length - 1;
531          int s = sp - 1;
532 <        if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase,
533 <                                         t, null)) {
367 <            _unsafe.putOrderedInt(this, spOffset, s);
532 >        if (casSlotNull(q, s & mask, t)) {
533 >            storeSp(s);
534              return true;
535          }
536          return false;
537      }
538  
539      /**
540 <     * Returns next task to pop.
540 >     * Returns next task or null if empty or contended
541       */
542      final ForkJoinTask<?> peekTask() {
543          ForkJoinTask<?>[] q = queue;
544 <        return q == null? null : q[(sp - 1) & (q.length - 1)];
544 >        if (q == null)
545 >            return null;
546 >        int mask = q.length - 1;
547 >        int i = locallyFifo ? base : (sp - 1);
548 >        return q[i & mask];
549      }
550  
551      /**
# Line 402 | Line 572 | public class ForkJoinWorkerThread extend
572                  t = null;
573              setSlot(newQ, b & newMask, t);
574          } while (++b != bf);
575 <        pool.signalIdleWorkers(false);
406 <    }
407 <
408 <    // Runstate management
409 <
410 <    final boolean isShutdown()    { return runState >= SHUTDOWN;  }
411 <    final boolean isTerminating() { return runState >= TERMINATING;  }
412 <    final boolean isTerminated()  { return runState == TERMINATED; }
413 <    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
414 <    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
415 <
416 <    /**
417 <     * Transition to at least the given state. Return true if not
418 <     * already at least given state.
419 <     */
420 <    private boolean transitionRunStateTo(int state) {
421 <        for (;;) {
422 <            int s = runState;
423 <            if (s >= state)
424 <                return false;
425 <            if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
426 <                return true;
427 <        }
575 >        pool.signalWork();
576      }
577  
578      /**
579 <     * Ensure status is active and if necessary adjust pool active count
580 <     */
581 <    final void activate() {
582 <        if (!active) {
583 <            active = true;
584 <            pool.incrementActiveCount();
585 <        }
586 <    }
587 <
588 <    /**
589 <     * Ensure status is inactive and if necessary adjust pool active count
579 >     * Tries to steal a task from another worker. Starts at a random
580 >     * index of workers array, and probes workers until finding one
581 >     * with non-empty queue or finding that all are empty.  It
582 >     * randomly selects the first n probes. If these are empty, it
583 >     * resorts to a full circular traversal, which is necessary to
584 >     * accurately set active status by caller. Also restarts if pool
585 >     * events occurred since last scan, which forces refresh of
586 >     * workers array, in case barrier was associated with resize.
587 >     *
588 >     * This method must be both fast and quiet -- usually avoiding
589 >     * memory accesses that could disrupt cache sharing etc other than
590 >     * those needed to check for and take tasks. This accounts for,
591 >     * among other things, updating random seed in place without
592 >     * storing it until exit.
593 >     *
594 >     * @return a task, or null if none found
595       */
596 <    final void inactivate() {
597 <        if (active) {
598 <            active = false;
599 <            pool.decrementActiveCount();
600 <        }
596 >    private ForkJoinTask<?> scan() {
597 >        ForkJoinTask<?> t = null;
598 >        int r = seed;                    // extract once to keep scan quiet
599 >        ForkJoinWorkerThread[] ws;       // refreshed on outer loop
600 >        int mask;                        // must be power 2 minus 1 and > 0
601 >        outer:do {
602 >            if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
603 >                int idx = r;
604 >                int probes = ~mask;      // use random index while negative
605 >                for (;;) {
606 >                    r = xorShift(r);     // update random seed
607 >                    ForkJoinWorkerThread v = ws[mask & idx];
608 >                    if (v == null || v.sp == v.base) {
609 >                        if (probes <= mask)
610 >                            idx = (probes++ < 0) ? r : (idx + 1);
611 >                        else
612 >                            break;
613 >                    }
614 >                    else if (!tryActivate() || (t = v.deqTask()) == null)
615 >                        continue outer;  // restart on contention
616 >                    else
617 >                        break outer;
618 >                }
619 >            }
620 >        } while (pool.hasNewSyncEvent(this)); // retry on pool events
621 >        seed = r;
622 >        return t;
623      }
624  
450    // Lifecycle methods
451
625      /**
626 <     * Initializes internal state after construction but before
627 <     * processing any tasks. If you override this method, you must
628 <     * invoke super.onStart() at the beginning of the method.
456 <     * Initialization requires care: Most fields must have legal
457 <     * default values, to ensure that attempted accesses from other
458 <     * threads work correctly even before this thread starts
459 <     * processing tasks.
626 >     * Gets and removes a local or stolen task.
627 >     *
628 >     * @return a task, if available
629       */
630 <    protected void onStart() {
631 <        juRandomSeed = randomSeedGenerator.nextLong();
632 <        do;while((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero
633 <        if (queue == null)
634 <            queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
466 <
467 <        // Heuristically allow one initial thread to warm up; others wait
468 <        if (poolIndex < pool.getParallelism() - 1) {
469 <            eventCount = pool.sync(this, 0);
470 <            activate();
471 <        }
630 >    final ForkJoinTask<?> pollTask() {
631 >        ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
632 >        if (t == null && (t = scan()) != null)
633 >            ++stealCount;
634 >        return t;
635      }
636  
637      /**
638 <     * Perform cleanup associated with termination of this worker
476 <     * thread.  If you override this method, you must invoke
477 <     * super.onTermination at the end of the overridden method.
638 >     * Gets a local task.
639       *
640 <     * @param exception the exception causing this thread to abort due
480 <     * to an unrecoverable error, or null if completed normally.
640 >     * @return a task, if available
641       */
642 <    protected void onTermination(Throwable exception) {
643 <        try {
484 <            clearLocalTasks();
485 <            inactivate();
486 <            cancelTasks();
487 <        } finally {
488 <            terminate(exception);
489 <        }
642 >    final ForkJoinTask<?> pollLocalTask() {
643 >        return locallyFifo ? locallyDeqTask() : popTask();
644      }
645  
646      /**
647 <     * Notify pool of termination and, if exception is nonnull,
648 <     * rethrow it to trigger this thread's uncaughtExceptionHandler
647 >     * Returns a pool submission, if one exists, activating first.
648 >     *
649 >     * @return a submission, if available
650       */
651 <    private void terminate(Throwable exception) {
652 <        transitionRunStateTo(TERMINATED);
653 <        try {
654 <            pool.workerTerminated(this);
655 <        } finally {
656 <            if (exception != null)
502 <                ForkJoinTask.rethrowException(exception);
651 >    private ForkJoinTask<?> pollSubmission() {
652 >        ForkJoinPool p = pool;
653 >        while (p.hasQueuedSubmissions()) {
654 >            ForkJoinTask<?> t;
655 >            if (tryActivate() && (t = p.pollSubmission()) != null)
656 >                return t;
657          }
658 +        return null;
659      }
660  
661 <    /**
507 <     * Run local tasks on exit from main.
508 <     */
509 <    private void clearLocalTasks() {
510 <        while (base != sp && !pool.isTerminating()) {
511 <            ForkJoinTask<?> t = popTask();
512 <            if (t != null) {
513 <                activate(); // ensure active status
514 <                t.quietlyExec();
515 <            }
516 <        }
517 <    }
661 >    // Methods accessed only by Pool
662  
663      /**
664       * Removes and cancels all tasks in queue.  Can be called from any
665       * thread.
666       */
667      final void cancelTasks() {
668 <        while (base != sp) {
669 <            ForkJoinTask<?> t = deqTask();
670 <            if (t != null)
527 <                t.cancelIgnoreExceptions();
528 <        }
529 <    }
530 <
531 <    /**
532 <     * This method is required to be public, but should never be
533 <     * called explicitly. It performs the main run loop to execute
534 <     * ForkJoinTasks.
535 <     */
536 <    public void run() {
537 <        Throwable exception = null;
538 <        try {
539 <            onStart();
540 <            while (!isShutdown())
541 <                step();
542 <        } catch (Throwable ex) {
543 <            exception = ex;
544 <        } finally {
545 <            onTermination(exception);
546 <        }
668 >        ForkJoinTask<?> t;
669 >        while (base != sp && (t = deqTask()) != null) // ends when base == sp or as soon as deqTask() returns null
670 >            t.cancelIgnoringExceptions(); // NOTE(review): unlike the older loop shown above, a null from deqTask() now ends the loop -- confirm deqTask() returns null only when the queue is empty
671      }
672  
673      /**
674 <     * Main top-level action.
674 >     * Drains tasks to given collection c.
675 >     *
676 >     * @return the number of tasks drained
677       */
678 <    private void step() {
679 <        ForkJoinTask<?> t = sp != base? popTask() : null;
680 <        if (t != null || (t = scan(null, true)) != null) {
681 <            activate();
682 <            t.quietlyExec();
683 <        }
558 <        else {
559 <            inactivate();
560 <            eventCount = pool.sync(this, eventCount);
678 >    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
679 >        int n = 0; // number of tasks handed to c so far
680 >        ForkJoinTask<?> t;
681 >        while (base != sp && (t = deqTask()) != null) { // ends when base == sp or as soon as deqTask() returns null
682 >            c.add(t);
683 >            ++n; // counted regardless of the value returned by c.add
684          }
685 +        return n;
686      }
687  
564    // scanning for and stealing tasks
565
688      /**
689 <     * Computes next value for random victim probe. Scans don't
690 <     * require a very high quality generator, but also not a crummy
569 <     * one. Marsaglia xor-shift is cheap and works well.
570 <     *
571 <     * This is currently unused, and manually inlined
689 >     * Gets and clears steal count for accumulation by pool.  Called
690 >     * only when known to be idle (in pool.sync and termination).
691       */
692 <    private static int xorShift(int r) {
693 <        r ^= r << 1;
694 <        r ^= r >>> 3;
695 <        r ^= r << 10;
577 <        return r;
692 >    final int getAndClearStealCount() {
693 >        int sc = stealCount; // snapshot the current count
694 >        stealCount = 0; // the read above and this reset are two separate statements, not one combined operation
695 >        return sc;
696      }
697  
698      /**
699 <     * Tries to steal a task from another worker and/or, if enabled,
700 <     * submission queue. Starts at a random index of workers array,
583 <     * and probes workers until finding one with non-empty queue or
584 <     * finding that all are empty.  It randomly selects the first n-1
585 <     * probes. If these are empty, it resorts to full circular
586 <     * traversal, which is necessary to accurately set active status
587 <     * by caller. Also restarts if pool barrier has tripped since last
588 <     * scan, which forces refresh of workers array, in case barrier
589 <     * was associated with resize.
699 >     * Returns {@code true} if at least one worker in the given array
700 >     * appears to have at least one queued task.
701       *
702 <     * This method must be both fast and quiet -- usually avoiding
592 <     * memory accesses that could disrupt cache sharing etc other than
593 <     * those needed to check for and take tasks. This accounts for,
594 <     * among other things, updating random seed in place without
595 <     * storing it until exit. (Note that we only need to store it if
596 <     * we found a task; otherwise it doesn't matter if we start at the
597 <     * same place next time.)
598 <     *
599 <     * @param joinMe if non null; exit early if done
600 <     * @param checkSubmissions true if OK to take submissions
601 <     * @return a task, or null if none found
702 >     * @param ws array of workers
703       */
704 <    private ForkJoinTask<?> scan(ForkJoinTask<?> joinMe,
705 <                                 boolean checkSubmissions) {
706 <        ForkJoinPool p = pool;
707 <        if (p == null)                    // Never null, but avoids
708 <            return null;                  //   implicit nullchecks below
709 <        int r = randomVictimSeed;         // extract once to keep scan quiet
710 <        restart:                          // outer loop refreshes ws array
711 <        while (joinMe == null || joinMe.status >= 0) {
611 <            int mask;
612 <            ForkJoinWorkerThread[] ws = p.workers;
613 <            if (ws != null && (mask = ws.length - 1) > 0) {
614 <                int probes = -mask;       // use random index while negative
615 <                int idx = r;
616 <                for (;;) {
617 <                    ForkJoinWorkerThread v;
618 <                    // inlined xorshift to update seed
619 <                    r ^= r << 1;  r ^= r >>> 3; r ^= r << 10;
620 <                    if ((v = ws[mask & idx]) != null && v.sp != v.base) {
621 <                        ForkJoinTask<?> t;
622 <                        activate();
623 <                        if ((joinMe == null || joinMe.status >= 0) &&
624 <                            (t = v.deqTask()) != null) {
625 <                            randomVictimSeed = r;
626 <                            ++stealCount;
627 <                            return t;
628 <                        }
629 <                        continue restart; // restart on contention
630 <                    }
631 <                    if ((probes >> 1) <= mask) // n-1 random then circular
632 <                        idx = (probes++ < 0)? r : (idx + 1);
633 <                    else
634 <                        break;
704 >    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
705 >        if (ws != null) {
706 >            int len = ws.length;
707 >            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
708 >                for (int i = 0; i < len; ++i) {
709 >                    ForkJoinWorkerThread w = ws[i];
710 >                    if (w != null && w.sp != w.base)
711 >                        return true;
712                  }
713              }
637            if (checkSubmissions && p.hasQueuedSubmissions()) {
638                activate();
639                ForkJoinTask<?> t = p.pollSubmission();
640                if (t != null)
641                    return t;
642            }
643            else {
644                long ec = eventCount;     // restart on pool event
645                if ((eventCount = p.getEventCount()) == ec)
646                    break;
647            }
648        }
649        return null;
650    }
651
652    /**
653     * Callback from pool.sync to rescan before blocking.  If a
654     * task is found, it is pushed so it can be executed upon return.
655     * @return true if found and pushed a task
656     */
657    final boolean prescan() {
658        ForkJoinTask<?> t = scan(null, true);
659        if (t != null) {
660            pushTask(t);
661            return true;
662        }
663        else {
664            inactivate();
665            return false;
666        }
667    }
668
669    // Support for ForkJoinTask methods
670
671    /**
672     * Implements ForkJoinTask.helpJoin
673     */
674    final int helpJoinTask(ForkJoinTask<?> joinMe) {
675        ForkJoinTask<?> t = null;
676        int s;
677        while ((s = joinMe.status) >= 0) {
678            if (t == null) {
679                if ((t = scan(joinMe, false)) == null)  // block if no work
680                    return joinMe.awaitDone(this, false);
681                // else recheck status before exec
682            }
683            else {
684                t.quietlyExec();
685                t = null;
686            }
714          }
715 <        if (t != null) // unsteal
689 <            pushTask(t);
690 <        return s;
691 <    }
692 <
693 <    /**
694 <     * Pops or steals a task
695 <     * @return task, or null if none available
696 <     */
697 <    final ForkJoinTask<?> getLocalOrStolenTask() {
698 <        ForkJoinTask<?> t = popTask();
699 <        return t != null? t : scan(null, false);
715 >        return false;
716      }
717  
718 <    /**
703 <     * Runs tasks until pool isQuiescent
704 <     */
705 <    final void helpQuiescePool() {
706 <        for (;;) {
707 <            ForkJoinTask<?> t = getLocalOrStolenTask();
708 <            if (t != null) {
709 <                activate();
710 <                t.quietlyExec();
711 <            }
712 <            else {
713 <                inactivate();
714 <                if (pool.isQuiescent()) {
715 <                    activate(); // re-activate on exit
716 <                    break;
717 <                }
718 <            }
719 <        }
720 <    }
718 >    // Support methods for ForkJoinTask
719  
720      /**
721       * Returns an estimate of the number of tasks in the queue.
722       */
723      final int getQueueSize() {
724 <        int b = base;
725 <        int n = sp - b;
728 <        return n <= 0? 0 : n; // suppress momentarily negative values
724 >        // suppress momentarily negative values
725 >        return Math.max(0, sp - base); // clamps the difference so the result is never below 0
726      }
727  
728      /**
# Line 733 | Line 730 | public class ForkJoinWorkerThread extend
730       * function of number of idle workers.
731       */
732      final int getEstimatedSurplusTaskCount() {
733 +        // The halving approximates weighting idle vs non-idle workers
734          return (sp - base) - (pool.getIdleThreadCount() >>> 1); // sp - base minus half the pool's idle-thread count; not clamped, so the result can be negative
735      }
736  
739    // Per-worker exported random numbers
740
741    // Same constants as java.util.Random
742    final static long JURandomMultiplier = 0x5DEECE66DL;
743    final static long JURandomAddend = 0xBL;
744    final static long JURandomMask = (1L << 48) - 1;
745
746    private final int nextJURandom(int bits) {
747        long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
748            JURandomMask;
749        juRandomSeed = next;
750        return (int)(next >>> (48 - bits));
751    }
752
753    private final int nextJURandomInt(int n) {
754        if (n <= 0)
755            throw new IllegalArgumentException("n must be positive");
756        int bits = nextJURandom(31);
757        if ((n & -n) == n)
758            return (int)((n * (long)bits) >> 31);
759
760        for (;;) {
761            int val = bits % n;
762            if (bits - val + (n-1) >= 0)
763                return val;
764            bits = nextJURandom(31);
765        }
766    }
767
768    private final long nextJURandomLong() {
769        return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
770    }
771
772    private final long nextJURandomLong(long n) {
773        if (n <= 0)
774            throw new IllegalArgumentException("n must be positive");
775        long offset = 0;
776        while (n >= Integer.MAX_VALUE) { // randomly pick half range
777            int bits = nextJURandom(2); // 2nd bit for odd vs even split
778            long half = n >>> 1;
779            long nextn = ((bits & 2) == 0)? half : n - half;
780            if ((bits & 1) == 0)
781                offset += n - nextn;
782            n = nextn;
783        }
784        return offset + nextJURandomInt((int)n);
785    }
786
787    private final double nextJURandomDouble() {
788        return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
789            / (double)(1L << 53);
790    }
791
737      /**
738 <     * Returns a random integer using a per-worker random
794 <     * number generator with the same properties as
795 <     * {@link java.util.Random#nextInt}
796 <     * @return the next pseudorandom, uniformly distributed {@code int}
797 <     *         value from this worker's random number generator's sequence
738 >     * Scans, returning early if joinMe done.
739       */
740 <    public static int nextRandomInt() {
741 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
742 <            nextJURandom(32);
740 >    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
741 >        ForkJoinTask<?> t = pollTask();
742 >        if (t != null && joinMe.status < 0 && sp == base) { // a task was obtained, but joinMe.status is already negative and sp == base
743 >            pushTask(t); // unsteal if done and this task would be stealable
744 >            t = null; // the task was pushed back, so report nothing to the caller
745 >        }
746 >        return t; // may be null: either pollTask() found nothing or the task was pushed back above
747      }
748  
749      /**
750 <     * Returns a random integer using a per-worker random
806 <     * number generator with the same properties as
807 <     * {@link java.util.Random#nextInt(int)}
808 <     * @param n the bound on the random number to be returned.  Must be
809 <     *        positive.
810 <     * @return the next pseudorandom, uniformly distributed {@code int}
811 <     *         value between {@code 0} (inclusive) and {@code n} (exclusive)
812 <     *         from this worker's random number generator's sequence
813 <     * @throws IllegalArgumentException if n is not positive
750 >     * Runs tasks until {@code pool.isQuiescent()}.
751       */
752 <    public static int nextRandomInt(int n) {
753 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
754 <            nextJURandomInt(n);
752 >    final void helpQuiescePool() {
753 >        for (;;) {
754 >            ForkJoinTask<?> t = pollTask();
755 >            if (t != null)
756 >                t.quietlyExec(); // run whatever pollTask() returned, then poll again
757 >            else if (tryInactivate() && pool.isQuiescent()) // no task: leave only if tryInactivate() succeeded and the pool reports quiescence
758 >                break;
759 >        }
760 >        do {} while (!tryActivate()); // re-activate on exit; repeats until tryActivate() returns true
761      }
762  
763 <    /**
764 <     * Returns a random long using a per-worker random
765 <     * number generator with the same properties as
766 <     * {@link java.util.Random#nextLong}
767 <     * @return the next pseudorandom, uniformly distributed {@code long}
768 <     *         value from this worker's random number generator's sequence
769 <     */
770 <    public static long nextRandomLong() {
771 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
772 <            nextJURandomLong();
763 >    // Unsafe mechanics
764 >
765 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
766 >    private static final long spOffset =
767 >        objectFieldOffset("sp", ForkJoinWorkerThread.class); // Unsafe offset of the "sp" field
768 >    private static final long runStateOffset =
769 >        objectFieldOffset("runState", ForkJoinWorkerThread.class); // Unsafe offset of the "runState" field
770 >    private static final long qBase; // base offset of ForkJoinTask[] elements; assigned in the static block below
771 >    private static final int qShift; // shift derived from the ForkJoinTask[] index scale; assigned in the static block below
772 >
773 >    static {
774 >        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
775 >        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
776 >        if ((s & (s-1)) != 0) // nonzero when s has more than one bit set
777 >            throw new Error("data type scale not a power of two");
778 >        qShift = 31 - Integer.numberOfLeadingZeros(s); // index of the highest set bit, i.e. log2(s) for a power-of-two s
779      }
780  
781 <    /**
782 <     * Returns a random integer using a per-worker random
783 <     * number generator with the same properties as
784 <     * {@link java.util.Random#nextInt(int)}
785 <     * @param n the bound on the random number to be returned.  Must be
786 <     *        positive.
787 <     * @return the next pseudorandom, uniformly distributed {@code int}
788 <     *         value between {@code 0} (inclusive) and {@code n} (exclusive)
789 <     *         from this worker's random number generator's sequence
841 <     * @throws IllegalArgumentException if n is not positive
842 <     */
843 <    public static long nextRandomLong(long n) {
844 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
845 <            nextJURandomLong(n);
781 >    private static long objectFieldOffset(String field, Class<?> klazz) {
782 >        try {
783 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); // getDeclaredField only sees fields declared directly by klazz
784 >        } catch (NoSuchFieldException e) {
785 >            // Convert Exception to corresponding Error
786 >            NoSuchFieldError error = new NoSuchFieldError(field); // the error message is the missing field's name
787 >            error.initCause(e); // keep the original exception attached as the cause
788 >            throw error;
789 >        }
790      }
791  
792      /**
793 <     * Returns a random double using a per-worker random
794 <     * number generator with the same properties as
795 <     * {@link java.util.Random#nextDouble}
796 <     * @return the next pseudorandom, uniformly distributed {@code double}
797 <     *         value between {@code 0.0} and {@code 1.0} from this
854 <     *         worker's random number generator's sequence
793 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
794 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
795 >     * into a jdk.
796 >     *
797 >     * @return a sun.misc.Unsafe
798       */
799 <    public static double nextRandomDouble() {
857 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
858 <            nextJURandomDouble();
859 <    }
860 <
861 <    // Temporary Unsafe mechanics for preliminary release
862 <
863 <    static final Unsafe _unsafe;
864 <    static final long baseOffset;
865 <    static final long spOffset;
866 <    static final long qBase;
867 <    static final int qShift;
868 <    static final long runStateOffset;
869 <    static {
799 >    private static sun.misc.Unsafe getUnsafe() {
800          try {
801 <            if (ForkJoinWorkerThread.class.getClassLoader() != null) {
802 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
803 <                f.setAccessible(true);
804 <                _unsafe = (Unsafe)f.get(null);
801 >            return sun.misc.Unsafe.getUnsafe(); // first attempt: the direct static accessor
802 >        } catch (SecurityException se) { // direct accessor refused; fall back to reflection below
803 >            try {
804 >                return java.security.AccessController.doPrivileged
805 >                    (new java.security
806 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
807 >                        public sun.misc.Unsafe run() throws Exception {
808 >                            java.lang.reflect.Field f = sun.misc
809 >                                .Unsafe.class.getDeclaredField("theUnsafe");
810 >                            f.setAccessible(true); // lift the access check on the reflected field before reading it
811 >                            return (sun.misc.Unsafe) f.get(null); // null receiver: read as a static field
812 >                        }});
813 >            } catch (java.security.PrivilegedActionException e) {
814 >                throw new RuntimeException("Could not initialize intrinsics",
815 >                                           e.getCause()); // report the exception thrown inside run(), not the wrapper
816              }
876            else
877                _unsafe = Unsafe.getUnsafe();
878            baseOffset = _unsafe.objectFieldOffset
879                (ForkJoinWorkerThread.class.getDeclaredField("base"));
880            spOffset = _unsafe.objectFieldOffset
881                (ForkJoinWorkerThread.class.getDeclaredField("sp"));
882            runStateOffset = _unsafe.objectFieldOffset
883                (ForkJoinWorkerThread.class.getDeclaredField("runState"));
884            qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
885            int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
886            if ((s & (s-1)) != 0)
887                throw new Error("data type scale not a power of two");
888            qShift = 31 - Integer.numberOfLeadingZeros(s);
889        } catch (Exception e) {
890            throw new RuntimeException("Could not initialize intrinsics", e);
817          }
818      }
819   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines