ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.14 by jsr166, Wed Jul 22 20:55:22 2009 UTC vs.
Revision 1.21 by jsr166, Mon Jul 27 20:57:44 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 < import java.util.*;
8 >
9   import java.util.concurrent.*;
10 < import java.util.concurrent.atomic.*;
11 < import java.util.concurrent.locks.*;
12 < import sun.misc.Unsafe;
13 < import java.lang.reflect.*;
10 >
11 > import java.util.Collection;
12  
13   /**
14   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 139 | Line 137 | public class ForkJoinWorkerThread extend
137      private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
138  
139      /**
140 <     * The pool this thread works in. Accessed directly by ForkJoinTask
140 >     * The pool this thread works in. Accessed directly by ForkJoinTask.
141       */
142      final ForkJoinPool pool;
143  
# Line 167 | Line 165 | public class ForkJoinWorkerThread extend
165       * Activity status. When true, this worker is considered active.
166       * Must be false upon construction. It must be true when executing
167       * tasks, and BEFORE stealing a task. It must be false before
168 <     * calling pool.sync
168 >     * calling pool.sync.
169       */
170      private boolean active;
171  
# Line 190 | Line 188 | public class ForkJoinWorkerThread extend
188  
189      /**
190       * Index of this worker in pool array. Set once by pool before
191 <     * running, and accessed directly by pool during cleanup etc
191 >     * running, and accessed directly by pool during cleanup etc.
192       */
193      int poolIndex;
194  
# Line 267 | Line 265 | public class ForkJoinWorkerThread extend
265      final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
266  
267      /**
268 <     * Transitions to at least the given state.  Returns true if not
269 <     * already at least at given state.
268 >     * Transitions to at least the given state.
269 >     *
270 >     * @return {@code true} if not already at least at given state
271       */
272      private boolean transitionRunStateTo(int state) {
273          for (;;) {
# Line 293 | Line 292 | public class ForkJoinWorkerThread extend
292      }
293  
294      /**
295 <     * Tries to set status to active; fails on contention.
295 >     * Tries to set status to inactive; fails on contention.
296       */
297      private boolean tryInactivate() {
298          if (active) {
# Line 371 | Line 370 | public class ForkJoinWorkerThread extend
370      /**
371       * Performs cleanup associated with termination of this worker
372       * thread.  If you override this method, you must invoke
373 <     * super.onTermination at the end of the overridden method.
373 >     * {@code super.onTermination} at the end of the overridden method.
374       *
375       * @param exception the exception causing this thread to abort due
376 <     * to an unrecoverable error, or null if completed normally
376 >     * to an unrecoverable error, or {@code null} if completed normally
377       */
378      protected void onTermination(Throwable exception) {
379          // Execute remaining local tasks unless aborting or terminating
# Line 383 | Line 382 | public class ForkJoinWorkerThread extend
382                  ForkJoinTask<?> t = popTask();
383                  if (t != null)
384                      t.quietlyExec();
385 <            } catch(Throwable ex) {
385 >            } catch (Throwable ex) {
386                  exception = ex;
387              }
388          }
389          // Cancel other tasks, transition status, notify pool, and
390          // propagate exception to uncaught exception handler
391          try {
392 <            do;while (!tryInactivate()); // ensure inactive
392 >            do {} while (!tryInactivate()); // ensure inactive
393              cancelTasks();
394              runState = TERMINATED;
395              pool.workerTerminated(this);
# Line 517 | Line 516 | public class ForkJoinWorkerThread extend
516          if (q == null)
517              return null;
518          int mask = q.length - 1;
519 <        int i = locallyFifo? base : (sp - 1);
519 >        int i = locallyFifo ? base : (sp - 1);
520          return q[i & mask];
521      }
522  
# Line 580 | Line 579 | public class ForkJoinWorkerThread extend
579                      ForkJoinWorkerThread v = ws[mask & idx];
580                      if (v == null || v.sp == v.base) {
581                          if (probes <= mask)
582 <                            idx = (probes++ < 0)? r : (idx + 1);
582 >                            idx = (probes++ < 0) ? r : (idx + 1);
583                          else
584                              break;
585                      }
# Line 601 | Line 600 | public class ForkJoinWorkerThread extend
600       * @return a task, if available
601       */
602      final ForkJoinTask<?> pollTask() {
603 <        ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
603 >        ForkJoinTask<?> t = locallyFifo ? deqTask() : popTask();
604          if (t == null && (t = scan()) != null)
605              ++stealCount;
606          return t;
# Line 613 | Line 612 | public class ForkJoinWorkerThread extend
612       * @return a task, if available
613       */
614      final ForkJoinTask<?> pollLocalTask() {
615 <        return locallyFifo? deqTask() : popTask();
615 >        return locallyFifo ? deqTask() : popTask();
616      }
617  
618      /**
# Line 669 | Line 668 | public class ForkJoinWorkerThread extend
668      }
669  
670      /**
671 <     * Returns true if at least one worker in the given array appears
672 <     * to have at least one queued task.
671 >     * Returns {@code true} if at least one worker in the given array
672 >     * appears to have at least one queued task.
673 >     *
674       * @param ws array of workers
675       */
676      static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
# Line 693 | Line 693 | public class ForkJoinWorkerThread extend
693       * Returns an estimate of the number of tasks in the queue.
694       */
695      final int getQueueSize() {
696 <        int n = sp - base;
697 <        return n < 0? 0 : n; // suppress momentarily negative values
696 >        // suppress momentarily negative values
697 >        return Math.max(0, sp - base);
698      }
699  
700      /**
# Line 707 | Line 707 | public class ForkJoinWorkerThread extend
707      }
708  
709      /**
710 <     * Scans, returning early if joinMe done
710 >     * Scans, returning early if joinMe done.
711       */
712      final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
713          ForkJoinTask<?> t = pollTask();
# Line 719 | Line 719 | public class ForkJoinWorkerThread extend
719      }
720  
721      /**
722 <     * Runs tasks until pool isQuiescent.
722 >     * Runs tasks until {@code pool.isQuiescent()}.
723       */
724      final void helpQuiescePool() {
725          for (;;) {
# Line 729 | Line 729 | public class ForkJoinWorkerThread extend
729              else if (tryInactivate() && pool.isQuiescent())
730                  break;
731          }
732 <        do;while (!tryActivate()); // re-activate on exit
732 >        do {} while (!tryActivate()); // re-activate on exit
733 >    }
734 >
735 >    // Unsafe mechanics
736 >
737 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
738 >    private static final long spOffset =
739 >        objectFieldOffset("sp", ForkJoinWorkerThread.class);
740 >    private static final long runStateOffset =
741 >        objectFieldOffset("runState", ForkJoinWorkerThread.class);
742 >    private static final long qBase;
743 >    private static final int qShift;
744 >
745 >    static {
746 >        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
747 >        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
748 >        if ((s & (s-1)) != 0)
749 >            throw new Error("data type scale not a power of two");
750 >        qShift = 31 - Integer.numberOfLeadingZeros(s);
751 >    }
752 >
753 >    private static long objectFieldOffset(String field, Class<?> klazz) {
754 >        try {
755 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
756 >        } catch (NoSuchFieldException e) {
757 >            // Convert Exception to corresponding Error
758 >            NoSuchFieldError error = new NoSuchFieldError(field);
759 >            error.initCause(e);
760 >            throw error;
761 >        }
762      }
763  
764 <    // Temporary Unsafe mechanics for preliminary release
765 <    private static Unsafe getUnsafe() throws Throwable {
764 >    /**
765 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
766 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
767 >     * into a jdk.
768 >     *
769 >     * @return a sun.misc.Unsafe
770 >     */
771 >    private static sun.misc.Unsafe getUnsafe() {
772          try {
773 <            return Unsafe.getUnsafe();
773 >            return sun.misc.Unsafe.getUnsafe();
774          } catch (SecurityException se) {
775              try {
776                  return java.security.AccessController.doPrivileged
777 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
778 <                        public Unsafe run() throws Exception {
779 <                            return getUnsafePrivileged();
777 >                    (new java.security
778 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
779 >                        public sun.misc.Unsafe run() throws Exception {
780 >                            java.lang.reflect.Field f = sun.misc
781 >                                .Unsafe.class.getDeclaredField("theUnsafe");
782 >                            f.setAccessible(true);
783 >                            return (sun.misc.Unsafe) f.get(null);
784                          }});
785              } catch (java.security.PrivilegedActionException e) {
786 <                throw e.getCause();
786 >                throw new RuntimeException("Could not initialize intrinsics",
787 >                                           e.getCause());
788              }
789          }
790      }
751
752    private static Unsafe getUnsafePrivileged()
753            throws NoSuchFieldException, IllegalAccessException {
754        Field f = Unsafe.class.getDeclaredField("theUnsafe");
755        f.setAccessible(true);
756        return (Unsafe) f.get(null);
757    }
758
759    private static long fieldOffset(String fieldName)
760            throws NoSuchFieldException {
761        return UNSAFE.objectFieldOffset
762            (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
763    }
764
765    static final Unsafe UNSAFE;
766    static final long baseOffset;
767    static final long spOffset;
768    static final long runStateOffset;
769    static final long qBase;
770    static final int qShift;
771    static {
772        try {
773            UNSAFE = getUnsafe();
774            baseOffset = fieldOffset("base");
775            spOffset = fieldOffset("sp");
776            runStateOffset = fieldOffset("runState");
777            qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
778            int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
779            if ((s & (s-1)) != 0)
780                throw new Error("data type scale not a power of two");
781            qShift = 31 - Integer.numberOfLeadingZeros(s);
782        } catch (Throwable e) {
783            throw new RuntimeException("Could not initialize intrinsics", e);
784        }
785    }
791   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines