ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.5 by dl, Mon Jan 12 17:16:18 2009 UTC vs.
Revision 1.7 by dl, Thu Jul 16 15:32:34 2009 UTC

# Line 17 | Line 17 | import java.lang.reflect.*;
17   * subclassable solely for the sake of adding functionality -- there
18   * are no overridable methods dealing with scheduling or
19   * execution. However, you can override initialization and termination
20 < * cleanup methods surrounding the main task processing loop.  If you
21 < * do create such a subclass, you will also need to supply a custom
20 > * methods surrounding the main task processing loop.  If you do
21 > * create such a subclass, you will also need to supply a custom
22   * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
23 < *
23 > *
24   */
25   public class ForkJoinWorkerThread extends Thread {
26      /*
# Line 199 | Line 199 | public class ForkJoinWorkerThread extend
199      long lastEventCount;
200  
201      /**
202 +     * True if use local fifo, not default lifo, for local polling
203 +     */
204 +    private boolean locallyFifo;
205 +
206 +    /**
207       * Creates a ForkJoinWorkerThread operating in the given pool.
208       * @param pool the pool this thread works in
209       * @throws NullPointerException if pool is null
# Line 210 | Line 215 | public class ForkJoinWorkerThread extend
215          // Remaining initialization is deferred to onStart
216      }
217  
218 <    // Public access methods
218 >    // Public access methods
219  
220      /**
221       * Returns the pool hosting this thread
# Line 232 | Line 237 | public class ForkJoinWorkerThread extend
237          return poolIndex;
238      }
239  
240 +    /**
241 +     * Establishes local first-in-first-out scheduling mode for forked
242 +     * tasks that are never joined.
243 +     * @param async if true, use locally FIFO scheduling
244 +     */
245 +    void setAsyncMode(boolean async) {
246 +        locallyFifo = async;
247 +    }
248  
249      // Runstate management
250  
# Line 323 | Line 336 | public class ForkJoinWorkerThread extend
336      private void mainLoop() {
337          while (!isShutdown()) {
338              ForkJoinTask<?> t = pollTask();
339 <            if (t != null || (t = pollSubmission()) != null)
339 >            if (t != null || (t = pollSubmission()) != null)
340                  t.quietlyExec();
341              else if (tryInactivate())
342                  pool.sync(this);
# Line 372 | Line 385 | public class ForkJoinWorkerThread extend
385          // propagate exception to uncaught exception handler
386          try {
387              do;while (!tryInactivate()); // ensure inactive
388 <            cancelTasks();        
388 >            cancelTasks();
389              runState = TERMINATED;
390              pool.workerTerminated(this);
391          } catch (Throwable ex) {        // Shouldn't ever happen
# Line 384 | Line 397 | public class ForkJoinWorkerThread extend
397          }
398      }
399  
400 <    // Intrinsics-based support for queue operations.  
400 >    // Intrinsics-based support for queue operations.
401  
402      /**
403       * Add in store-order the given task at given slot of q to
# Line 434 | Line 447 | public class ForkJoinWorkerThread extend
447       * either empty or contended.
448       * @return a task, or null if none or contended.
449       */
450 <    private ForkJoinTask<?> deqTask() {
450 >    final ForkJoinTask<?> deqTask() {
451          ForkJoinTask<?> t;
452          ForkJoinTask<?>[] q;
453          int i;
# Line 488 | Line 501 | public class ForkJoinWorkerThread extend
501      }
502  
503      /**
504 <     * Returns next task to pop.
504 >     * Returns next task.
505       */
506      final ForkJoinTask<?> peekTask() {
507          ForkJoinTask<?>[] q = queue;
508 <        return q == null? null : q[(sp - 1) & (q.length - 1)];
508 >        if (q == null)
509 >            return null;
510 >        int mask = q.length - 1;
511 >        int i = locallyFifo? base : (sp - 1);
512 >        return q[i & mask];
513      }
514  
515      /**
# Line 570 | Line 587 | public class ForkJoinWorkerThread extend
587      }
588  
589      /**
590 <     * Pops or steals a task
590 >     * gets and removes a local or stolen a task
591       * @return a task, if available
592       */
593      final ForkJoinTask<?> pollTask() {
594 <        ForkJoinTask<?> t = popTask();
594 >        ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
595          if (t == null && (t = scan()) != null)
596              ++stealCount;
597          return t;
598      }
599  
600      /**
601 +     * gets a local task
602 +     * @return a task, if available
603 +     */
604 +    final ForkJoinTask<?> pollLocalTask() {
605 +        return locallyFifo? deqTask() : popTask();
606 +    }
607 +
608 +    /**
609       * Returns a pool submission, if one exists, activating first.
610       * @return a submission, if available
611       */
# Line 607 | Line 632 | public class ForkJoinWorkerThread extend
632      }
633  
634      /**
635 +     * Drains tasks to given collection c
636 +     * @return the number of tasks drained
637 +     */
638 +    final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
639 +        int n = 0;
640 +        ForkJoinTask<?> t;
641 +        while (base != sp && (t = deqTask()) != null) {
642 +            c.add(t);
643 +            ++n;
644 +        }
645 +        return n;
646 +    }
647 +
648 +    /**
649       * Get and clear steal count for accumulation by pool.  Called
650       * only when known to be idle (in pool.sync and termination).
651       */
# Line 665 | Line 704 | public class ForkJoinWorkerThread extend
704          }
705          return t;
706      }
707 <    
707 >
708      /**
709       * Runs tasks until pool isQuiescent
710       */
711      final void helpQuiescePool() {
712          for (;;) {
713              ForkJoinTask<?> t = pollTask();
714 <            if (t != null)
714 >            if (t != null)
715                  t.quietlyExec();
716              else if (tryInactivate() && pool.isQuiescent())
717                  break;
# Line 681 | Line 720 | public class ForkJoinWorkerThread extend
720      }
721  
722      // Temporary Unsafe mechanics for preliminary release
723 +    private static Unsafe getUnsafe() throws Throwable {
724 +        try {
725 +            return Unsafe.getUnsafe();
726 +        } catch (SecurityException se) {
727 +            try {
728 +                return java.security.AccessController.doPrivileged
729 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
730 +                        public Unsafe run() throws Exception {
731 +                            return getUnsafePrivileged();
732 +                        }});
733 +            } catch (java.security.PrivilegedActionException e) {
734 +                throw e.getCause();
735 +            }
736 +        }
737 +    }
738 +
739 +    private static Unsafe getUnsafePrivileged()
740 +            throws NoSuchFieldException, IllegalAccessException {
741 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
742 +        f.setAccessible(true);
743 +        return (Unsafe) f.get(null);
744 +    }
745 +
746 +    private static long fieldOffset(String fieldName)
747 +            throws NoSuchFieldException {
748 +        return _unsafe.objectFieldOffset
749 +            (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
750 +    }
751  
752      static final Unsafe _unsafe;
753      static final long baseOffset;
# Line 690 | Line 757 | public class ForkJoinWorkerThread extend
757      static final int qShift;
758      static {
759          try {
760 <            if (ForkJoinWorkerThread.class.getClassLoader() != null) {
761 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
762 <                f.setAccessible(true);
763 <                _unsafe = (Unsafe)f.get(null);
697 <            }
698 <            else
699 <                _unsafe = Unsafe.getUnsafe();
700 <            baseOffset = _unsafe.objectFieldOffset
701 <                (ForkJoinWorkerThread.class.getDeclaredField("base"));
702 <            spOffset = _unsafe.objectFieldOffset
703 <                (ForkJoinWorkerThread.class.getDeclaredField("sp"));
704 <            runStateOffset = _unsafe.objectFieldOffset
705 <                (ForkJoinWorkerThread.class.getDeclaredField("runState"));
760 >            _unsafe = getUnsafe();
761 >            baseOffset = fieldOffset("base");
762 >            spOffset = fieldOffset("sp");
763 >            runStateOffset = fieldOffset("runState");
764              qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
765              int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
766              if ((s & (s-1)) != 0)
767                  throw new Error("data type scale not a power of two");
768              qShift = 31 - Integer.numberOfLeadingZeros(s);
769 <        } catch (Exception e) {
769 >        } catch (Throwable e) {
770              throw new RuntimeException("Could not initialize intrinsics", e);
771          }
772      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines