ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.10 by jsr166, Mon Jul 20 23:07:43 2009 UTC vs.
Revision 1.22 by dl, Wed Jul 29 12:05:55 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 < import java.util.*;
8 >
9   import java.util.concurrent.*;
10 < import java.util.concurrent.atomic.*;
11 < import java.util.concurrent.locks.*;
12 < import sun.misc.Unsafe;
13 < import java.lang.reflect.*;
10 >
11 > import java.util.Collection;
12  
13   /**
14   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 21 | Line 19 | import java.lang.reflect.*;
19   * create such a subclass, you will also need to supply a custom
20   * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
21   *
22 + * @since 1.7
23 + * @author Doug Lea
24   */
25   public class ForkJoinWorkerThread extends Thread {
26      /*
# Line 137 | Line 137 | public class ForkJoinWorkerThread extend
137      private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
138  
139      /**
140 <     * The pool this thread works in. Accessed directly by ForkJoinTask
140 >     * The pool this thread works in. Accessed directly by ForkJoinTask.
141       */
142      final ForkJoinPool pool;
143  
# Line 165 | Line 165 | public class ForkJoinWorkerThread extend
165       * Activity status. When true, this worker is considered active.
166       * Must be false upon construction. It must be true when executing
167       * tasks, and BEFORE stealing a task. It must be false before
168 <     * calling pool.sync
168 >     * calling pool.sync.
169       */
170      private boolean active;
171  
# Line 188 | Line 188 | public class ForkJoinWorkerThread extend
188  
189      /**
190       * Index of this worker in pool array. Set once by pool before
191 <     * running, and accessed directly by pool during cleanup etc
191 >     * running, and accessed directly by pool during cleanup etc.
192       */
193      int poolIndex;
194  
# Line 205 | Line 205 | public class ForkJoinWorkerThread extend
205  
206      /**
207       * Creates a ForkJoinWorkerThread operating in the given pool.
208 +     *
209       * @param pool the pool this thread works in
210       * @throws NullPointerException if pool is null
211       */
# Line 218 | Line 219 | public class ForkJoinWorkerThread extend
219      // Public access methods
220  
221      /**
222 <     * Returns the pool hosting this thread
222 >     * Returns the pool hosting this thread.
223 >     *
224       * @return the pool
225       */
226      public ForkJoinPool getPool() {
# Line 231 | Line 233 | public class ForkJoinWorkerThread extend
233       * threads (minus one) that have ever been created in the pool.
234       * This method may be useful for applications that track status or
235       * collect results per-worker rather than per-task.
236 <     * @return the index number.
236 >     *
237 >     * @return the index number
238       */
239      public int getPoolIndex() {
240          return poolIndex;
# Line 240 | Line 243 | public class ForkJoinWorkerThread extend
243      /**
244       * Establishes local first-in-first-out scheduling mode for forked
245       * tasks that are never joined.
246 +     *
247       * @param async if true, use locally FIFO scheduling
248       */
249      void setAsyncMode(boolean async) {
# Line 261 | Line 265 | public class ForkJoinWorkerThread extend
265      final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
266  
267      /**
268 <     * Transition to at least the given state. Return true if not
269 <     * already at least given state.
268 >     * Transitions to at least the given state.
269 >     *
270 >     * @return {@code true} if not already at least at given state
271       */
272      private boolean transitionRunStateTo(int state) {
273          for (;;) {
274              int s = runState;
275              if (s >= state)
276                  return false;
277 <            if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
277 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
278                  return true;
279          }
280      }
281  
282      /**
283 <     * Try to set status to active; fail on contention
283 >     * Tries to set status to active; fails on contention.
284       */
285      private boolean tryActivate() {
286          if (!active) {
# Line 287 | Line 292 | public class ForkJoinWorkerThread extend
292      }
293  
294      /**
295 <     * Try to set status to active; fail on contention
295 >     * Tries to set status to inactive; fails on contention.
296       */
297      private boolean tryInactivate() {
298          if (active) {
# Line 299 | Line 304 | public class ForkJoinWorkerThread extend
304      }
305  
306      /**
307 <     * Computes next value for random victim probe. Scans don't
307 >     * Computes next value for random victim probe.  Scans don't
308       * require a very high quality generator, but also not a crummy
309 <     * one. Marsaglia xor-shift is cheap and works well.
309 >     * one.  Marsaglia xor-shift is cheap and works well.
310       */
311      private static int xorShift(int r) {
312          r ^= r << 1;
# Line 331 | Line 336 | public class ForkJoinWorkerThread extend
336      }
337  
338      /**
339 <     * Execute tasks until shut down.
339 >     * Executes tasks until shut down.
340       */
341      private void mainLoop() {
342          while (!isShutdown()) {
# Line 363 | Line 368 | public class ForkJoinWorkerThread extend
368      }
369  
370      /**
371 <     * Perform cleanup associated with termination of this worker
371 >     * Performs cleanup associated with termination of this worker
372       * thread.  If you override this method, you must invoke
373 <     * super.onTermination at the end of the overridden method.
373 >     * {@code super.onTermination} at the end of the overridden method.
374       *
375       * @param exception the exception causing this thread to abort due
376 <     * to an unrecoverable error, or null if completed normally.
376 >     * to an unrecoverable error, or {@code null} if completed normally
377       */
378      protected void onTermination(Throwable exception) {
379          // Execute remaining local tasks unless aborting or terminating
# Line 377 | Line 382 | public class ForkJoinWorkerThread extend
382                  ForkJoinTask<?> t = popTask();
383                  if (t != null)
384                      t.quietlyExec();
385 <            } catch(Throwable ex) {
385 >            } catch (Throwable ex) {
386                  exception = ex;
387              }
388          }
389          // Cancel other tasks, transition status, notify pool, and
390          // propagate exception to uncaught exception handler
391          try {
392 <            do;while (!tryInactivate()); // ensure inactive
392 >            do {} while (!tryInactivate()); // ensure inactive
393              cancelTasks();
394              runState = TERMINATED;
395              pool.workerTerminated(this);
# Line 404 | Line 409 | public class ForkJoinWorkerThread extend
409       * Caller must ensure q is non-null and index is in range.
410       */
411      private static void setSlot(ForkJoinTask<?>[] q, int i,
412 <                                ForkJoinTask<?> t){
413 <        _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
412 >                                ForkJoinTask<?> t) {
413 >        UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
414      }
415  
416      /**
# Line 414 | Line 419 | public class ForkJoinWorkerThread extend
419       */
420      private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
421                                         ForkJoinTask<?> t) {
422 <        return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
422 >        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
423      }
424  
425      /**
426       * Sets sp in store-order.
427       */
428      private void storeSp(int s) {
429 <        _unsafe.putOrderedInt(this, spOffset, s);
429 >        UNSAFE.putOrderedInt(this, spOffset, s);
430      }
431  
432      // Main queue methods
433  
434      /**
435       * Pushes a task. Called only by current thread.
436 +     *
437       * @param t the task. Caller must ensure non-null.
438       */
439      final void pushTask(ForkJoinTask<?> t) {
# Line 445 | Line 451 | public class ForkJoinWorkerThread extend
451      /**
452       * Tries to take a task from the base of the queue, failing if
453       * either empty or contended.
454 <     * @return a task, or null if none or contended.
454 >     *
455 >     * @return a task, or null if none or contended
456       */
457      final ForkJoinTask<?> deqTask() {
458          ForkJoinTask<?> t;
# Line 487 | Line 494 | public class ForkJoinWorkerThread extend
494       * Specialized version of popTask to pop only if
495       * topmost element is the given task. Called only
496       * by current thread while active.
497 <     * @param t the task. Caller must ensure non-null
497 >     *
498 >     * @param t the task. Caller must ensure non-null.
499       */
500      final boolean unpushTask(ForkJoinTask<?> t) {
501          ForkJoinTask<?>[] q = queue;
# Line 508 | Line 516 | public class ForkJoinWorkerThread extend
516          if (q == null)
517              return null;
518          int mask = q.length - 1;
519 <        int i = locallyFifo? base : (sp - 1);
519 >        int i = locallyFifo ? base : (sp - 1);
520          return q[i & mask];
521      }
522  
# Line 571 | Line 579 | public class ForkJoinWorkerThread extend
579                      ForkJoinWorkerThread v = ws[mask & idx];
580                      if (v == null || v.sp == v.base) {
581                          if (probes <= mask)
582 <                            idx = (probes++ < 0)? r : (idx + 1);
582 >                            idx = (probes++ < 0) ? r : (idx + 1);
583                          else
584                              break;
585                      }
# Line 587 | Line 595 | public class ForkJoinWorkerThread extend
595      }
596  
597      /**
598 <     * gets and removes a local or stolen a task
598 >     * Gets and removes a local or stolen task.
599 >     *
600       * @return a task, if available
601       */
602      final ForkJoinTask<?> pollTask() {
603 <        ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
603 >        ForkJoinTask<?> t = locallyFifo ? deqTask() : popTask();
604          if (t == null && (t = scan()) != null)
605              ++stealCount;
606          return t;
607      }
608  
609      /**
610 <     * gets a local task
610 >     * Gets a local task.
611 >     *
612       * @return a task, if available
613       */
614      final ForkJoinTask<?> pollLocalTask() {
615 <        return locallyFifo? deqTask() : popTask();
615 >        return locallyFifo ? deqTask() : popTask();
616      }
617  
618      /**
619       * Returns a pool submission, if one exists, activating first.
620 +     *
621       * @return a submission, if available
622       */
623      private ForkJoinTask<?> pollSubmission() {
# Line 632 | Line 643 | public class ForkJoinWorkerThread extend
643      }
644  
645      /**
646 <     * Drains tasks to given collection c
646 >     * Drains tasks to given collection c.
647 >     *
648       * @return the number of tasks drained
649       */
650 <    final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
650 >    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
651          int n = 0;
652          ForkJoinTask<?> t;
653          while (base != sp && (t = deqTask()) != null) {
# Line 646 | Line 658 | public class ForkJoinWorkerThread extend
658      }
659  
660      /**
661 <     * Get and clear steal count for accumulation by pool.  Called
661 >     * Gets and clears steal count for accumulation by pool.  Called
662       * only when known to be idle (in pool.sync and termination).
663       */
664      final int getAndClearStealCount() {
# Line 656 | Line 668 | public class ForkJoinWorkerThread extend
668      }
669  
670      /**
671 <     * Returns true if at least one worker in the given array appears
672 <     * to have at least one queued task.
671 >     * Returns {@code true} if at least one worker in the given array
672 >     * appears to have at least one queued task.
673 >     *
674       * @param ws array of workers
675       */
676      static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
# Line 680 | Line 693 | public class ForkJoinWorkerThread extend
693       * Returns an estimate of the number of tasks in the queue.
694       */
695      final int getQueueSize() {
696 <        int n = sp - base;
697 <        return n < 0? 0 : n; // suppress momentarily negative values
696 >        // suppress momentarily negative values
697 >        return Math.max(0, sp - base);
698      }
699  
700      /**
# Line 694 | Line 707 | public class ForkJoinWorkerThread extend
707      }
708  
709      /**
710 <     * Scan, returning early if joinMe done
710 >     * Scans, returning early if joinMe done.
711       */
712      final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
713          ForkJoinTask<?> t = pollTask();
# Line 706 | Line 719 | public class ForkJoinWorkerThread extend
719      }
720  
721      /**
722 <     * Runs tasks until pool isQuiescent
722 >     * Runs tasks until {@code pool.isQuiescent()}.
723       */
724      final void helpQuiescePool() {
725          for (;;) {
# Line 716 | Line 729 | public class ForkJoinWorkerThread extend
729              else if (tryInactivate() && pool.isQuiescent())
730                  break;
731          }
732 <        do;while (!tryActivate()); // re-activate on exit
732 >        do {} while (!tryActivate()); // re-activate on exit
733      }
734  
735 <    // Temporary Unsafe mechanics for preliminary release
736 <    private static Unsafe getUnsafe() throws Throwable {
735 >    // Unsafe mechanics
736 >
737 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
738 >    private static final long spOffset =
739 >        objectFieldOffset("sp", ForkJoinWorkerThread.class);
740 >    private static final long runStateOffset =
741 >        objectFieldOffset("runState", ForkJoinWorkerThread.class);
742 >    private static final long qBase;
743 >    private static final int qShift;
744 >
745 >    static {
746 >        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
747 >        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
748 >        if ((s & (s-1)) != 0)
749 >            throw new Error("data type scale not a power of two");
750 >        qShift = 31 - Integer.numberOfLeadingZeros(s);
751 >    }
752 >
753 >    private static long objectFieldOffset(String field, Class<?> klazz) {
754 >        try {
755 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
756 >        } catch (NoSuchFieldException e) {
757 >            // Convert Exception to corresponding Error
758 >            NoSuchFieldError error = new NoSuchFieldError(field);
759 >            error.initCause(e);
760 >            throw error;
761 >        }
762 >    }
763 >
764 >    /**
765 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
766 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
767 >     * into a jdk.
768 >     *
769 >     * @return a sun.misc.Unsafe
770 >     */
771 >    private static sun.misc.Unsafe getUnsafe() {
772          try {
773 <            return Unsafe.getUnsafe();
773 >            return sun.misc.Unsafe.getUnsafe();
774          } catch (SecurityException se) {
775              try {
776                  return java.security.AccessController.doPrivileged
777 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
778 <                        public Unsafe run() throws Exception {
779 <                            return getUnsafePrivileged();
777 >                    (new java.security
778 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
779 >                        public sun.misc.Unsafe run() throws Exception {
780 >                            java.lang.reflect.Field f = sun.misc
781 >                                .Unsafe.class.getDeclaredField("theUnsafe");
782 >                            f.setAccessible(true);
783 >                            return (sun.misc.Unsafe) f.get(null);
784                          }});
785              } catch (java.security.PrivilegedActionException e) {
786 <                throw e.getCause();
786 >                throw new RuntimeException("Could not initialize intrinsics",
787 >                                           e.getCause());
788              }
789          }
790      }
738
739    private static Unsafe getUnsafePrivileged()
740            throws NoSuchFieldException, IllegalAccessException {
741        Field f = Unsafe.class.getDeclaredField("theUnsafe");
742        f.setAccessible(true);
743        return (Unsafe) f.get(null);
744    }
745
746    private static long fieldOffset(String fieldName)
747            throws NoSuchFieldException {
748        return _unsafe.objectFieldOffset
749            (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
750    }
751
752    static final Unsafe _unsafe;
753    static final long baseOffset;
754    static final long spOffset;
755    static final long runStateOffset;
756    static final long qBase;
757    static final int qShift;
758    static {
759        try {
760            _unsafe = getUnsafe();
761            baseOffset = fieldOffset("base");
762            spOffset = fieldOffset("sp");
763            runStateOffset = fieldOffset("runState");
764            qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
765            int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
766            if ((s & (s-1)) != 0)
767                throw new Error("data type scale not a power of two");
768            qShift = 31 - Integer.numberOfLeadingZeros(s);
769        } catch (Throwable e) {
770            throw new RuntimeException("Could not initialize intrinsics", e);
771        }
772    }
791   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines