ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.5
Committed: Sun Sep 7 11:24:26 2008 UTC (15 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.4: +1 -1 lines
Log Message:
Typos

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.util.concurrent.*;
9     import java.util.concurrent.atomic.*;
10     import java.util.concurrent.locks.LockSupport;
11 dl 1.4 import sun.misc.Unsafe;
12     import java.lang.reflect.*;
13 dl 1.1
14     /**
15     * A reusable synchronization barrier, similar in functionality to a
16 dl 1.4 * {@link java.util.concurrent.CyclicBarrier} and {@link
17     * java.util.concurrent.CountDownLatch} but supporting more flexible
18     * usage.
19 dl 1.1 *
20     * <ul>
21     *
22 dl 1.4 * <li> The number of parties synchronizing on a phaser may vary over
23     * time. A task may register to be a party at any time, and may
24     * deregister upon arriving at the barrier. As is the case with most
25     * basic synchronization constructs, registration and deregistration
26     * affect only internal counts; they do not establish any further
27     * internal bookkeeping, so tasks cannot query whether they are
28     * registered. (However, you can introduce such bookkeeping by
29     * subclassing this class.)
30 dl 1.1 *
31     * <li> Each generation has an associated phase value, starting at
32     * zero, and advancing when all parties reach the barrier (wrapping
33     * around to zero after reaching <tt>Integer.MAX_VALUE</tt>).
34 jsr166 1.3 *
35 dl 1.1 * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
36     * Method <tt>arriveAndAwaitAdvance</tt> has effect analogous to
37     * <tt>CyclicBarrier.await</tt>. However, Phasers separate two
38 dl 1.4 * aspects of coordination, that may also be invoked independently:
39 dl 1.1 *
40     * <ul>
41     *
42     * <li> Arriving at a barrier. Methods <tt>arrive</tt> and
43 jsr166 1.3 * <tt>arriveAndDeregister</tt> do not block, but return
44 dl 1.4 * the phase value current upon entry to the method.
45 dl 1.1 *
46     * <li> Awaiting others. Method <tt>awaitAdvance</tt> requires an
47     * argument indicating the entry phase, and returns when the
48     * barrier advances to a new phase.
49     * </ul>
50     *
51     *
52     * <li> Barrier actions, performed by the task triggering a phase
53     * advance while others may be waiting, are arranged by overriding
54     * method <tt>onAdvance</tt>, that also controls termination.
55 dl 1.5 * Overriding this method may be used to similar but more flexible
56 dl 1.4 * effect as providing a barrier action to a CyclicBarrier.
57 dl 1.1 *
58     * <li> Phasers may enter a <em>termination</em> state in which all
59     * await actions immediately return, indicating (via a negative phase
60     * value) that execution is complete. Termination is triggered by
61     * executing the overridable <tt>onAdvance</tt> method that is invoked
62 dl 1.4 * each time the barrier is about to be tripped. When a Phaser is
63     * controlling an action with a fixed number of iterations, it is
64     * often convenient to override this method to cause termination when
65     * the current phase number reaches a threshold. Method
66     * <tt>forceTermination</tt> is also available to abruptly release
67     * waiting threads and allow them to terminate.
68     *
69     * <li> Phasers may be tiered to reduce contention. Phasers with large
70     * numbers of parties that would otherwise experience heavy
71     * synchronization contention costs may instead be arranged in trees.
72     * This will typically greatly increase throughput even though it
73     * incurs somewhat greater per-operation overhead.
74 jsr166 1.3 *
75 dl 1.1 * <li> By default, <tt>awaitAdvance</tt> continues to wait even if
76 dl 1.4 * the waiting thread is interrupted. And unlike the case in
77 dl 1.1 * CyclicBarriers, exceptions encountered while tasks wait
78     * interruptibly or with timeout do not change the state of the
79     * barrier. If necessary, you can perform any associated recovery
80 dl 1.4 * within handlers of those exceptions, often after invoking
81     * <tt>forceTermination</tt>.
82 dl 1.1 *
83     * </ul>
84     *
85 dl 1.4 * <p><b>Sample usages:</b>
86     *
87     * <p>A Phaser may be used instead of a <tt>CountDownLatch</tt> to control
88     * a one-shot action serving a variable number of parties. The typical
89     * idiom is for the method setting this up to first register, then
90     * start the actions, then deregister, as in:
91 dl 1.1 *
92 dl 1.4 * <pre>
93     * void runTasks(List&lt;Runnable&gt; list) {
94     * final Phaser phaser = new Phaser(1); // "1" to register self
95     * for (Runnable r : list) {
96     * phaser.register();
97     * new Thread() {
98     * public void run() {
99     * phaser.arriveAndAwaitAdvance(); // await all creation
100     * r.run();
101     * phaser.arriveAndDeregister(); // signal completion
102     * }
103     * }.start();
104     * }
105     * phaser.arrive(); // allow threads to start
106     * int p = phaser.arriveAndDeregister(); // deregister self
107     * otherActions(); // do other things while tasks execute
108     * phaser.awaitAdvance(p); // wait for all tasks to arrive
109     * }
110     * </pre>
111 dl 1.1 *
112 dl 1.4 * <p>One way to cause a set of threads to repeatedly perform actions
113     * for a given number of iterations is to override <tt>onAdvance</tt>:
114 dl 1.1 *
115     * <pre>
116 dl 1.4 * void startTasks(List&lt;Runnable&gt; list, final int iterations) {
117     * final Phaser phaser = new Phaser() {
118     * public boolean onAdvance(int phase, int registeredParties) {
119     * return phase &gt;= iterations || registeredParties == 0;
120     * }
121     * };
122     * phaser.register();
123     * for (Runnable r : list) {
124     * phaser.register();
125     * new Thread() {
126     * public void run() {
127     * do {
128     * r.run();
129     * phaser.arriveAndAwaitAdvance();
130     * } while (!phaser.isTerminated());
131     * }
132     * }.start();
133 dl 1.1 * }
134 dl 1.4 * phaser.arriveAndDeregister(); // deregister self, don't wait
135 dl 1.1 * }
136     * </pre>
137     *
138 dl 1.4 * <p> To create a set of tasks using a tree of Phasers,
139     * you could use code of the following form, assuming a
140     * Task class with a constructor accepting a Phaser that
141     * it registers for upon construction:
142     * <pre>
143     * void build(Task[] actions, int lo, int hi, Phaser b) {
144     * int step = (hi - lo) / TASKS_PER_PHASER;
145     * if (step &gt; 1) {
146     * int i = lo;
147     * while (i &lt; hi) {
148     * int r = Math.min(i + step, hi);
149     * build(actions, i, r, new Phaser(b));
150     * i = r;
151     * }
152     * }
153     * else {
154     * for (int i = lo; i &lt; hi; ++i)
155     * actions[i] = new Task(b);
156     * // assumes new Task(b) performs b.register()
157     * }
158     * }
159     * // .. initially called, for n tasks via
160     * build(new Task[n], 0, n, new Phaser());
161     * </pre>
162     *
163     * The best value of <tt>TASKS_PER_PHASER</tt> depends mainly on
164     * expected barrier synchronization rates. A value as low as four may
165     * be appropriate for extremely small per-barrier task bodies (thus
166     * high rates), or up to hundreds for extremely large ones.
167     *
168     *
169     *
170 dl 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
171 dl 1.4 * maximum number of parties to 65535. Attempts to register additional
172     * parties result in IllegalStateExceptions. However, you can and
173     * should create tiered phasers to accommodate arbitrarily large sets
174     * of participants.
175 dl 1.1 */
176     public class Phaser {
177     /*
178     * This class implements an extension of X10 "clocks". Thanks to
179 dl 1.4 * Vijay Saraswat for the idea, and to Vivek Sarkar for
180     * enhancements to extend functionality.
181 dl 1.1 */
182    
183     /**
184     * Barrier state representation. Conceptually, a barrier contains
185     * four values:
186 jsr166 1.3 *
187 dl 1.1 * * parties -- the number of parties to wait (16 bits)
188     * * unarrived -- the number of parties yet to hit barrier (16 bits)
189     * * phase -- the generation of the barrier (31 bits)
190     * * terminated -- set if barrier is terminated (1 bit)
191     *
192     * However, to efficiently maintain atomicity, these values are
193 dl 1.4 * packed into a single (atomic) long. Termination uses the sign
194     * bit of 32 bit representation of phase, so phase is set to -1 on
195     * termination. Good performance relies on keeping state decoding
196     * and encoding simple, and keeping race windows short.
197     *
198     * Note: there are some cheats in arrive() that rely on unarrived
199     * being lowest 16 bits.
200 dl 1.1 */
201 dl 1.4 private volatile long state; // written by the constructors, afterwards only through casState
202 dl 1.1
// Width in bits of each count field (parties, unarrived) packed into the low half of state.
203     private static final int ushortBits = 16;
// Low-16-bit mask (0xffff); also used as the upper bound when validating party counts.
204     private static final int ushortMask = (1 << ushortBits) - 1;
// Clears the sign bit so that an incremented phase wraps to zero rather than going negative.
205     private static final int phaseMask = 0x7fffffff;
206    
207     private static int unarrivedOf(long s) {
208     return (int)(s & ushortMask);
209     }
210    
211     private static int partiesOf(long s) {
212     return (int)(s & (ushortMask << 16)) >>> 16;
213     }
214    
215     private static int phaseOf(long s) {
216     return (int)(s >>> 32);
217     }
218    
219     private static int arrivedOf(long s) {
220     return partiesOf(s) - unarrivedOf(s);
221     }
222    
223     private static long stateFor(int phase, int parties, int unarrived) {
224     return (((long)phase) << 32) | ((parties << 16) | unarrived);
225     }
226    
227 dl 1.4 private static long trippedStateFor(int phase, int parties) {
228     return (((long)phase) << 32) | ((parties << 16) | parties);
229     }
230    
231 dl 1.1 private static IllegalStateException badBounds(int parties, int unarrived) {
232 dl 1.4 return new IllegalStateException
233     ("Attempt to set " + unarrived +
234     " unarrived of " + parties + " parties");
235     }
236    
237     /**
238     * The parent of this phaser, or null if none
239     */
240     private final Phaser parent; // assigned once by the constructors
241    
242     /**
243     * The root of Phaser tree. Equals this if not in a tree. Used to
244     * support faster state push-down.
245     */
246     private final Phaser root; // parent.root when a parent was supplied, otherwise this
247    
248     // Wait queues
249    
250     /**
251     * Heads of Treiber stacks waiting for nonFJ threads. To eliminate
252     * contention while releasing some threads while adding others, we
253     * use two of them, alternating across even and odd phases.
254     */
255     private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>(); // selected by queueFor for even phases
256     private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>(); // selected by queueFor for odd phases
257    
258     private AtomicReference<QNode> queueFor(int phase) {
259     return (phase & 1) == 0? evenQ : oddQ;
260     }
261    
262     /**
263     * Returns current state, first resolving lagged propagation from
264     * root if necessary.
265     */
266     private long getReconciledState() {
267     return parent == null? state : reconcileState();
268     }
269    
270     /**
271     * Recursively resolves state.
272     */
273     private long reconcileState() {
    // Brings this phaser's phase in line with its parent's when all local
    // parties have arrived but the root's phase differs from ours.
    // Returns the state word as last read or as installed by this call.
274     Phaser p = parent;
275     long s = state;
276     if (p != null) {
    // Loop only while every local party has arrived (unarrived == 0) and
    // our phase still differs from the root's.
277     while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
    // Ask the parent for its reconciled state (recursing up the tree).
278     long parentState = p.getReconciledState();
279     int parentPhase = phaseOf(parentState);
    // Re-read state here; the CAS below must compare against a fresh value.
280     int phase = phaseOf(s = state);
281     if (phase != parentPhase) {
    // Adopt the parent's phase and reset unarrived to the full party count.
282     long next = trippedStateFor(parentPhase, partiesOf(s));
283     if (casState(s, next)) {
    // Only the thread whose CAS succeeded releases waiters of the old phase.
284     releaseWaiters(phase);
285     s = next;
286     }
287     }
288     }
289     }
290     return s;
291 dl 1.1 }
292    
293     /**
294     * Creates a new Phaser without any initially registered parties,
295 dl 1.4 * initial phase number 0, and no parent.
296 dl 1.1 */
public Phaser() {
    // The cast makes explicit that the single-argument Phaser(Phaser) overload is meant.
    this((Phaser) null);
}
300    
301     /**
302     * Creates a new Phaser with the given numbers of registered
303 dl 1.4 * unarrived parties, initial phase number 0, and no parent.
304 dl 1.1 * @param parties the number of parties required to trip barrier.
305     * @throws IllegalArgumentException if parties less than zero
306     * or greater than the maximum number of parties supported.
307     */
public Phaser(int parties) {
    // No parent; argument validation happens in the two-argument constructor.
    this((Phaser) null, parties);
}
311    
312     /**
313     * Creates a new Phaser with the given parent, without any
314     * initially registered parties. If parent is non-null this phaser
315     * is registered with the parent and its initial phase number is
316     * the same as that of parent phaser.
317     * @param parent the parent phaser.
318     */
319     public Phaser(Phaser parent) {
320     int phase = 0;
321     this.parent = parent;
322     if (parent != null) {
323     this.root = parent.root;
324     phase = parent.register();
325     }
326     else
327     this.root = this;
328     this.state = trippedStateFor(phase, 0);
329     }
330    
331     /**
332     * Creates a new Phaser with the given parent and numbers of
333     * registered unarrived parties. If parent is non-null this phaser
334     * is registered with the parent and its initial phase number is
335     * the same as that of parent phaser.
336     * @param parent the parent phaser.
337     * @param parties the number of parties required to trip barrier.
338     * @throws IllegalArgumentException if parties less than zero
339     * or greater than the maximum number of parties supported.
340     */
341     public Phaser(Phaser parent, int parties) {
342 dl 1.1 if (parties < 0 || parties > ushortMask)
343     throw new IllegalArgumentException("Illegal number of parties");
344 dl 1.4 int phase = 0;
345     this.parent = parent;
346     if (parent != null) {
347     this.root = parent.root;
348     phase = parent.register();
349     }
350     else
351     this.root = this;
352     this.state = trippedStateFor(phase, parties);
353 dl 1.1 }
354    
355     /**
356     * Adds a new unarrived party to this phaser.
357     * @return the current barrier phase number upon registration
358     * @throws IllegalStateException if attempting to register more
359     * than the maximum supported number of parties.
360     */
361 dl 1.4 public int register() {
362     return doRegister(1);
363     }
364    
365     /**
366     * Adds the given number of new unarrived parties to this phaser.
367     * @param parties the number of parties required to trip barrier.
368     * @return the current barrier phase number upon registration
369     * @throws IllegalStateException if attempting to register more
370     * than the maximum supported number of parties.
371     */
372     public int bulkRegister(int parties) {
373     if (parties < 0)
374     throw new IllegalArgumentException();
375     if (parties == 0)
376     return getPhase();
377     return doRegister(parties);
378     }
379    
380     /**
381     * Shared code for register, bulkRegister
382     */
383     private int doRegister(int registrations) {
384     int phase;
385 dl 1.1 for (;;) {
386 dl 1.4 long s = getReconciledState();
387     phase = phaseOf(s);
388     int unarrived = unarrivedOf(s) + registrations;
389     int parties = partiesOf(s) + registrations;
390     if (phase < 0)
391     break;
392 dl 1.1 if (parties > ushortMask || unarrived > ushortMask)
393     throw badBounds(parties, unarrived);
394 dl 1.4 if (phase == phaseOf(root.state) &&
395     casState(s, stateFor(phase, parties, unarrived)))
396     break;
397 dl 1.1 }
398 dl 1.4 return phase;
399 dl 1.1 }
400    
401     /**
402     * Arrives at the barrier, but does not wait for others. (You can
403     * in turn wait for others via {@link #awaitAdvance}).
404     *
405 dl 1.4 * @return the barrier phase number upon entry to this method, or a
406     * negative value if terminated;
407     * @throws IllegalStateException if not terminated and the number
408     * of unarrived parties would become negative.
409 dl 1.1 */
410 dl 1.4 public int arrive() {
    // CAS retry loop over the packed state word. Each pass re-reads state and
    // takes exactly one of the branches below; "break" leaves with the phase
    // that was read on that pass.
411     int phase;
412 dl 1.1 for (;;) {
413 dl 1.4 long s = state;
414     phase = phaseOf(s);
415 dl 1.1 int parties = partiesOf(s);
    // Value the unarrived count would have after this arrival.
416     int unarrived = unarrivedOf(s) - 1;
417 dl 1.4 if (unarrived > 0) { // Not the last arrival
    // unarrived occupies the lowest bits, so subtracting 1 from the whole
    // word decrements just that field (it is known to be >= 2 here).
418     if (casState(s, s - 1)) // s-1 adds one arrival
419     break;
420     }
421     else if (unarrived == 0) { // the last arrival
422     Phaser par = parent;
423     if (par == null) { // directly trip
    // onAdvance runs before the CAS is attempted, so a failed CAS followed
    // by a retry that lands here again invokes it again.
    // A true result installs phase -1 (terminated); otherwise phase + 1, wrapped.
424     if (casState
425     (s,
426     trippedStateFor(onAdvance(phase, parties)? -1 :
427     ((phase + 1) & phaseMask), parties))) {
428     releaseWaiters(phase);
429     break;
430     }
431     }
432     else { // cascade to parent
    // Record the local arrival first, then arrive at the parent and pull
    // the resulting phase back down into this phaser.
433     if (casState(s, s - 1)) { // zeroes unarrived
434     par.arrive();
435     reconcileState();
436     break;
437     }
438     }
439     }
    // From here on unarrived < 0, i.e. the count read was already zero.
440     else if (phase < 0) // Don't throw exception if terminated
441     break;
442     else if (phase != phaseOf(root.state)) // or if unreconciled
443     reconcileState();
444     else
445 dl 1.1 throw badBounds(parties, unarrived);
446     }
447 dl 1.4 return phase;
448 dl 1.1 }
449    
450     /**
451     * Arrives at the barrier, and deregisters from it, without
452 dl 1.4 * waiting for others. Deregistration reduces number of parties
453     * required to trip the barrier in future phases. If this phaser
454     * has a parent, and deregistration causes this phaser to have
455     * zero parties, this phaser is also deregistered from its parent.
456 dl 1.1 *
457     * @return the current barrier phase number upon entry to
458     * this method, or a negative value if terminated;
459 dl 1.4 * @throws IllegalStateException if not terminated and the number
460     * of registered or unarrived parties would become negative.
461 dl 1.1 */
462 dl 1.4 public int arriveAndDeregister() {
463     // similar code to arrive, but too different to merge
    // CAS retry loop; "continue" re-reads state after a failed CAS or after
    // reconciling, "break" leaves with the phase read on that pass.
464     Phaser par = parent;
465     int phase;
466 dl 1.1 for (;;) {
467 dl 1.4 long s = state;
468     phase = phaseOf(s);
    // Both counts as they would be after this party arrives and leaves.
469 dl 1.1 int parties = partiesOf(s) - 1;
470     int unarrived = unarrivedOf(s) - 1;
471 dl 1.4 if (parties >= 0) {
    // Case 1: not the last arrival, or the last arrival in a child phaser.
    // Just store the decremented counts; the phase field is left as is.
472     if (unarrived > 0 || (unarrived == 0 && par != null)) {
473     if (casState
474     (s,
475     stateFor(phase, parties, unarrived))) {
476     if (unarrived == 0) {
    // NOTE(review): this deregisters from the parent whenever the last local
    // arrival happens, even when parties > 0 remain here; the method's Javadoc
    // describes deregistration only when this phaser reaches zero parties.
    // Confirm which is intended.
477     par.arriveAndDeregister();
478     reconcileState();
479     }
480     break;
481     }
482     continue;
483     }
    // Case 2: last arrival on a phaser without a parent: trip the barrier.
    // onAdvance sees the already-decremented party count and may be invoked
    // again if the CAS fails and the loop comes back here.
484     if (unarrived == 0) {
485     if (casState
486     (s,
487     trippedStateFor(onAdvance(phase, parties)? -1 :
488     ((phase + 1) & phaseMask), parties))) {
489     releaseWaiters(phase);
490     break;
491     }
492     continue;
493     }
    // Case 3: unarrived < 0. Tolerated when terminated or when this child's
    // phase differs from the root's; otherwise falls through to the throw.
494     if (phase < 0)
495     break;
496     if (par != null && phase != phaseOf(root.state)) {
497     reconcileState();
498     continue;
499     }
500 dl 1.1 }
    // Reached when parties would go negative, or from case 3 above.
501 dl 1.4 throw badBounds(parties, unarrived);
502 dl 1.1 }
503 dl 1.4 return phase;
504 dl 1.1 }
505    
506     /**
507 dl 1.4 * Arrives at the barrier and awaits others. Equivalent in effect
508     * to <tt>awaitAdvance(arrive())</tt>. If you instead need to
509     * await with interruption of timeout, and/or deregister upon
510     * arrival, you can arrange them using analogous constructions.
511     * @return the phase on entry to this method
512     * @throws IllegalStateException if not terminated and the number
513     * of unarrived parties would become negative.
514 dl 1.1 */
515     public int arriveAndAwaitAdvance() {
516 dl 1.4 return awaitAdvance(arrive());
517 dl 1.1 }
518    
519     /**
520     * Awaits the phase of the barrier to advance from the given
521 dl 1.4 * value, or returns immediately if argument is negative or this
522     * barrier is terminated.
523 dl 1.1 * @param phase the phase on entry to this method
524     * @return the phase on exit from this method
525     */
526     public int awaitAdvance(int phase) {
527     if (phase < 0)
528     return phase;
529 dl 1.4 long s = getReconciledState();
530     int p = phaseOf(s);
531     if (p != phase)
532     return p;
533     if (unarrivedOf(s) == 0)
534     parent.awaitAdvance(phase);
535     // Fall here even if parent waited, to reconcile and help release
536     return untimedWait(phase);
537 dl 1.1 }
538    
539     /**
540     * Awaits the phase of the barrier to advance from the given
541 dl 1.4 * value, or returns immediately if argumet is negative or this
542     * barrier is terminated, or throws InterruptedException if
543     * interrupted while waiting.
544 dl 1.1 * @param phase the phase on entry to this method
545     * @return the phase on exit from this method
546     * @throws InterruptedException if thread interrupted while waiting
547     */
548     public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
549     if (phase < 0)
550     return phase;
551 dl 1.4 long s = getReconciledState();
552     int p = phaseOf(s);
553     if (p != phase)
554     return p;
555     if (unarrivedOf(s) != 0)
556     parent.awaitAdvanceInterruptibly(phase);
557     return interruptibleWait(phase);
558 dl 1.1 }
559    
560     /**
561     * Awaits the phase of the barrier to advance from the given value
562 dl 1.4 * or the given timeout elapses, or returns immediately if
563     * argument is negative or this barrier is terminated.
564 dl 1.1 * @param phase the phase on entry to this method
565     * @return the phase on exit from this method
566     * @throws InterruptedException if thread interrupted while waiting
567     * @throws TimeoutException if timed out while waiting
568     */
569 jsr166 1.3 public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
570 dl 1.1 throws InterruptedException, TimeoutException {
571     if (phase < 0)
572     return phase;
573 dl 1.4 long s = getReconciledState();
574     int p = phaseOf(s);
575     if (p != phase)
576     return p;
577     if (unarrivedOf(s) == 0)
578     parent.awaitAdvanceInterruptibly(phase, timeout, unit);
579     return timedWait(phase, unit.toNanos(timeout));
580 dl 1.1 }
581    
582     /**
583     * Forces this barrier to enter termination state. Counts of
584 dl 1.4 * arrived and registered parties are unaffected. If this phaser
585     * has a parent, it too is terminated. This method may be useful
586     * for coordinating recovery after one or more tasks encounter
587     * unexpected exceptions.
588 dl 1.1 */
589     public void forceTermination() {
590     for (;;) {
591 dl 1.4 long s = getReconciledState();
592 dl 1.1 int phase = phaseOf(s);
593     int parties = partiesOf(s);
594     int unarrived = unarrivedOf(s);
595     if (phase < 0 ||
596 dl 1.4 casState(s, stateFor(-1, parties, unarrived))) {
597     releaseWaiters(0);
598     releaseWaiters(1);
599     if (parent != null)
600     parent.forceTermination();
601 dl 1.1 return;
602     }
603     }
604     }
605    
606     /**
607 dl 1.4 * Returns the current phase number. The maximum phase number is
608     * <tt>Integer.MAX_VALUE</tt>, after which it restarts at
609     * zero. Upon termination, the phase number is negative.
610     * @return the phase number, or a negative value if terminated
611 dl 1.1 */
612 dl 1.4 public final int getPhase() {
613     return phaseOf(getReconciledState());
614 dl 1.1 }
615    
616     /**
617 dl 1.4 * Returns true if the current phase number equals the given phase.
618     * @param phase the phase
619     * @return true if the current phase number equals the given phase.
620 dl 1.1 */
621 dl 1.4 public final boolean hasPhase(int phase) {
622     return phaseOf(getReconciledState()) == phase;
623 dl 1.1 }
624    
625     /**
626     * Returns the number of parties registered at this barrier.
627     * @return the number of parties
628     */
629     public int getRegisteredParties() {
630 dl 1.4 return partiesOf(state);
631 dl 1.1 }
632    
633     /**
634     * Returns the number of parties that have arrived at the current
635     * phase of this barrier.
636     * @return the number of arrived parties
637     */
638     public int getArrivedParties() {
639 dl 1.4 return arrivedOf(state);
640 dl 1.1 }
641    
642     /**
643     * Returns the number of registered parties that have not yet
644     * arrived at the current phase of this barrier.
645     * @return the number of unarrived parties
646     */
647     public int getUnarrivedParties() {
648 dl 1.4 return unarrivedOf(state);
649     }
650    
651     /**
652     * Returns the parent of this phaser, or null if none.
653     * @return the parent of this phaser, or null if none.
654     */
655     public Phaser getParent() {
656     return parent;
657     }
658    
659     /**
660     * Returns the root ancestor of this phaser, which is the same as
661     * this phaser if it has no parent.
662     * @return the root ancestor of this phaser.
663     */
664     public Phaser getRoot() {
665     return root;
666 dl 1.1 }
667    
668     /**
669 jsr166 1.2 * Returns true if this barrier has been terminated.
670 dl 1.1 * @return true if this barrier has been terminated
671     */
672     public boolean isTerminated() {
673 dl 1.4 return getPhase() < 0;
674 dl 1.1 }
675    
676     /**
677     * Overridable method to perform an action upon phase advance, and
678     * to control termination. This method is invoked whenever the
679     * barrier is tripped (and thus all other waiting parties are
680     * dormant). If it returns true, then, rather than advance the
681     * phase number, this barrier will be set to a final termination
682     * state, and subsequent calls to <tt>isTerminated</tt> will
683     * return true.
684 jsr166 1.3 *
685 dl 1.1 * <p> The default version returns true when the number of
686     * registered parties is zero. Normally, overrides that arrange
687     * termination for other reasons should also preserve this
688     * property.
689     *
690 dl 1.4 * <p> You may override this method to perform an action with side
691     * effects visible to participating tasks, but it is in general
692     * only sensible to do so in designs where all parties register
693     * before any arrive, and all <tt>awaitAdvance</tt> at each phase.
694     * Otherwise, you cannot ensure lack of interference. In
695     * particular, this method may be invoked more than once per
696     * transition if other parties successfully register while the
697     * invocation of this method is in progress, thus postponing the
698     * transition until those parties also arrive, re-triggering this
699     * method.
700     *
701 dl 1.1 * @param phase the phase number on entering the barrier
702     * @param registeredParties the current number of registered
703     * parties.
704     * @return true if this barrier should terminate
705     */
706     protected boolean onAdvance(int phase, int registeredParties) {
707     return registeredParties <= 0;
708     }
709    
710     /**
711 dl 1.4 * Returns a string identifying this phaser, as well as its
712 dl 1.1 * state. The state, in brackets, includes the String {@code
713     * "phase ="} followed by the phase number, {@code "parties ="}
714     * followed by the number of registered parties, and {@code
715     * "arrived ="} followed by the number of arrived parties
716     *
717     * @return a string identifying this barrier, as well as its state
718     */
719     public String toString() {
720 dl 1.4 long s = getReconciledState();
721 dl 1.1 return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]";
722     }
723    
724 dl 1.4 // methods for waiting
725 dl 1.1
726     /** The number of CPUs, for spin control */
727     static final int NCPUS = Runtime.getRuntime().availableProcessors(); // sampled once, at class initialization
728    
729     /**
730     * The number of times to spin before blocking in timed waits.
731 jsr166 1.2 * The value is empirically derived.
732 dl 1.1 */
733 jsr166 1.3 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; // 0 when fewer than two CPUs are reported
734 dl 1.1
735     /**
736     * The number of times to spin before blocking in untimed waits.
737     * This is greater than timed value because untimed waits spin
738     * faster since they don't need to check times on each spin.
739     */
740     static final int maxUntimedSpins = maxTimedSpins * 32; // 1024, or 0 when maxTimedSpins is 0
741    
742     /**
743     * The number of nanoseconds for which it is faster to spin
744     * rather than to use timed park. A rough estimate suffices.
745     */
746     static final long spinForTimeoutThreshold = 1000L; // nanoseconds; timedWait parks only when more than this remains
747    
748     /**
749 dl 1.4 * Wait nodes for Treiber stack representing wait queue for non-FJ
750     * tasks.
751     */
752     static final class QNode {
753     QNode next;
754     volatile Thread thread; // nulled to cancel wait
755     QNode() {
756     thread = Thread.currentThread();
757     }
758     void signal() {
759     Thread t = thread;
760     if (t != null) {
761     thread = null;
762     LockSupport.unpark(t);
763     }
764     }
765     }
766    
767     /**
768     * Removes and signals waiting threads from wait queue
769     */
770     private void releaseWaiters(int phase) {
771     AtomicReference<QNode> head = queueFor(phase);
772     QNode q;
773     while ((q = head.get()) != null) {
774     if (head.compareAndSet(q, q.next))
775     q.signal();
776     }
777     }
778    
779     /**
780 dl 1.1 * Enqueues node and waits unless aborted or signalled.
781     */
782 dl 1.4 private int untimedWait(int phase) {
783     int spins = maxUntimedSpins;
784 dl 1.1 QNode node = null;
785 dl 1.4 boolean interrupted = false;
786 dl 1.1 boolean queued = false;
787 dl 1.4 int p;
788     while ((p = getPhase()) == phase) {
789     interrupted = Thread.interrupted();
790     if (node != null) {
791     if (!queued) {
792     AtomicReference<QNode> head = queueFor(phase);
793     queued = head.compareAndSet(node.next = head.get(), node);
794 dl 1.1 }
795 dl 1.4 else if (node.thread != null)
796     LockSupport.park(this);
797 dl 1.1 }
798 dl 1.4 else if (spins <= 0)
799     node = new QNode();
800     else
801     --spins;
802     }
803     if (node != null)
804     node.thread = null;
805     if (interrupted)
806     Thread.currentThread().interrupt();
807     releaseWaiters(phase);
808     return p;
809     }
810    
811     /**
812     * Messier interruptible version
813     */
814     private int interruptibleWait(int phase) throws InterruptedException {
815     int spins = maxUntimedSpins;
816     QNode node = null;
817     boolean queued = false;
818     boolean interrupted = false;
819     int p;
820     while ((p = getPhase()) == phase) {
821     if (interrupted = Thread.interrupted())
822     break;
823     if (node != null) {
824     if (!queued) {
825     AtomicReference<QNode> head = queueFor(phase);
826     queued = head.compareAndSet(node.next = head.get(), node);
827 dl 1.1 }
828 dl 1.4 else if (node.thread != null)
829     LockSupport.park(this);
830 dl 1.1 }
831     else if (spins <= 0)
832 dl 1.4 node = new QNode();
833 dl 1.1 else
834     --spins;
835     }
836     if (node != null)
837     node.thread = null;
838 dl 1.4 if (interrupted)
839     throw new InterruptedException();
840     releaseWaiters(phase);
841     return p;
842 dl 1.1 }
843    
844     /**
845 dl 1.4 * Even messier timeout version.
846 dl 1.1 */
847 dl 1.4 private int timedWait(int phase, long nanos)
848 dl 1.1 throws InterruptedException, TimeoutException {
    // Waits for the phase to move on from the given value, for at most
    // "nanos" nanoseconds. Returns the phase seen when the wait ended.
849 dl 1.4 int p;
    // Skip all setup when the phase has already changed.
850     if ((p = getPhase()) == phase) {
851     long lastTime = System.nanoTime();
852     int spins = maxTimedSpins;
853     QNode node = null;
854     boolean queued = false;
855     boolean interrupted = false;
856     while ((p = getPhase()) == phase) {
    // Polls and clears the interrupt status; an interrupt ends the wait.
857     if (interrupted = Thread.interrupted())
858     break;
    // Deduct the time elapsed since the previous pass from the budget.
859     long now = System.nanoTime();
860     if ((nanos -= now - lastTime) <= 0)
861 dl 1.1 break;
862 dl 1.4 lastTime = now;
863     if (node != null) {
864     if (!queued) {
    // Push our node; if the CAS loses a race, the next pass tries again.
865     AtomicReference<QNode> head = queueFor(phase);
866     queued = head.compareAndSet(node.next = head.get(), node);
867 dl 1.1 }
    // Park only when enough time remains; otherwise keep looping.
868 dl 1.4 else if (node.thread != null &&
869     nanos > spinForTimeoutThreshold) {
870     LockSupport.parkNanos(this, nanos);
871 dl 1.1 }
872     }
873 dl 1.4 else if (spins <= 0)
874     node = new QNode();
875 dl 1.1 else
876 dl 1.4 --spins;
877     }
    // Cancel our node regardless of why the loop ended.
878     if (node != null)
879     node.thread = null;
880     if (interrupted)
881     throw new InterruptedException();
    // Re-check the phase once more before reporting a timeout.
882     if (p == phase && (p = getPhase()) == phase)
883     throw new TimeoutException();
884     }
885     releaseWaiters(phase);
886     return p;
887     }
888    
889     // Temporary Unsafe mechanics for preliminary release
890    
// Handle used by casState, and the offset of the "state" field it operates on.
891     static final Unsafe _unsafe;
892     static final long stateOffset;
893    
// Obtains the Unsafe instance and the field offset once, at class initialization.
894     static {
895     try {
    // With a non-null class loader, read Unsafe's private "theUnsafe" field
    // reflectively; with a null class loader, call Unsafe.getUnsafe() directly.
896     if (Phaser.class.getClassLoader() != null) {
897     Field f = Unsafe.class.getDeclaredField("theUnsafe");
898     f.setAccessible(true);
899     _unsafe = (Unsafe)f.get(null);
900 dl 1.1 }
901     else
902 dl 1.4 _unsafe = Unsafe.getUnsafe();
903     stateOffset = _unsafe.objectFieldOffset
904     (Phaser.class.getDeclaredField("state"));
905     } catch (Exception e) {
    // Any failure is rethrown unchecked with the original exception as its cause.
906     throw new RuntimeException("Could not initialize intrinsics", e);
907 dl 1.1 }
908     }
909    
910 dl 1.4 final boolean casState(long cmp, long val) {
911     return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
912     }
913 dl 1.1 }