ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.25 by jsr166, Sun Aug 2 17:02:06 2009 UTC vs.
Revision 1.35 by dl, Sun Aug 23 13:37:08 2009 UTC

# Line 19 | Line 19 | import java.util.concurrent.locks.LockSu
19   *
20   * <ul>
21   *
22 < * <li> The number of parties synchronizing on a phaser may vary over
23 < * time.  A task may register to be a party at any time, and may
24 < * deregister upon arriving at the barrier.  As is the case with most
25 < * basic synchronization constructs, registration and deregistration
26 < * affect only internal counts; they do not establish any further
27 < * internal bookkeeping, so tasks cannot query whether they are
22 > * <li> The number of parties <em>registered</em> to synchronize on a
23 > * phaser may vary over time.  Tasks may be registered at any time
24 > * (using methods {@link #register}, {@link #bulkRegister}, or forms
25 > * of constructors establishing initial numbers of parties), and may
26 > * optionally be deregistered upon any arrival (using {@link
27 > * #arriveAndDeregister}).  As is the case with most basic
28 > * synchronization constructs, registration and deregistration affect
29 > * only internal counts; they do not establish any further internal
30 > * bookkeeping, so tasks cannot query whether they are
31   * registered. (However, you can introduce such bookkeeping by
32   * subclassing this class.)
33   *
34   * <li> Each generation has an associated phase value, starting at
35 < * zero, and advancing when all parties reach the barrier (wrapping
36 < * around to zero after reaching {@code Integer.MAX_VALUE}).
35 > * zero, and advancing when all parties arrive at the barrier
36 > * (wrapping around to zero after reaching {@code Integer.MAX_VALUE}).
37   *
38 < * <li> Like a {@code CyclicBarrier}, a Phaser may be repeatedly
38 > * <li> Like a {@code CyclicBarrier}, a phaser may be repeatedly
39   * awaited.  Method {@link #arriveAndAwaitAdvance} has effect
40   * analogous to {@link java.util.concurrent.CyclicBarrier#await
41   * CyclicBarrier.await}.  However, phasers separate two aspects of
42 < * coordination, that may also be invoked independently:
42 > * coordination, which may also be invoked independently:
43   *
44   * <ul>
45   *
46   *   <li> Arriving at a barrier. Methods {@link #arrive} and
47   *       {@link #arriveAndDeregister} do not block, but return
48 < *       the phase value current upon entry to the method.
48 > *       an associated <em>arrival phase number</em>;
49 > *       that is, the phase number of the barrier to which the
50 > *       arrival applied.
51   *
52   *   <li> Awaiting others. Method {@link #awaitAdvance} requires an
53 < *       argument indicating the entry phase, and returns when the
54 < *       barrier advances to a new phase.
53 > *       argument indicating an arrival phase number, and returns
54 > *       when the barrier advances to a new phase.
55   * </ul>
56   *
57   *
58   * <li> Barrier actions, performed by the task triggering a phase
59 < * advance while others may be waiting, are arranged by overriding
60 < * method {@link #onAdvance}, that also controls termination.
61 < * Overriding this method may be used to similar but more flexible
62 < * effect as providing a barrier action to a {@code CyclicBarrier}.
59 > * advance, are arranged by overriding method {@link #onAdvance(int,
60 > * int)}, which also controls termination. Overriding this method is
61 > * similar to, but more flexible than, providing a barrier action to a
62 > * {@code CyclicBarrier}.
63   *
64   * <li> Phasers may enter a <em>termination</em> state in which all
65   * actions immediately return without updating phaser state or waiting
66   * for advance, and indicating (via a negative phase value) that
67 < * execution is complete.  Termination is triggered by executing the
68 < * overridable {@code onAdvance} method that is invoked each time the
69 < * barrier is about to be tripped. When a phaser is controlling an
70 < * action with a fixed number of iterations, it is often convenient to
71 < * override this method to cause termination when the current phase
72 < * number reaches a threshold. Method {@link #forceTermination} is also
73 < * available to abruptly release waiting threads and allow them to
69 < * terminate.
67 > * execution is complete.  Termination is triggered when an invocation
68 > * of {@code onAdvance} returns {@code true}.  When a phaser is
69 > * controlling an action with a fixed number of iterations, it is
70 > * often convenient to override this method to cause termination when
71 > * the current phase number reaches a threshold. Method {@link
72 > * #forceTermination} is also available to abruptly release waiting
73 > * threads and allow them to terminate.
74   *
75   * <li> Phasers may be tiered to reduce contention. Phasers with large
76   * numbers of parties that would otherwise experience heavy
# Line 82 | Line 86 | import java.util.concurrent.locks.LockSu
86   * within handlers of those exceptions, often after invoking
87   * {@code forceTermination}.
88   *
89 < * <li>Phasers ensure lack of starvation when used by ForkJoinTasks.
89 > * <li>Phasers may be used to coordinate tasks executing in a {@link
90 > * ForkJoinPool}, which will ensure sufficient parallelism to execute
91 > * tasks when others are blocked waiting for a phase to advance.
92   *
93   * </ul>
94   *
# Line 94 | Line 100 | import java.util.concurrent.locks.LockSu
100   * first register, then start the actions, then deregister, as in:
101   *
102   *  <pre> {@code
103 < * void runTasks(List<Runnable> list) {
103 > * void runTasks(List<Runnable> tasks) {
104   *   final Phaser phaser = new Phaser(1); // "1" to register self
105 < *   for (Runnable r : list) {
105 > *   // create and start threads
106 > *   for (Runnable task : tasks) {
107   *     phaser.register();
108   *     new Thread() {
109   *       public void run() {
110   *         phaser.arriveAndAwaitAdvance(); // await all creation
111 < *         r.run();
105 < *         phaser.arriveAndDeregister();   // signal completion
111 > *         task.run();
112   *       }
113   *     }.start();
114   *   }
115   *
116 < *   doSomethingOnBehalfOfWorkers();
117 < *   phaser.arrive(); // allow threads to start
112 < *   int p = phaser.arriveAndDeregister(); // deregister self  ...
113 < *   p = phaser.awaitAdvance(p); // ... and await arrival
114 < *   otherActions(); // do other things while tasks execute
115 < *   phaser.awaitAdvance(p); // await final completion
116 > *   // allow threads to start and deregister self
117 > *   phaser.arriveAndDeregister();
118   * }}</pre>
119   *
120   * <p>One way to cause a set of threads to repeatedly perform actions
121   * for a given number of iterations is to override {@code onAdvance}:
122   *
123   *  <pre> {@code
124 < * void startTasks(List<Runnable> list, final int iterations) {
124 > * void startTasks(List<Runnable> tasks, final int iterations) {
125   *   final Phaser phaser = new Phaser() {
126   *     public boolean onAdvance(int phase, int registeredParties) {
127   *       return phase >= iterations || registeredParties == 0;
128   *     }
129   *   };
130   *   phaser.register();
131 < *   for (Runnable r : list) {
131 > *   for (Runnable task : tasks) {
132   *     phaser.register();
133   *     new Thread() {
134   *       public void run() {
135   *         do {
136 < *           r.run();
136 > *           task.run();
137   *           phaser.arriveAndAwaitAdvance();
138   *         } while(!phaser.isTerminated();
139   *       }
# Line 172 | Line 174 | import java.util.concurrent.locks.LockSu
174   *
175   * <p><b>Implementation notes</b>: This implementation restricts the
176   * maximum number of parties to 65535. Attempts to register additional
177 < * parties result in IllegalStateExceptions. However, you can and
177 > * parties result in {@code IllegalStateException}. However, you can and
178   * should create tiered phasers to accommodate arbitrarily large sets
179   * of participants.
180   *
# Line 369 | Line 371 | public class Phaser {
371      /**
372       * Adds a new unarrived party to this phaser.
373       *
374 <     * @return the current barrier phase number upon registration
374 >     * @return the arrival phase number to which this registration applied
375       * @throws IllegalStateException if attempting to register more
376       * than the maximum supported number of parties
377       */
# Line 381 | Line 383 | public class Phaser {
383       * Adds the given number of new unarrived parties to this phaser.
384       *
385       * @param parties the number of parties required to trip barrier
386 <     * @return the current barrier phase number upon registration
386 >     * @return the arrival phase number to which this registration applied
387       * @throws IllegalStateException if attempting to register more
388       * than the maximum supported number of parties
389       */
# Line 418 | Line 420 | public class Phaser {
420       * Arrives at the barrier, but does not wait for others.  (You can
421       * in turn wait for others via {@link #awaitAdvance}).
422       *
423 <     * @return the barrier phase number upon entry to this method, or a
422 <     * negative value if terminated
423 >     * @return the arrival phase number, or a negative value if terminated
424       * @throws IllegalStateException if not terminated and the number
425       * of unarrived parties would become negative
426       */
# Line 464 | Line 465 | public class Phaser {
465      }
466  
467      /**
468 <     * Arrives at the barrier, and deregisters from it, without
469 <     * waiting for others. Deregistration reduces number of parties
468 >     * Arrives at the barrier and deregisters from it without waiting
469 >     * for others. Deregistration reduces the number of parties
470       * required to trip the barrier in future phases.  If this phaser
471       * has a parent, and deregistration causes this phaser to have
472 <     * zero parties, this phaser is also deregistered from its parent.
472 >     * zero parties, this phaser also arrives at and is deregistered
473 >     * from its parent.
474       *
475 <     * @return the current barrier phase number upon entry to
474 <     * this method, or a negative value if terminated
475 >     * @return the arrival phase number, or a negative value if terminated
476       * @throws IllegalStateException if not terminated and the number
477       * of registered or unarrived parties would become negative
478       */
# Line 521 | Line 522 | public class Phaser {
522  
523      /**
524       * Arrives at the barrier and awaits others. Equivalent in effect
525 <     * to {@code awaitAdvance(arrive())}.  If you instead need to
526 <     * await with interruption of timeout, and/or deregister upon
527 <     * arrival, you can arrange them using analogous constructions.
525 >     * to {@code awaitAdvance(arrive())}.  If you need to await with
526 >     * interruption or timeout, you can arrange this with an analogous
527 >     * construction using one of the other forms of the awaitAdvance
528 >     * method.  If instead you need to deregister upon arrival use
529 >     * {@code arriveAndDeregister}.
530       *
531 <     * @return the phase on entry to this method
531 >     * @return the arrival phase number, or a negative number if terminated
532       * @throws IllegalStateException if not terminated and the number
533       * of unarrived parties would become negative
534       */
# Line 534 | Line 537 | public class Phaser {
537      }
538  
539      /**
540 <     * Awaits the phase of the barrier to advance from the given
541 <     * value, or returns immediately if argument is negative or this
542 <     * barrier is terminated.
543 <     *
544 <     * @param phase the phase on entry to this method
545 <     * @return the phase on exit from this method
540 >     * Awaits the phase of the barrier to advance from the given phase
541 >     * value, returning immediately if the current phase of the
542 >     * barrier is not equal to the given phase value or this barrier
543 >     * is terminated.
544 >     *
545 >     * @param phase an arrival phase number, or negative value if
546 >     * terminated; this argument is normally the value returned by a
547 >     * previous call to {@code arrive} or its variants
548 >     * @return the next arrival phase number, or a negative value
549 >     * if terminated or argument is negative
550       */
551      public int awaitAdvance(int phase) {
552          if (phase < 0)
# Line 555 | Line 562 | public class Phaser {
562      }
563  
564      /**
565 <     * Awaits the phase of the barrier to advance from the given
566 <     * value, or returns immediately if argument is negative or this
567 <     * barrier is terminated, or throws InterruptedException if
568 <     * interrupted while waiting.
569 <     *
570 <     * @param phase the phase on entry to this method
571 <     * @return the phase on exit from this method
565 >     * Awaits the phase of the barrier to advance from the given phase
566 >     * value, throwing {@code InterruptedException} if interrupted while
567 >     * waiting, or returning immediately if the current phase of the
568 >     * barrier is not equal to the given phase value or this barrier
569 >     * is terminated.
570 >     *
571 >     * @param phase an arrival phase number, or negative value if
572 >     * terminated; this argument is normally the value returned by a
573 >     * previous call to {@code arrive} or its variants
574 >     * @return the next arrival phase number, or a negative value
575 >     * if terminated or argument is negative
576       * @throws InterruptedException if thread interrupted while waiting
577       */
578      public int awaitAdvanceInterruptibly(int phase)
# Line 578 | Line 589 | public class Phaser {
589      }
590  
591      /**
592 <     * Awaits the phase of the barrier to advance from the given value
593 <     * or the given timeout elapses, or returns immediately if
594 <     * argument is negative or this barrier is terminated.
595 <     *
596 <     * @param phase the phase on entry to this method
597 <     * @return the phase on exit from this method
592 >     * Awaits the phase of the barrier to advance from the given phase
593 >     * value or the given timeout to elapse, throwing
594 >     * {@code InterruptedException} if interrupted while waiting, or
595 >     * returning immediately if the current phase of the barrier is not
596 >     * equal to the given phase value or this barrier is terminated.
597 >     *
598 >     * @param phase an arrival phase number, or negative value if
599 >     * terminated; this argument is normally the value returned by a
600 >     * previous call to {@code arrive} or its variants
601 >     * @param timeout how long to wait before giving up, in units of
602 >     *        {@code unit}
603 >     * @param unit a {@code TimeUnit} determining how to interpret the
604 >     *        {@code timeout} parameter
605 >     * @return the next arrival phase number, or a negative value
606 >     * if terminated or argument is negative
607       * @throws InterruptedException if thread interrupted while waiting
608       * @throws TimeoutException if timed out while waiting
609       */
# Line 637 | Line 657 | public class Phaser {
657      }
658  
659      /**
640     * Returns {@code true} if the current phase number equals the given phase.
641     *
642     * @param phase the phase
643     * @return {@code true} if the current phase number equals the given phase
644     */
645    public final boolean hasPhase(int phase) {
646        return phaseOf(getReconciledState()) == phase;
647    }
648
649    /**
660       * Returns the number of parties registered at this barrier.
661       *
662       * @return the number of parties
# Line 721 | Line 731 | public class Phaser {
731       * effects visible to participating tasks, but it is in general
732       * only sensible to do so in designs where all parties register
733       * before any arrive, and all {@link #awaitAdvance} at each phase.
734 <     * Otherwise, you cannot ensure lack of interference. In
735 <     * particular, this method may be invoked more than once per
726 <     * transition if other parties successfully register while the
727 <     * invocation of this method is in progress, thus postponing the
728 <     * transition until those parties also arrive, re-triggering this
729 <     * method.
734 >     * Otherwise, you cannot ensure lack of interference from other
735 >     * parties during the invocation of this method.
736       *
737       * @param phase the phase number on entering the barrier
738       * @param registeredParties the current number of registered parties

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines