jsr166y
Class Phaser

java.lang.Object
  extended by jsr166y.Phaser

public class Phaser
extends Object

A reusable synchronization barrier, similar in functionality to a CyclicBarrier and CountDownLatch but supporting more flexible usage.

Sample usages:

A Phaser may be used instead of a CountDownLatch to control a one-shot action serving a variable number of parties. The typical idiom is for the method setting this up to first register, then start the actions, then deregister, as in:

  void runTasks(List<Runnable> list) {
    final Phaser phaser = new Phaser(1); // "1" to register self
    for (final Runnable r : list) {
      phaser.register();
      new Thread() {
        public void run() {
          phaser.arriveAndAwaitAdvance(); // await all creation
          r.run();
          phaser.arriveAndDeregister();   // signal completion
        }
      }.start();
    }
    phaser.arriveAndAwaitAdvance(); // wait for all threads, then let them start
    int p = phaser.arriveAndDeregister(); // deregister self
    otherActions(); // do other things while tasks execute
    phaser.awaitAdvance(p); // wait for all tasks to arrive
  }
 

One way to cause a set of threads to repeatedly perform actions for a given number of iterations is to override onAdvance:

  void startTasks(List<Runnable> list, final int iterations) {
    final Phaser phaser = new Phaser() {
       public boolean onAdvance(int phase, int registeredParties) {
         return phase >= iterations - 1 || registeredParties == 0;
       }
    };
    phaser.register();
    for (final Runnable r : list) {
      phaser.register();
      new Thread() {
        public void run() {
           do {
             r.run();
             phaser.arriveAndAwaitAdvance();
           } while (!phaser.isTerminated());
        }
      }.start();
   }
   phaser.arriveAndDeregister(); // deregister self, don't wait
 }
 

To create a set of tasks using a tree of Phasers, you could use code of the following form, assuming a Task class with a constructor accepting a Phaser that it registers for upon construction:

  void build(Task[] actions, int lo, int hi, Phaser b) {
    int step = (hi - lo) / TASKS_PER_PHASER;
    if (step > 1) {
       int i = lo;
       while (i < hi) {
         int r = Math.min(i + step, hi);
         build(actions, i, r, new Phaser(b));
         i = r;
       }
    }
    else {
      for (int i = lo; i < hi; ++i)
        actions[i] = new Task(b);
        // assumes new Task(b) performs b.register()
    }
  }
  // .. initially called, for n tasks via
  build(new Task[n], 0, n, new Phaser());
 
The best value of TASKS_PER_PHASER depends mainly on expected barrier synchronization rates. A value as low as four may be appropriate for extremely small per-barrier task bodies (thus high rates), or up to hundreds for extremely large ones.

Implementation notes: This implementation restricts the maximum number of parties to 65535. Attempts to register additional parties result in IllegalStateExceptions. However, you can and should create tiered phasers to accommodate arbitrarily large sets of participants.
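
The limit and the tiering workaround can be sketched as follows. This is a minimal illustration, not part of the original documentation: it assumes java.util.concurrent.Phaser (the JDK descendant of this class) as a stand-in for jsr166y.Phaser, and the class and method names are hypothetical.

```java
import java.util.concurrent.Phaser;

final class PartyLimitSketch {
    // Limit quoted in the implementation notes.
    static final int MAX_PARTIES = 65535;

    /** Returns true if registering one party beyond the limit is rejected. */
    static boolean registrationBeyondLimitFails() {
        Phaser full = new Phaser(MAX_PARTIES);
        try {
            full.register();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Registers parties on several children of one root and returns the total. */
    static int tieredTotal(int children, int partiesPerChild) {
        Phaser root = new Phaser();
        int total = 0;
        for (int i = 0; i < children; i++) {
            // Each child counts as a single party of the root.
            Phaser child = new Phaser(root, partiesPerChild);
            total += child.getRegisteredParties();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(registrationBeyondLimitFails());
        System.out.println(tieredTotal(3, 40000));
    }
}
```

Three children of 40000 parties each hold 120000 participants in total, while the root itself only sees three registered parties.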


Constructor Summary
Phaser()
          Creates a new Phaser without any initially registered parties, initial phase number 0, and no parent.
Phaser(int parties)
          Creates a new Phaser with the given number of registered unarrived parties, initial phase number 0, and no parent.
Phaser(Phaser parent)
          Creates a new Phaser with the given parent, without any initially registered parties.
Phaser(Phaser parent, int parties)
          Creates a new Phaser with the given parent and number of registered unarrived parties.
 
Method Summary
 int arrive()
          Arrives at the barrier, but does not wait for others.
 int arriveAndAwaitAdvance()
          Arrives at the barrier and awaits others.
 int arriveAndDeregister()
          Arrives at the barrier, and deregisters from it, without waiting for others.
 int awaitAdvance(int phase)
          Awaits the phase of the barrier to advance from the given value, or returns immediately if argument is negative or this barrier is terminated.
 int awaitAdvanceInterruptibly(int phase)
          Awaits the phase of the barrier to advance from the given value, or returns immediately if argument is negative or this barrier is terminated, or throws InterruptedException if interrupted while waiting.
 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
          Awaits the phase of the barrier to advance from the given value or the given timeout to elapse, or returns immediately if argument is negative or this barrier is terminated.
 int bulkRegister(int parties)
          Adds the given number of new unarrived parties to this phaser.
 void forceTermination()
          Forces this barrier to enter termination state.
 int getArrivedParties()
          Returns the number of parties that have arrived at the current phase of this barrier.
 Phaser getParent()
          Returns the parent of this phaser, or null if none.
 int getPhase()
          Returns the current phase number.
 int getRegisteredParties()
          Returns the number of parties registered at this barrier.
 Phaser getRoot()
          Returns the root ancestor of this phaser, which is the same as this phaser if it has no parent.
 int getUnarrivedParties()
          Returns the number of registered parties that have not yet arrived at the current phase of this barrier.
 boolean hasPhase(int phase)
          Returns true if the current phase number equals the given phase.
 boolean isTerminated()
          Returns true if this barrier has been terminated.
protected  boolean onAdvance(int phase, int registeredParties)
          Overridable method to perform an action upon phase advance, and to control termination.
 int register()
          Adds a new unarrived party to this phaser.
 String toString()
          Returns a string identifying this phaser, as well as its state.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

Phaser

public Phaser()
Creates a new Phaser without any initially registered parties, initial phase number 0, and no parent.


Phaser

public Phaser(int parties)
Creates a new Phaser with the given number of registered unarrived parties, initial phase number 0, and no parent.

Parameters:
parties - the number of parties required to trip the barrier.
Throws:
IllegalArgumentException - if parties is less than zero or greater than the maximum number of parties supported.

Phaser

public Phaser(Phaser parent)
Creates a new Phaser with the given parent, without any initially registered parties. If parent is non-null this phaser is registered with the parent and its initial phase number is the same as that of parent phaser.

Parameters:
parent - the parent phaser.

Phaser

public Phaser(Phaser parent,
              int parties)
Creates a new Phaser with the given parent and number of registered unarrived parties. If parent is non-null this phaser is registered with the parent and its initial phase number is the same as that of parent phaser.

Parameters:
parent - the parent phaser.
parties - the number of parties required to trip the barrier.
Throws:
IllegalArgumentException - if parties is less than zero or greater than the maximum number of parties supported.

Method Detail

register

public int register()
Adds a new unarrived party to this phaser.

Returns:
the current barrier phase number upon registration
Throws:
IllegalStateException - if attempting to register more than the maximum supported number of parties.

bulkRegister

public int bulkRegister(int parties)
Adds the given number of new unarrived parties to this phaser.

Parameters:
parties - the number of additional parties required to trip the barrier.
Returns:
the current barrier phase number upon registration
Throws:
IllegalStateException - if attempting to register more than the maximum supported number of parties.
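
A short sketch of both registration methods, assuming java.util.concurrent.Phaser as a stand-in for this class. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class RegistrationSketch {
    /** Returns {phase at registration, registered parties, unarrived parties}. */
    static int[] registerFour() {
        Phaser phaser = new Phaser();        // no parties yet
        int phase = phaser.bulkRegister(3);  // add three unarrived parties at once
        phaser.register();                   // add one more
        return new int[] {
            phase, phaser.getRegisteredParties(), phaser.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(registerFour())); // prints [0, 4, 4]
    }
}
```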

arrive

public int arrive()
Arrives at the barrier, but does not wait for others. (You can in turn wait for others via awaitAdvance(int)).

Returns:
the barrier phase number upon entry to this method, or a negative value if terminated
Throws:
IllegalStateException - if not terminated and the number of unarrived parties would become negative.
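
The split between arriving and waiting might be used as in the following sketch, which assumes java.util.concurrent.Phaser as a stand-in for this class. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class ArriveSketch {
    /** Returns {phase on arrival, phase after the barrier advanced}. */
    static int[] arriveThenWait() {
        final Phaser phaser = new Phaser(2);           // this thread plus one worker
        Thread worker = new Thread(() -> phaser.arrive());
        int arrivalPhase = phaser.arrive();            // does not block
        worker.start();
        // Work placed here would overlap with the worker.
        int nextPhase = phaser.awaitAdvance(arrivalPhase); // blocks until the worker arrives
        return new int[] { arrivalPhase, nextPhase };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(arriveThenWait())); // prints [0, 1]
    }
}
```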

arriveAndDeregister

public int arriveAndDeregister()
Arrives at the barrier, and deregisters from it, without waiting for others. Deregistration reduces number of parties required to trip the barrier in future phases. If this phaser has a parent, and deregistration causes this phaser to have zero parties, this phaser is also deregistered from its parent.

Returns:
the current barrier phase number upon entry to this method, or a negative value if terminated
Throws:
IllegalStateException - if not terminated and the number of registered or unarrived parties would become negative.
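
A minimal sketch of deregistration, assuming java.util.concurrent.Phaser as a stand-in for this class and the default onAdvance behavior. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class DeregisterSketch {
    /** Returns {phase on first arrival, parties left afterwards, 1 if terminated at the end}. */
    static int[] leaveOneByOne() {
        Phaser phaser = new Phaser(2);
        int phase = phaser.arriveAndDeregister();       // first party leaves
        int remaining = phaser.getRegisteredParties();  // one party is still registered
        phaser.arriveAndDeregister();                   // last party leaves
        // With zero registered parties the default onAdvance returns true,
        // so the phaser terminates instead of advancing.
        return new int[] { phase, remaining, phaser.isTerminated() ? 1 : 0 };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(leaveOneByOne())); // prints [0, 1, 1]
    }
}
```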

arriveAndAwaitAdvance

public int arriveAndAwaitAdvance()
Arrives at the barrier and awaits others. Equivalent in effect to awaitAdvance(arrive()). If you instead need to await with interruption or timeout, and/or deregister upon arrival, you can arrange them using analogous constructions.

Returns:
the phase on entry to this method
Throws:
IllegalStateException - if not terminated and the number of unarrived parties would become negative.
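
The following sketch uses this method as a plain barrier between two steps. It assumes java.util.concurrent.Phaser as a stand-in for this class. The class name, method name, and the counter used for checking are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class BarrierSketch {
    /** Returns true if no worker began step 2 before every worker finished step 1. */
    static boolean runTwoSteps(int workers) {
        final Phaser phaser = new Phaser(workers);
        final AtomicInteger finishedStep1 = new AtomicInteger();
        final AtomicBoolean ordered = new AtomicBoolean(true);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                finishedStep1.incrementAndGet();    // step 1
                phaser.arriveAndAwaitAdvance();     // wait for all other workers
                if (finishedStep1.get() != workers) // step 2: everyone must be done with step 1
                    ordered.set(false);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ordered.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwoSteps(4));
    }
}
```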

awaitAdvance

public int awaitAdvance(int phase)
Awaits the phase of the barrier to advance from the given value, or returns immediately if argument is negative or this barrier is terminated.

Parameters:
phase - the phase on entry to this method
Returns:
the phase on exit from this method
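
The two immediate-return cases can be sketched as follows, assuming java.util.concurrent.Phaser as a stand-in for this class. The exact value returned for a negative argument is the behavior of that JDK class and is an assumption here. The class and method names are hypothetical.

```java
import java.util.concurrent.Phaser;

final class AwaitAdvanceSketch {
    /** Returns the results of two awaitAdvance calls that should not block. */
    static int[] immediateReturns() {
        Phaser phaser = new Phaser(1);
        phaser.arrive();                           // sole party arrives: phase 0 -> 1
        int stale = phaser.awaitAdvance(0);        // phase 0 is already over
        int negative = phaser.awaitAdvance(-1);    // negative argument
        return new int[] { stale, negative };
    }

    public static void main(String[] args) {
        int[] r = immediateReturns();
        System.out.println(r[0] + " " + r[1]);
    }
}
```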

awaitAdvanceInterruptibly

public int awaitAdvanceInterruptibly(int phase)
                              throws InterruptedException
Awaits the phase of the barrier to advance from the given value, or returns immediately if argument is negative or this barrier is terminated, or throws InterruptedException if interrupted while waiting.

Parameters:
phase - the phase on entry to this method
Returns:
the phase on exit from this method
Throws:
InterruptedException - if thread interrupted while waiting

awaitAdvanceInterruptibly

public int awaitAdvanceInterruptibly(int phase,
                                     long timeout,
                                     TimeUnit unit)
                              throws InterruptedException,
                                     TimeoutException
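Awaits the phase of the barrier to advance from the given value or the given timeout to elapse, or returns immediately if argument is negative or this barrier is terminated.

Parameters:
phase - the phase on entry to this method
timeout - how long to wait before giving up, in units of unit
unit - a TimeUnit determining how to interpret the timeout parameter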
Returns:
the phase on exit from this method
Throws:
InterruptedException - if thread interrupted while waiting
TimeoutException - if timed out while waiting
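
A minimal sketch of a timed wait that gives up, assuming java.util.concurrent.Phaser as a stand-in for this class. The class name, method name, and the 50 millisecond timeout are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedAwaitSketch {
    /** Waits briefly for a party that never arrives and reports what happened. */
    static String waitBriefly() {
        Phaser phaser = new Phaser(2);   // the second party never arrives
        int phase = phaser.arrive();
        try {
            phaser.awaitAdvanceInterruptibly(phase, 50, TimeUnit.MILLISECONDS);
            return "advanced";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(waitBriefly()); // prints timeout
    }
}
```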

forceTermination

public void forceTermination()
Forces this barrier to enter termination state. Counts of arrived and registered parties are unaffected. If this phaser has a parent, it too is terminated. This method may be useful for coordinating recovery after one or more tasks encounter unexpected exceptions.
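
A sketch of the recovery use, assuming java.util.concurrent.Phaser as a stand-in for this class: a party that is waiting (or about to wait) for a peer that will never arrive is released by termination. The class and method names are hypothetical.

```java
import java.util.concurrent.Phaser;

final class ForceTerminationSketch {
    /** Returns true if the waiting thread finished after forceTermination(). */
    static boolean releaseWaiter() {
        final Phaser phaser = new Phaser(2);   // the second party never arrives
        Thread waiter = new Thread(() -> phaser.arriveAndAwaitAdvance());
        waiter.start();
        phaser.forceTermination();             // e.g. after another task failed
        try {
            waiter.join(5000);                 // bounded wait so the sketch cannot hang
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return phaser.isTerminated() && !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(releaseWaiter());
    }
}
```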


getPhase

public final int getPhase()
Returns the current phase number. The maximum phase number is Integer.MAX_VALUE, after which it restarts at zero. Upon termination, the phase number is negative.

Returns:
the phase number, or a negative value if terminated

hasPhase

public final boolean hasPhase(int phase)
Returns true if the current phase number equals the given phase.

Parameters:
phase - the phase
Returns:
true if the current phase number equals the given phase.

getRegisteredParties

public int getRegisteredParties()
Returns the number of parties registered at this barrier.

Returns:
the number of parties

getArrivedParties

public int getArrivedParties()
Returns the number of parties that have arrived at the current phase of this barrier.

Returns:
the number of arrived parties

getUnarrivedParties

public int getUnarrivedParties()
Returns the number of registered parties that have not yet arrived at the current phase of this barrier.

Returns:
the number of unarrived parties
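
The three party counts relate as in this sketch, which assumes java.util.concurrent.Phaser as a stand-in for this class. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

final class PartyCountsSketch {
    /** Returns {registered, arrived, unarrived} after one of three parties arrives. */
    static int[] countsAfterOneArrival() {
        Phaser phaser = new Phaser(3);
        phaser.arrive();   // one party arrives, two are still outstanding
        return new int[] {
            phaser.getRegisteredParties(),
            phaser.getArrivedParties(),
            phaser.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countsAfterOneArrival())); // prints [3, 1, 2]
    }
}
```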

getParent

public Phaser getParent()
Returns the parent of this phaser, or null if none.

Returns:
the parent of this phaser, or null if none.

getRoot

public Phaser getRoot()
Returns the root ancestor of this phaser, which is the same as this phaser if it has no parent.

Returns:
the root ancestor of this phaser.
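
A small sketch of getParent and getRoot on a three-level tree, assuming java.util.concurrent.Phaser as a stand-in for this class. The class and method names are hypothetical.

```java
import java.util.concurrent.Phaser;

final class PhaserTreeSketch {
    /** Returns true if parent and root links match the construction order. */
    static boolean linksAreConsistent() {
        Phaser root = new Phaser();
        Phaser child = new Phaser(root);
        Phaser grandchild = new Phaser(child);
        return root.getParent() == null
            && root.getRoot() == root           // a phaser without a parent is its own root
            && child.getParent() == root
            && grandchild.getParent() == child
            && grandchild.getRoot() == root;    // the root is shared by the whole tree
    }

    public static void main(String[] args) {
        System.out.println(linksAreConsistent()); // prints true
    }
}
```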

isTerminated

public boolean isTerminated()
Returns true if this barrier has been terminated.

Returns:
true if this barrier has been terminated

onAdvance

protected boolean onAdvance(int phase,
                            int registeredParties)
Overridable method to perform an action upon phase advance, and to control termination. This method is invoked whenever the barrier is tripped (and thus all other waiting parties are dormant). If it returns true, then, rather than advance the phase number, this barrier will be set to a final termination state, and subsequent calls to isTerminated will return true.

The default version returns true when the number of registered parties is zero. Normally, overrides that arrange termination for other reasons should also preserve this property.

You may override this method to perform an action with side effects visible to participating tasks, but it is in general only sensible to do so in designs where all parties register before any arrive, and all awaitAdvance at each phase. Otherwise, you cannot ensure lack of interference. In particular, this method may be invoked more than once per transition if other parties successfully register while the invocation of this method is in progress, thus postponing the transition until those parties also arrive, re-triggering this method.

Parameters:
phase - the phase number on entering the barrier
registeredParties - the current number of registered parties.
Returns:
true if this barrier should terminate
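
An override that ends the run after a fixed number of phases might look like the following sketch. It assumes java.util.concurrent.Phaser as a stand-in for this class and that all parties are registered before any arrive. The class name, method name, and the run counter are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

final class OnAdvanceSketch {
    /** Runs the given number of workers for a fixed number of phases; returns total task runs. */
    static int countRuns(final int iterations, int workers) {
        final AtomicInteger runs = new AtomicInteger();
        final Phaser phaser = new Phaser(workers) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // phase is the number of the phase just completed, starting at 0,
                // so phase == iterations - 1 marks the end of the last round.
                return phase >= iterations - 1 || registeredParties == 0;
            }
        };
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                do {
                    runs.incrementAndGet();         // stand-in for real work
                    phaser.arriveAndAwaitAdvance();
                } while (!phaser.isTerminated());
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(countRuns(3, 4)); // prints 12
    }
}
```

The override keeps the registeredParties == 0 check so that the default termination property is preserved.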

toString

public String toString()
Returns a string identifying this phaser, as well as its state. The state, in brackets, includes the String "phase =" followed by the phase number, "parties =" followed by the number of registered parties, and "arrived =" followed by the number of arrived parties.

Overrides:
toString in class Object
Returns:
a string identifying this barrier, as well as its state