ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.5 by dl, Mon Jan 12 17:16:18 2009 UTC vs.
Revision 1.10 by jsr166, Mon Jul 20 23:07:43 2009 UTC

# Line 17 | Line 17 | import java.lang.reflect.*;
17   * subclassable solely for the sake of adding functionality -- there
18   * are no overridable methods dealing with scheduling or
19   * execution. However, you can override initialization and termination
20 < * cleanup methods surrounding the main task processing loop.  If you
21 < * do create such a subclass, you will also need to supply a custom
20 > * methods surrounding the main task processing loop.  If you do
21 > * create such a subclass, you will also need to supply a custom
22   * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
23 < *
23 > *
24   */
25   public class ForkJoinWorkerThread extends Thread {
26      /*
# Line 44 | Line 44 | public class ForkJoinWorkerThread extend
44       * of tasks. To accomplish this, we shift the CAS arbitrating pop
45       * vs deq (steal) from being on the indices ("base" and "sp") to
46       * the slots themselves (mainly via method "casSlotNull()"). So,
47 <     * both a successful pop and deq mainly entail CAS'ing a nonnull
47 >     * both a successful pop and deq mainly entail CAS'ing a non-null
48       * slot to null.  Because we rely on CASes of references, we do
49       * not need tag bits on base or sp.  They are simple ints as used
50       * in any circular array-based queue (see for example ArrayDeque).
# Line 56 | Line 56 | public class ForkJoinWorkerThread extend
56       * considered individually, is not wait-free. One thief cannot
57       * successfully continue until another in-progress one (or, if
58       * previously empty, a push) completes.  However, in the
59 <     * aggregate, we ensure at least probablistic non-blockingness. If
59 >     * aggregate, we ensure at least probabilistic non-blockingness. If
60       * an attempted steal fails, a thief always chooses a different
61       * random victim target to try next. So, in order for one thief to
62       * progress, it suffices for any in-progress deq or new push on
# Line 75 | Line 75 | public class ForkJoinWorkerThread extend
75       * push) require store order and CASes (in pop and deq) require
76       * (volatile) CAS semantics. Since these combinations aren't
77       * supported using ordinary volatiles, the only way to accomplish
78 <     * these effciently is to use direct Unsafe calls. (Using external
78 >     * these efficiently is to use direct Unsafe calls. (Using external
79       * AtomicIntegers and AtomicReferenceArrays for the indices and
80       * array is significantly slower because of memory locality and
81       * indirection effects.) Further, performance on most platforms is
# Line 199 | Line 199 | public class ForkJoinWorkerThread extend
199      long lastEventCount;
200  
201      /**
202 +     * True if use local fifo, not default lifo, for local polling
203 +     */
204 +    private boolean locallyFifo;
205 +
206 +    /**
207       * Creates a ForkJoinWorkerThread operating in the given pool.
208       * @param pool the pool this thread works in
209       * @throws NullPointerException if pool is null
# Line 210 | Line 215 | public class ForkJoinWorkerThread extend
215          // Remaining initialization is deferred to onStart
216      }
217  
218 <    // Public access methods
218 >    // Public access methods
219  
220      /**
221       * Returns the pool hosting this thread
# Line 232 | Line 237 | public class ForkJoinWorkerThread extend
237          return poolIndex;
238      }
239  
240 +    /**
241 +     * Establishes local first-in-first-out scheduling mode for forked
242 +     * tasks that are never joined.
243 +     * @param async if true, use locally FIFO scheduling
244 +     */
245 +    void setAsyncMode(boolean async) {
246 +        locallyFifo = async;
247 +    }
248  
249      // Runstate management
250  
# Line 323 | Line 336 | public class ForkJoinWorkerThread extend
336      private void mainLoop() {
337          while (!isShutdown()) {
338              ForkJoinTask<?> t = pollTask();
339 <            if (t != null || (t = pollSubmission()) != null)
339 >            if (t != null || (t = pollSubmission()) != null)
340                  t.quietlyExec();
341              else if (tryInactivate())
342                  pool.sync(this);
# Line 372 | Line 385 | public class ForkJoinWorkerThread extend
385          // propagate exception to uncaught exception handler
386          try {
387              do;while (!tryInactivate()); // ensure inactive
388 <            cancelTasks();        
388 >            cancelTasks();
389              runState = TERMINATED;
390              pool.workerTerminated(this);
391          } catch (Throwable ex) {        // Shouldn't ever happen
# Line 384 | Line 397 | public class ForkJoinWorkerThread extend
397          }
398      }
399  
400 <    // Intrinsics-based support for queue operations.  
400 >    // Intrinsics-based support for queue operations.
401  
402      /**
403 <     * Add in store-order the given task at given slot of q to
404 <     * null. Caller must ensure q is nonnull and index is in range.
403 >     * Adds in store-order the given task at given slot of q to null.
404 >     * Caller must ensure q is non-null and index is in range.
405       */
406      private static void setSlot(ForkJoinTask<?>[] q, int i,
407                                  ForkJoinTask<?> t){
# Line 396 | Line 409 | public class ForkJoinWorkerThread extend
409      }
410  
411      /**
412 <     * CAS given slot of q to null. Caller must ensure q is nonnull
412 >     * CAS given slot of q to null. Caller must ensure q is non-null
413       * and index is in range.
414       */
415      private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
# Line 415 | Line 428 | public class ForkJoinWorkerThread extend
428  
429      /**
430       * Pushes a task. Called only by current thread.
431 <     * @param t the task. Caller must ensure nonnull
431 >     * @param t the task. Caller must ensure non-null.
432       */
433      final void pushTask(ForkJoinTask<?> t) {
434          ForkJoinTask<?>[] q = queue;
# Line 434 | Line 447 | public class ForkJoinWorkerThread extend
447       * either empty or contended.
448       * @return a task, or null if none or contended.
449       */
450 <    private ForkJoinTask<?> deqTask() {
450 >    final ForkJoinTask<?> deqTask() {
451          ForkJoinTask<?> t;
452          ForkJoinTask<?>[] q;
453          int i;
# Line 451 | Line 464 | public class ForkJoinWorkerThread extend
464  
465      /**
466       * Returns a popped task, or null if empty. Ensures active status
467 <     * if nonnull. Called only by current thread.
467 >     * if non-null. Called only by current thread.
468       */
469      final ForkJoinTask<?> popTask() {
470          int s = sp;
# Line 474 | Line 487 | public class ForkJoinWorkerThread extend
487       * Specialized version of popTask to pop only if
488       * topmost element is the given task. Called only
489       * by current thread while active.
490 <     * @param t the task. Caller must ensure nonnull
490 >     * @param t the task. Caller must ensure non-null
491       */
492      final boolean unpushTask(ForkJoinTask<?> t) {
493          ForkJoinTask<?>[] q = queue;
# Line 488 | Line 501 | public class ForkJoinWorkerThread extend
501      }
502  
503      /**
504 <     * Returns next task to pop.
504 >     * Returns next task.
505       */
506      final ForkJoinTask<?> peekTask() {
507          ForkJoinTask<?>[] q = queue;
508 <        return q == null? null : q[(sp - 1) & (q.length - 1)];
508 >        if (q == null)
509 >            return null;
510 >        int mask = q.length - 1;
511 >        int i = locallyFifo? base : (sp - 1);
512 >        return q[i & mask];
513      }
514  
515      /**
# Line 570 | Line 587 | public class ForkJoinWorkerThread extend
587      }
588  
589      /**
590 <     * Pops or steals a task
590 >     * gets and removes a local or stolen a task
591       * @return a task, if available
592       */
593      final ForkJoinTask<?> pollTask() {
594 <        ForkJoinTask<?> t = popTask();
594 >        ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
595          if (t == null && (t = scan()) != null)
596              ++stealCount;
597          return t;
598      }
599  
600      /**
601 +     * gets a local task
602 +     * @return a task, if available
603 +     */
604 +    final ForkJoinTask<?> pollLocalTask() {
605 +        return locallyFifo? deqTask() : popTask();
606 +    }
607 +
608 +    /**
609       * Returns a pool submission, if one exists, activating first.
610       * @return a submission, if available
611       */
# Line 607 | Line 632 | public class ForkJoinWorkerThread extend
632      }
633  
634      /**
635 +     * Drains tasks to given collection c
636 +     * @return the number of tasks drained
637 +     */
638 +    final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
639 +        int n = 0;
640 +        ForkJoinTask<?> t;
641 +        while (base != sp && (t = deqTask()) != null) {
642 +            c.add(t);
643 +            ++n;
644 +        }
645 +        return n;
646 +    }
647 +
648 +    /**
649       * Get and clear steal count for accumulation by pool.  Called
650       * only when known to be idle (in pool.sync and termination).
651       */
# Line 665 | Line 704 | public class ForkJoinWorkerThread extend
704          }
705          return t;
706      }
707 <    
707 >
708      /**
709       * Runs tasks until pool isQuiescent
710       */
711      final void helpQuiescePool() {
712          for (;;) {
713              ForkJoinTask<?> t = pollTask();
714 <            if (t != null)
714 >            if (t != null)
715                  t.quietlyExec();
716              else if (tryInactivate() && pool.isQuiescent())
717                  break;
# Line 681 | Line 720 | public class ForkJoinWorkerThread extend
720      }
721  
722      // Temporary Unsafe mechanics for preliminary release
723 +    private static Unsafe getUnsafe() throws Throwable {
724 +        try {
725 +            return Unsafe.getUnsafe();
726 +        } catch (SecurityException se) {
727 +            try {
728 +                return java.security.AccessController.doPrivileged
729 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
730 +                        public Unsafe run() throws Exception {
731 +                            return getUnsafePrivileged();
732 +                        }});
733 +            } catch (java.security.PrivilegedActionException e) {
734 +                throw e.getCause();
735 +            }
736 +        }
737 +    }
738 +
739 +    private static Unsafe getUnsafePrivileged()
740 +            throws NoSuchFieldException, IllegalAccessException {
741 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
742 +        f.setAccessible(true);
743 +        return (Unsafe) f.get(null);
744 +    }
745 +
746 +    private static long fieldOffset(String fieldName)
747 +            throws NoSuchFieldException {
748 +        return _unsafe.objectFieldOffset
749 +            (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
750 +    }
751  
752      static final Unsafe _unsafe;
753      static final long baseOffset;
# Line 690 | Line 757 | public class ForkJoinWorkerThread extend
757      static final int qShift;
758      static {
759          try {
760 <            if (ForkJoinWorkerThread.class.getClassLoader() != null) {
761 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
762 <                f.setAccessible(true);
763 <                _unsafe = (Unsafe)f.get(null);
697 <            }
698 <            else
699 <                _unsafe = Unsafe.getUnsafe();
700 <            baseOffset = _unsafe.objectFieldOffset
701 <                (ForkJoinWorkerThread.class.getDeclaredField("base"));
702 <            spOffset = _unsafe.objectFieldOffset
703 <                (ForkJoinWorkerThread.class.getDeclaredField("sp"));
704 <            runStateOffset = _unsafe.objectFieldOffset
705 <                (ForkJoinWorkerThread.class.getDeclaredField("runState"));
760 >            _unsafe = getUnsafe();
761 >            baseOffset = fieldOffset("base");
762 >            spOffset = fieldOffset("sp");
763 >            runStateOffset = fieldOffset("runState");
764              qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
765              int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
766              if ((s & (s-1)) != 0)
767                  throw new Error("data type scale not a power of two");
768              qShift = 31 - Integer.numberOfLeadingZeros(s);
769 <        } catch (Exception e) {
769 >        } catch (Throwable e) {
770              throw new RuntimeException("Could not initialize intrinsics", e);
771          }
772      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines