ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.11 by jsr166, Thu Mar 19 04:49:44 2009 UTC vs.
Revision 1.19 by jsr166, Fri Jul 24 23:47:01 2009 UTC

# Line 9 | Line 9 | package jsr166y;
9   import java.util.concurrent.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.concurrent.locks.LockSupport;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
12  
13   /**
14   * A reusable synchronization barrier, similar in functionality to a
# Line 93 | Line 91 | import java.lang.reflect.*;
91   * idiom is for the method setting this up to first register, then
92   * start the actions, then deregister, as in:
93   *
94 < * <pre>
95 < *  void runTasks(List&lt;Runnable&gt; list) {
96 < *    final Phaser phaser = new Phaser(1); // "1" to register self
97 < *    for (Runnable r : list) {
98 < *      phaser.register();
99 < *      new Thread() {
100 < *        public void run() {
101 < *          phaser.arriveAndAwaitAdvance(); // await all creation
102 < *          r.run();
103 < *          phaser.arriveAndDeregister();   // signal completion
104 < *        }
105 < *      }.start();
94 > *  <pre> {@code
95 > * void runTasks(List<Runnable> list) {
96 > *   final Phaser phaser = new Phaser(1); // "1" to register self
97 > *   for (Runnable r : list) {
98 > *     phaser.register();
99 > *     new Thread() {
100 > *       public void run() {
101 > *         phaser.arriveAndAwaitAdvance(); // await all creation
102 > *         r.run();
103 > *         phaser.arriveAndDeregister();   // signal completion
104 > *       }
105 > *     }.start();
106   *   }
107   *
108   *   doSomethingOnBehalfOfWorkers();
# Line 113 | Line 111 | import java.lang.reflect.*;
111   *   p = phaser.awaitAdvance(p); // ... and await arrival
112   *   otherActions(); // do other things while tasks execute
113   *   phaser.awaitAdvance(p); // await final completion
114 < * }
117 < * </pre>
114 > * }}</pre>
115   *
116   * <p>One way to cause a set of threads to repeatedly perform actions
117   * for a given number of iterations is to override {@code onAdvance}:
118   *
119 < * <pre>
120 < *  void startTasks(List&lt;Runnable&gt; list, final int iterations) {
121 < *    final Phaser phaser = new Phaser() {
122 < *       public boolean onAdvance(int phase, int registeredParties) {
123 < *         return phase &gt;= iterations || registeredParties == 0;
119 > *  <pre> {@code
120 > * void startTasks(List<Runnable> list, final int iterations) {
121 > *   final Phaser phaser = new Phaser() {
122 > *     public boolean onAdvance(int phase, int registeredParties) {
123 > *       return phase >= iterations || registeredParties == 0;
124 > *     }
125 > *   };
126 > *   phaser.register();
127 > *   for (Runnable r : list) {
128 > *     phaser.register();
129 > *     new Thread() {
130 > *       public void run() {
131 > *         do {
132 > *           r.run();
133 > *           phaser.arriveAndAwaitAdvance();
134 > *         } while(!phaser.isTerminated();
135   *       }
136 < *    };
129 < *    phaser.register();
130 < *    for (Runnable r : list) {
131 < *      phaser.register();
132 < *      new Thread() {
133 < *        public void run() {
134 < *           do {
135 < *             r.run();
136 < *             phaser.arriveAndAwaitAdvance();
137 < *           } while(!phaser.isTerminated();
138 < *        }
139 < *      }.start();
136 > *     }.start();
137   *   }
138   *   phaser.arriveAndDeregister(); // deregister self, don't wait
139 < * }
143 < * </pre>
139 > * }}</pre>
140   *
141   * <p> To create a set of tasks using a tree of Phasers,
142   * you could use code of the following form, assuming a
143   * Task class with a constructor accepting a Phaser that
144   * it registers for upon construction:
145 < * <pre>
146 < *  void build(Task[] actions, int lo, int hi, Phaser b) {
147 < *    int step = (hi - lo) / TASKS_PER_PHASER;
148 < *    if (step &gt; 1) {
149 < *       int i = lo;
150 < *       while (i &lt; hi) {
151 < *         int r = Math.min(i + step, hi);
152 < *         build(actions, i, r, new Phaser(b));
153 < *         i = r;
154 < *       }
155 < *    }
156 < *    else {
157 < *      for (int i = lo; i &lt; hi; ++i)
158 < *        actions[i] = new Task(b);
159 < *        // assumes new Task(b) performs b.register()
160 < *    }
161 < *  }
162 < *  // .. initially called, for n tasks via
167 < *  build(new Task[n], 0, n, new Phaser());
168 < * </pre>
145 > *  <pre> {@code
146 > * void build(Task[] actions, int lo, int hi, Phaser b) {
147 > *   int step = (hi - lo) / TASKS_PER_PHASER;
148 > *   if (step > 1) {
149 > *     int i = lo;
150 > *     while (i < hi) {
151 > *       int r = Math.min(i + step, hi);
152 > *       build(actions, i, r, new Phaser(b));
153 > *       i = r;
154 > *     }
155 > *   } else {
156 > *     for (int i = lo; i < hi; ++i)
157 > *       actions[i] = new Task(b);
158 > *       // assumes new Task(b) performs b.register()
159 > *   }
160 > * }
161 > * // .. initially called, for n tasks via
162 > * build(new Task[n], 0, n, new Phaser());}</pre>
163   *
164   * The best value of {@code TASKS_PER_PHASER} depends mainly on
165   * expected barrier synchronization rates. A value as low as four may
# Line 179 | Line 173 | import java.lang.reflect.*;
173   * parties result in IllegalStateExceptions. However, you can and
174   * should create tiered phasers to accommodate arbitrarily large sets
175   * of participants.
176 + *
177 + * @since 1.7
178 + * @author Doug Lea
179   */
180   public class Phaser {
181      /*
# Line 212 | Line 209 | public class Phaser {
209      private static final int phaseMask  = 0x7fffffff;
210  
211      private static int unarrivedOf(long s) {
212 <        return (int)(s & ushortMask);
212 >        return (int) (s & ushortMask);
213      }
214  
215      private static int partiesOf(long s) {
216 <        return ((int)s) >>> 16;
216 >        return ((int) s) >>> 16;
217      }
218  
219      private static int phaseOf(long s) {
220 <        return (int)(s >>> 32);
220 >        return (int) (s >>> 32);
221      }
222  
223      private static int arrivedOf(long s) {
# Line 228 | Line 225 | public class Phaser {
225      }
226  
227      private static long stateFor(int phase, int parties, int unarrived) {
228 <        return ((((long)phase) << 32) | (((long)parties) << 16) |
229 <                (long)unarrived);
228 >        return ((((long) phase) << 32) | (((long) parties) << 16) |
229 >                (long) unarrived);
230      }
231  
232      private static long trippedStateFor(int phase, int parties) {
233 <        long lp = (long)parties;
234 <        return (((long)phase) << 32) | (lp << 16) | lp;
233 >        long lp = (long) parties;
234 >        return (((long) phase) << 32) | (lp << 16) | lp;
235      }
236  
237      /**
238 <     * Returns message string for bad bounds exceptions
238 >     * Returns message string for bad bounds exceptions.
239       */
240      private static String badBounds(int parties, int unarrived) {
241          return ("Attempt to set " + unarrived +
# Line 267 | Line 264 | public class Phaser {
264      private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();
265  
266      private AtomicReference<QNode> queueFor(int phase) {
267 <        return (phase & 1) == 0? evenQ : oddQ;
267 >        return ((phase & 1) == 0) ? evenQ : oddQ;
268      }
269  
270      /**
# Line 275 | Line 272 | public class Phaser {
272       * root if necessary.
273       */
274      private long getReconciledState() {
275 <        return parent == null? state : reconcileState();
275 >        return (parent == null) ? state : reconcileState();
276      }
277  
278      /**
# Line 313 | Line 310 | public class Phaser {
310      /**
311       * Creates a new Phaser with the given numbers of registered
312       * unarrived parties, initial phase number 0, and no parent.
313 <     * @param parties the number of parties required to trip barrier.
313 >     *
314 >     * @param parties the number of parties required to trip barrier
315       * @throws IllegalArgumentException if parties less than zero
316 <     * or greater than the maximum number of parties supported.
316 >     * or greater than the maximum number of parties supported
317       */
318      public Phaser(int parties) {
319          this(null, parties);
# Line 326 | Line 324 | public class Phaser {
324       * initially registered parties. If parent is non-null this phaser
325       * is registered with the parent and its initial phase number is
326       * the same as that of parent phaser.
327 <     * @param parent the parent phaser.
327 >     *
328 >     * @param parent the parent phaser
329       */
330      public Phaser(Phaser parent) {
331          int phase = 0;
# Line 342 | Line 341 | public class Phaser {
341  
342      /**
343       * Creates a new Phaser with the given parent and numbers of
344 <     * registered unarrived parties. If parent is non-null this phaser
344 >     * registered unarrived parties. If parent is non-null, this phaser
345       * is registered with the parent and its initial phase number is
346       * the same as that of parent phaser.
347 <     * @param parent the parent phaser.
348 <     * @param parties the number of parties required to trip barrier.
347 >     *
348 >     * @param parent the parent phaser
349 >     * @param parties the number of parties required to trip barrier
350       * @throws IllegalArgumentException if parties less than zero
351 <     * or greater than the maximum number of parties supported.
351 >     * or greater than the maximum number of parties supported
352       */
353      public Phaser(Phaser parent, int parties) {
354          if (parties < 0 || parties > ushortMask)
# Line 366 | Line 366 | public class Phaser {
366  
367      /**
368       * Adds a new unarrived party to this phaser.
369 +     *
370       * @return the current barrier phase number upon registration
371       * @throws IllegalStateException if attempting to register more
372 <     * than the maximum supported number of parties.
372 >     * than the maximum supported number of parties
373       */
374      public int register() {
375          return doRegister(1);
# Line 376 | Line 377 | public class Phaser {
377  
378      /**
379       * Adds the given number of new unarrived parties to this phaser.
380 <     * @param parties the number of parties required to trip barrier.
380 >     *
381 >     * @param parties the number of parties required to trip barrier
382       * @return the current barrier phase number upon registration
383       * @throws IllegalStateException if attempting to register more
384 <     * than the maximum supported number of parties.
384 >     * than the maximum supported number of parties
385       */
386      public int bulkRegister(int parties) {
387          if (parties < 0)
# Line 399 | Line 401 | public class Phaser {
401              phase = phaseOf(s);
402              int unarrived = unarrivedOf(s) + registrations;
403              int parties = partiesOf(s) + registrations;
404 <            if (phase < 0)
404 >            if (phase < 0)
405                  break;
406              if (parties > ushortMask || unarrived > ushortMask)
407                  throw new IllegalStateException(badBounds(parties, unarrived));
# Line 415 | Line 417 | public class Phaser {
417       * in turn wait for others via {@link #awaitAdvance}).
418       *
419       * @return the barrier phase number upon entry to this method, or a
420 <     * negative value if terminated;
420 >     * negative value if terminated
421       * @throws IllegalStateException if not terminated and the number
422 <     * of unarrived parties would become negative.
422 >     * of unarrived parties would become negative
423       */
424      public int arrive() {
425          int phase;
# Line 437 | Line 439 | public class Phaser {
439                  if (par == null) {      // directly trip
440                      if (casState
441                          (s,
442 <                         trippedStateFor(onAdvance(phase, parties)? -1 :
442 >                         trippedStateFor(onAdvance(phase, parties) ? -1 :
443                                           ((phase + 1) & phaseMask), parties))) {
444                          releaseWaiters(phase);
445                          break;
# Line 467 | Line 469 | public class Phaser {
469       * zero parties, this phaser is also deregistered from its parent.
470       *
471       * @return the current barrier phase number upon entry to
472 <     * this method, or a negative value if terminated;
472 >     * this method, or a negative value if terminated
473       * @throws IllegalStateException if not terminated and the number
474 <     * of registered or unarrived parties would become negative.
474 >     * of registered or unarrived parties would become negative
475       */
476      public int arriveAndDeregister() {
477          // similar code to arrive, but too different to merge
# Line 498 | Line 500 | public class Phaser {
500                  if (unarrived == 0) {
501                      if (casState
502                          (s,
503 <                         trippedStateFor(onAdvance(phase, parties)? -1 :
503 >                         trippedStateFor(onAdvance(phase, parties) ? -1 :
504                                           ((phase + 1) & phaseMask), parties))) {
505                          releaseWaiters(phase);
506                          break;
# Line 520 | Line 522 | public class Phaser {
522       * to {@code awaitAdvance(arrive())}.  If you instead need to
523       * await with interruption of timeout, and/or deregister upon
524       * arrival, you can arrange them using analogous constructions.
525 +     *
526       * @return the phase on entry to this method
527       * @throws IllegalStateException if not terminated and the number
528 <     * of unarrived parties would become negative.
528 >     * of unarrived parties would become negative
529       */
530      public int arriveAndAwaitAdvance() {
531          return awaitAdvance(arrive());
# Line 532 | Line 535 | public class Phaser {
535       * Awaits the phase of the barrier to advance from the given
536       * value, or returns immediately if argument is negative or this
537       * barrier is terminated.
538 +     *
539       * @param phase the phase on entry to this method
540       * @return the phase on exit from this method
541       */
# Line 553 | Line 557 | public class Phaser {
557       * value, or returns immediately if argument is negative or this
558       * barrier is terminated, or throws InterruptedException if
559       * interrupted while waiting.
560 +     *
561       * @param phase the phase on entry to this method
562       * @return the phase on exit from this method
563       * @throws InterruptedException if thread interrupted while waiting
564       */
565 <    public int awaitAdvanceInterruptibly(int phase)
565 >    public int awaitAdvanceInterruptibly(int phase)
566          throws InterruptedException {
567          if (phase < 0)
568              return phase;
# Line 574 | Line 579 | public class Phaser {
579       * Awaits the phase of the barrier to advance from the given value
580       * or the given timeout elapses, or returns immediately if
581       * argument is negative or this barrier is terminated.
582 +     *
583       * @param phase the phase on entry to this method
584       * @return the phase on exit from this method
585       * @throws InterruptedException if thread interrupted while waiting
586       * @throws TimeoutException if timed out while waiting
587       */
588 <    public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
588 >    public int awaitAdvanceInterruptibly(int phase,
589 >                                         long timeout, TimeUnit unit)
590          throws InterruptedException, TimeoutException {
591          if (phase < 0)
592              return phase;
# Line 620 | Line 627 | public class Phaser {
627       * Returns the current phase number. The maximum phase number is
628       * {@code Integer.MAX_VALUE}, after which it restarts at
629       * zero. Upon termination, the phase number is negative.
630 +     *
631       * @return the phase number, or a negative value if terminated
632       */
633      public final int getPhase() {
# Line 628 | Line 636 | public class Phaser {
636  
637      /**
638       * Returns {@code true} if the current phase number equals the given phase.
639 +     *
640       * @param phase the phase
641       * @return {@code true} if the current phase number equals the given phase
642       */
# Line 637 | Line 646 | public class Phaser {
646  
647      /**
648       * Returns the number of parties registered at this barrier.
649 +     *
650       * @return the number of parties
651       */
652      public int getRegisteredParties() {
# Line 646 | Line 656 | public class Phaser {
656      /**
657       * Returns the number of parties that have arrived at the current
658       * phase of this barrier.
659 +     *
660       * @return the number of arrived parties
661       */
662      public int getArrivedParties() {
# Line 655 | Line 666 | public class Phaser {
666      /**
667       * Returns the number of registered parties that have not yet
668       * arrived at the current phase of this barrier.
669 +     *
670       * @return the number of unarrived parties
671       */
672      public int getUnarrivedParties() {
# Line 663 | Line 675 | public class Phaser {
675  
676      /**
677       * Returns the parent of this phaser, or null if none.
678 +     *
679       * @return the parent of this phaser, or null if none
680       */
681      public Phaser getParent() {
# Line 672 | Line 685 | public class Phaser {
685      /**
686       * Returns the root ancestor of this phaser, which is the same as
687       * this phaser if it has no parent.
688 +     *
689       * @return the root ancestor of this phaser
690       */
691      public Phaser getRoot() {
# Line 680 | Line 694 | public class Phaser {
694  
695      /**
696       * Returns {@code true} if this barrier has been terminated.
697 +     *
698       * @return {@code true} if this barrier has been terminated
699       */
700      public boolean isTerminated() {
# Line 795 | Line 810 | public class Phaser {
810                  try {
811                      ForkJoinPool.managedBlock(this, false);
812                  } catch (InterruptedException ie) {
813 <                }
813 >                }
814              }
815              return wasInterrupted;
816          }
# Line 803 | Line 818 | public class Phaser {
818      }
819  
820      /**
821 <     * Removes and signals waiting threads from wait queue
821 >     * Removes and signals waiting threads from wait queue.
822       */
823      private void releaseWaiters(int phase) {
824          AtomicReference<QNode> head = queueFor(phase);
# Line 815 | Line 830 | public class Phaser {
830      }
831  
832      /**
833 <     * Tries to enqueue given node in the appropriate wait queue
833 >     * Tries to enqueue given node in the appropriate wait queue.
834 >     *
835       * @return true if successful
836       */
837      private boolean tryEnqueue(QNode node) {
# Line 825 | Line 841 | public class Phaser {
841  
842      /**
843       * Enqueues node and waits unless aborted or signalled.
844 +     *
845       * @return current phase
846       */
847      private int untimedWait(int phase) {
# Line 912 | Line 929 | public class Phaser {
929          return p;
930      }
931  
932 <    // Temporary Unsafe mechanics for preliminary release
933 <    private static Unsafe getUnsafe() throws Throwable {
932 >    // Unsafe mechanics for jsr166y 3rd party package.
933 >    private static sun.misc.Unsafe getUnsafe() {
934          try {
935 <            return Unsafe.getUnsafe();
935 >            return sun.misc.Unsafe.getUnsafe();
936          } catch (SecurityException se) {
937              try {
938                  return java.security.AccessController.doPrivileged
939 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
940 <                        public Unsafe run() throws Exception {
941 <                            return getUnsafePrivileged();
939 >                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
940 >                        public sun.misc.Unsafe run() throws Exception {
941 >                            return getUnsafeByReflection();
942                          }});
943              } catch (java.security.PrivilegedActionException e) {
944 <                throw e.getCause();
944 >                throw new RuntimeException("Could not initialize intrinsics",
945 >                                           e.getCause());
946              }
947          }
948      }
949  
950 <    private static Unsafe getUnsafePrivileged()
950 >    private static sun.misc.Unsafe getUnsafeByReflection()
951              throws NoSuchFieldException, IllegalAccessException {
952 <        Field f = Unsafe.class.getDeclaredField("theUnsafe");
952 >        java.lang.reflect.Field f =
953 >            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
954          f.setAccessible(true);
955 <        return (Unsafe)f.get(null);
955 >        return (sun.misc.Unsafe) f.get(null);
956      }
957  
958 <    private static long fieldOffset(String fieldName)
940 <            throws NoSuchFieldException {
941 <        return _unsafe.objectFieldOffset
942 <            (Phaser.class.getDeclaredField(fieldName));
943 <    }
944 <
945 <    static final Unsafe _unsafe;
946 <    static final long stateOffset;
947 <
948 <    static {
958 >    private static long fieldOffset(String fieldName, Class<?> klazz) {
959          try {
960 <            _unsafe = getUnsafe();
961 <            stateOffset = fieldOffset("state");
962 <        } catch (Exception e) {
963 <            throw new RuntimeException("Could not initialize intrinsics", e);
960 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
961 >        } catch (NoSuchFieldException e) {
962 >            // Convert Exception to Error
963 >            NoSuchFieldError error = new NoSuchFieldError(fieldName);
964 >            error.initCause(e);
965 >            throw error;
966          }
967      }
968  
969 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
970 +    static final long stateOffset =
971 +        fieldOffset("state", Phaser.class);
972 +
973      final boolean casState(long cmp, long val) {
974 <        return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
974 >        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
975      }
976   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines