ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.7
Committed: Mon Jan 5 03:53:26 2009 UTC (15 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.6: +18 -18 lines
Log Message:
use @code

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.util.concurrent.*;
9     import java.util.concurrent.atomic.*;
10     import java.util.concurrent.locks.LockSupport;
11 dl 1.4 import sun.misc.Unsafe;
12     import java.lang.reflect.*;
13 dl 1.1
14     /**
15     * A reusable synchronization barrier, similar in functionality to a
16 dl 1.4 * {@link java.util.concurrent.CyclicBarrier} and {@link
17     * java.util.concurrent.CountDownLatch} but supporting more flexible
18     * usage.
19 dl 1.1 *
20     * <ul>
21     *
22 dl 1.4 * <li> The number of parties synchronizing on a phaser may vary over
23     * time. A task may register to be a party at any time, and may
24     * deregister upon arriving at the barrier. As is the case with most
25     * basic synchronization constructs, registration and deregistration
26     * affect only internal counts; they do not establish any further
27     * internal bookkeeping, so tasks cannot query whether they are
28     * registered. (However, you can introduce such bookkeeping by
29     * subclassing this class.)
30 dl 1.1 *
31     * <li> Each generation has an associated phase value, starting at
32     * zero, and advancing when all parties reach the barrier (wrapping
33 jsr166 1.7 * around to zero after reaching {@code Integer.MAX_VALUE}).
34 jsr166 1.3 *
35 dl 1.1 * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
36 jsr166 1.7 * Method {@code arriveAndAwaitAdvance} has effect analogous to
37     * {@code CyclicBarrier.await}. However, Phasers separate two
38 dl 1.4 * aspects of coordination, that may also be invoked independently:
39 dl 1.1 *
40     * <ul>
41     *
42 jsr166 1.7 * <li> Arriving at a barrier. Methods {@code arrive} and
43     * {@code arriveAndDeregister} do not block, but return
44 dl 1.4 * the phase value current upon entry to the method.
45 dl 1.1 *
46 jsr166 1.7 * <li> Awaiting others. Method {@code awaitAdvance} requires an
47 dl 1.1 * argument indicating the entry phase, and returns when the
48     * barrier advances to a new phase.
49     * </ul>
50     *
51     *
52     * <li> Barrier actions, performed by the task triggering a phase
53     * advance while others may be waiting, are arranged by overriding
54 jsr166 1.7 * method {@code onAdvance}, that also controls termination.
55 dl 1.5 * Overriding this method may be used to similar but more flexible
56 dl 1.4 * effect as providing a barrier action to a CyclicBarrier.
57 dl 1.1 *
58     * <li> Phasers may enter a <em>termination</em> state in which all
59     * await actions immediately return, indicating (via a negative phase
60     * value) that execution is complete. Termination is triggered by
61 jsr166 1.7 * executing the overridable {@code onAdvance} method that is invoked
62 dl 1.4 * each time the barrier is about to be tripped. When a Phaser is
63     * controlling an action with a fixed number of iterations, it is
64     * often convenient to override this method to cause termination when
65     * the current phase number reaches a threshold. Method
66 jsr166 1.7 * {@code forceTermination} is also available to abruptly release
67 dl 1.4 * waiting threads and allow them to terminate.
68     *
69     * <li> Phasers may be tiered to reduce contention. Phasers with large
70     * numbers of parties that would otherwise experience heavy
71     * synchronization contention costs may instead be arranged in trees.
72     * This will typically greatly increase throughput even though it
73     * incurs somewhat greater per-operation overhead.
74 jsr166 1.3 *
75 jsr166 1.7 * <li> By default, {@code awaitAdvance} continues to wait even if
76 dl 1.4 * the waiting thread is interrupted. And unlike the case in
77 dl 1.1 * CyclicBarriers, exceptions encountered while tasks wait
78     * interruptibly or with timeout do not change the state of the
79     * barrier. If necessary, you can perform any associated recovery
80 dl 1.4 * within handlers of those exceptions, often after invoking
81 jsr166 1.7 * {@code forceTermination}.
82 dl 1.1 *
83     * </ul>
84     *
85 dl 1.4 * <p><b>Sample usages:</b>
86     *
87 jsr166 1.7 * <p>A Phaser may be used instead of a {@code CountDownLatch} to control
88 dl 1.4 * a one-shot action serving a variable number of parties. The typical
89     * idiom is for the method setting this up to first register, then
90     * start the actions, then deregister, as in:
91 dl 1.1 *
92 dl 1.4 * <pre>
93     * void runTasks(List&lt;Runnable&gt; list) {
94     * final Phaser phaser = new Phaser(1); // "1" to register self
95     * for (Runnable r : list) {
96     * phaser.register();
97     * new Thread() {
98     * public void run() {
99     * phaser.arriveAndAwaitAdvance(); // await all creation
100     * r.run();
101     * phaser.arriveAndDeregister(); // signal completion
102     * }
103     * }.start();
104     * }
105 dl 1.6 *
106     * doSomethingOnBehalfOfWorkers();
107 dl 1.4 * phaser.arrive(); // allow threads to start
108 dl 1.6 * int p = phaser.arriveAndDeregister(); // deregister self ...
109     * p = phaser.awaitAdvance(p); // ... and await arrival
110 dl 1.4 * otherActions(); // do other things while tasks execute
111 dl 1.6 * phaser.awaitAdvance(p); // await final completion
112 dl 1.4 * }
113     * </pre>
114 dl 1.1 *
115 dl 1.4 * <p>One way to cause a set of threads to repeatedly perform actions
116 jsr166 1.7 * for a given number of iterations is to override {@code onAdvance}:
117 dl 1.1 *
118     * <pre>
119 dl 1.4 * void startTasks(List&lt;Runnable&gt; list, final int iterations) {
120     * final Phaser phaser = new Phaser() {
121     * public boolean onAdvance(int phase, int registeredParties) {
122     * return phase &gt;= iterations || registeredParties == 0;
123     * }
124     * };
125     * phaser.register();
126     * for (Runnable r : list) {
127     * phaser.register();
128     * new Thread() {
129     * public void run() {
130     * do {
131     * r.run();
132     * phaser.arriveAndAwaitAdvance();
133     * } while (!phaser.isTerminated());
134     * }
135     * }.start();
136 dl 1.1 * }
137 dl 1.4 * phaser.arriveAndDeregister(); // deregister self, don't wait
138 dl 1.1 * }
139     * </pre>
140     *
141 dl 1.4 * <p> To create a set of tasks using a tree of Phasers,
142     * you could use code of the following form, assuming a
143     * Task class with a constructor accepting a Phaser that
144     * it registers for upon construction:
145     * <pre>
146     * void build(Task[] actions, int lo, int hi, Phaser b) {
147     * int step = (hi - lo) / TASKS_PER_PHASER;
148     * if (step &gt; 1) {
149     * int i = lo;
150     * while (i &lt; hi) {
151     * int r = Math.min(i + step, hi);
152     * build(actions, i, r, new Phaser(b));
153     * i = r;
154     * }
155     * }
156     * else {
157     * for (int i = lo; i &lt; hi; ++i)
158     * actions[i] = new Task(b);
159     * // assumes new Task(b) performs b.register()
160     * }
161     * }
162     * // .. initially called, for n tasks via
163     * build(new Task[n], 0, n, new Phaser());
164     * </pre>
165     *
166 jsr166 1.7 * The best value of {@code TASKS_PER_PHASER} depends mainly on
167 dl 1.4 * expected barrier synchronization rates. A value as low as four may
168     * be appropriate for extremely small per-barrier task bodies (thus
169     * high rates), or up to hundreds for extremely large ones.
170     *
171     *
172     *
173 dl 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
174 dl 1.4 * maximum number of parties to 65535. Attempts to register additional
175     * parties result in IllegalStateExceptions. However, you can and
176     * should create tiered phasers to accommodate arbitrarily large sets
177     * of participants.
178 dl 1.1 */
179     public class Phaser {
180     /*
181     * This class implements an extension of X10 "clocks". Thanks to
182 dl 1.4 * Vijay Saraswat for the idea, and to Vivek Sarkar for
183     * enhancements to extend functionality.
184 dl 1.1 */
185    
186     /**
187     * Barrier state representation. Conceptually, a barrier contains
188     * four values:
189 jsr166 1.3 *
190 dl 1.1 * * parties -- the number of parties to wait (16 bits)
191     * * unarrived -- the number of parties yet to hit barrier (16 bits)
192     * * phase -- the generation of the barrier (31 bits)
193     * * terminated -- set if barrier is terminated (1 bit)
194     *
195     * However, to efficiently maintain atomicity, these values are
196 dl 1.4 * packed into a single (atomic) long. Termination uses the sign
197     * bit of 32 bit representation of phase, so phase is set to -1 on
198     * termination. Good performance relies on keeping state decoding
199     * and encoding simple, and keeping race windows short.
200     *
201     * Note: there are some cheats in arrive() that rely on unarrived
202     * being lowest 16 bits.
203 dl 1.1 */
204 dl 1.4 private volatile long state;
205 dl 1.1
206     private static final int ushortBits = 16;
207     private static final int ushortMask = (1 << ushortBits) - 1;
208     private static final int phaseMask = 0x7fffffff;
209    
210     private static int unarrivedOf(long s) {
211     return (int)(s & ushortMask);
212     }
213    
214     private static int partiesOf(long s) {
215     return (int)(s & (ushortMask << 16)) >>> 16;
216     }
217    
218     private static int phaseOf(long s) {
219     return (int)(s >>> 32);
220     }
221    
222     private static int arrivedOf(long s) {
223     return partiesOf(s) - unarrivedOf(s);
224     }
225    
226     private static long stateFor(int phase, int parties, int unarrived) {
227     return (((long)phase) << 32) | ((parties << 16) | unarrived);
228     }
229    
230 dl 1.4 private static long trippedStateFor(int phase, int parties) {
231     return (((long)phase) << 32) | ((parties << 16) | parties);
232     }
233    
234 dl 1.1 private static IllegalStateException badBounds(int parties, int unarrived) {
235 dl 1.4 return new IllegalStateException
236     ("Attempt to set " + unarrived +
237     " unarrived of " + parties + " parties");
238     }
239    
240     /**
241     * The parent of this phaser, or null if none
242     */
243     private final Phaser parent;
244    
245     /**
246     * The root of Phaser tree. Equals this if not in a tree. Used to
247     * support faster state push-down.
248     */
249     private final Phaser root;
250    
251     // Wait queues
252    
253     /**
254     * Heads of Treiber stacks waiting for nonFJ threads. To eliminate
255     * contention while releasing some threads while adding others, we
256     * use two of them, alternating across even and odd phases.
257     */
258     private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
259     private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
260    
261     private AtomicReference<QNode> queueFor(int phase) {
262     return (phase & 1) == 0? evenQ : oddQ;
263     }
264    
265     /**
266     * Returns current state, first resolving lagged propagation from
267     * root if necessary.
268     */
269     private long getReconciledState() {
270     return parent == null? state : reconcileState();
271     }
272    
273     /**
274     * Recursively resolves state.
275     */
276     private long reconcileState() {
277     Phaser p = parent;
278     long s = state;
279     if (p != null) {
280     while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
281     long parentState = p.getReconciledState();
282     int parentPhase = phaseOf(parentState);
283     int phase = phaseOf(s = state);
284     if (phase != parentPhase) {
285     long next = trippedStateFor(parentPhase, partiesOf(s));
286     if (casState(s, next)) {
287     releaseWaiters(phase);
288     s = next;
289     }
290     }
291     }
292     }
293     return s;
294 dl 1.1 }
295    
296     /**
297     * Creates a new Phaser without any initially registered parties,
298 dl 1.4 * initial phase number 0, and no parent.
299 dl 1.1 */
public Phaser() {
    // The cast spells out which single-argument constructor is meant.
    this((Phaser) null);
}
303    
304     /**
305     * Creates a new Phaser with the given numbers of registered
306 dl 1.4 * unarrived parties, initial phase number 0, and no parent.
307 dl 1.1 * @param parties the number of parties required to trip barrier.
308     * @throws IllegalArgumentException if parties less than zero
309     * or greater than the maximum number of parties supported.
310     */
public Phaser(int parties) {
    // No parent; validation happens in the two-argument constructor.
    this((Phaser) null, parties);
}
314    
315     /**
316     * Creates a new Phaser with the given parent, without any
317     * initially registered parties. If parent is non-null this phaser
318     * is registered with the parent and its initial phase number is
319     * the same as that of parent phaser.
320     * @param parent the parent phaser.
321     */
public Phaser(Phaser parent) {
    // Same as the two-argument form with no initially registered parties.
    this(parent, 0);
}
333    
334     /**
335     * Creates a new Phaser with the given parent and numbers of
336     * registered unarrived parties. If parent is non-null this phaser
337     * is registered with the parent and its initial phase number is
338     * the same as that of parent phaser.
339     * @param parent the parent phaser.
340     * @param parties the number of parties required to trip barrier.
341     * @throws IllegalArgumentException if parties less than zero
342     * or greater than the maximum number of parties supported.
343     */
344     public Phaser(Phaser parent, int parties) {
345 dl 1.1 if (parties < 0 || parties > ushortMask)
346     throw new IllegalArgumentException("Illegal number of parties");
347 dl 1.4 int phase = 0;
348     this.parent = parent;
349     if (parent != null) {
350     this.root = parent.root;
351     phase = parent.register();
352     }
353     else
354     this.root = this;
355     this.state = trippedStateFor(phase, parties);
356 dl 1.1 }
357    
358     /**
359     * Adds a new unarrived party to this phaser.
360     * @return the current barrier phase number upon registration
361     * @throws IllegalStateException if attempting to register more
362     * than the maximum supported number of parties.
363     */
364 dl 1.4 public int register() {
365     return doRegister(1);
366     }
367    
368     /**
369     * Adds the given number of new unarrived parties to this phaser.
370     * @param parties the number of parties required to trip barrier.
371     * @return the current barrier phase number upon registration
372     * @throws IllegalStateException if attempting to register more
373     * than the maximum supported number of parties.
374     */
375     public int bulkRegister(int parties) {
376     if (parties < 0)
377     throw new IllegalArgumentException();
378     if (parties == 0)
379     return getPhase();
380     return doRegister(parties);
381     }
382    
383     /**
384     * Shared code for register, bulkRegister
385     */
386     private int doRegister(int registrations) {
387     int phase;
388 dl 1.1 for (;;) {
389 dl 1.4 long s = getReconciledState();
390     phase = phaseOf(s);
391     int unarrived = unarrivedOf(s) + registrations;
392     int parties = partiesOf(s) + registrations;
393     if (phase < 0)
394     break;
395 dl 1.1 if (parties > ushortMask || unarrived > ushortMask)
396     throw badBounds(parties, unarrived);
397 dl 1.4 if (phase == phaseOf(root.state) &&
398     casState(s, stateFor(phase, parties, unarrived)))
399     break;
400 dl 1.1 }
401 dl 1.4 return phase;
402 dl 1.1 }
403    
404     /**
405     * Arrives at the barrier, but does not wait for others. (You can
406     * in turn wait for others via {@link #awaitAdvance}).
407     *
408 dl 1.4 * @return the barrier phase number upon entry to this method, or a
409     * negative value if terminated;
410     * @throws IllegalStateException if not terminated and the number
411     * of unarrived parties would become negative.
412 dl 1.1 */
413 dl 1.4 public int arrive() {
414     int phase;
415 dl 1.1 for (;;) {
416 dl 1.4 long s = state;
417     phase = phaseOf(s);
418 dl 1.1 int parties = partiesOf(s);
419     int unarrived = unarrivedOf(s) - 1;
420 dl 1.4 if (unarrived > 0) { // Not the last arrival
421     if (casState(s, s - 1)) // s-1 adds one arrival
422     break;
423     }
424     else if (unarrived == 0) { // the last arrival
425     Phaser par = parent;
426     if (par == null) { // directly trip
427     if (casState
428     (s,
429     trippedStateFor(onAdvance(phase, parties)? -1 :
430     ((phase + 1) & phaseMask), parties))) {
431     releaseWaiters(phase);
432     break;
433     }
434     }
435     else { // cascade to parent
436     if (casState(s, s - 1)) { // zeroes unarrived
437     par.arrive();
438     reconcileState();
439     break;
440     }
441     }
442     }
443     else if (phase < 0) // Don't throw exception if terminated
444     break;
445     else if (phase != phaseOf(root.state)) // or if unreconciled
446     reconcileState();
447     else
448 dl 1.1 throw badBounds(parties, unarrived);
449     }
450 dl 1.4 return phase;
451 dl 1.1 }
452    
453     /**
454     * Arrives at the barrier, and deregisters from it, without
455 dl 1.4 * waiting for others. Deregistration reduces number of parties
456     * required to trip the barrier in future phases. If this phaser
457     * has a parent, and deregistration causes this phaser to have
458     * zero parties, this phaser is also deregistered from its parent.
459 dl 1.1 *
460     * @return the current barrier phase number upon entry to
461     * this method, or a negative value if terminated;
462 dl 1.4 * @throws IllegalStateException if not terminated and the number
463     * of registered or unarrived parties would become negative.
464 dl 1.1 */
465 dl 1.4 public int arriveAndDeregister() {
466     // similar code to arrive, but too different to merge
467     Phaser par = parent;
468     int phase;
469 dl 1.1 for (;;) {
470 dl 1.4 long s = state;
471     phase = phaseOf(s);
472 dl 1.1 int parties = partiesOf(s) - 1;
473     int unarrived = unarrivedOf(s) - 1;
474 dl 1.4 if (parties >= 0) {
475     if (unarrived > 0 || (unarrived == 0 && par != null)) {
476     if (casState
477     (s,
478     stateFor(phase, parties, unarrived))) {
479     if (unarrived == 0) {
480     par.arriveAndDeregister();
481     reconcileState();
482     }
483     break;
484     }
485     continue;
486     }
487     if (unarrived == 0) {
488     if (casState
489     (s,
490     trippedStateFor(onAdvance(phase, parties)? -1 :
491     ((phase + 1) & phaseMask), parties))) {
492     releaseWaiters(phase);
493     break;
494     }
495     continue;
496     }
497     if (phase < 0)
498     break;
499     if (par != null && phase != phaseOf(root.state)) {
500     reconcileState();
501     continue;
502     }
503 dl 1.1 }
504 dl 1.4 throw badBounds(parties, unarrived);
505 dl 1.1 }
506 dl 1.4 return phase;
507 dl 1.1 }
508    
509     /**
510 dl 1.4 * Arrives at the barrier and awaits others. Equivalent in effect
511 jsr166 1.7 * to {@code awaitAdvance(arrive())}. If you instead need to
512 dl 1.4 * await with interruption or timeout, and/or deregister upon
513     * arrival, you can arrange them using analogous constructions.
514     * @return the phase on entry to this method
515     * @throws IllegalStateException if not terminated and the number
516     * of unarrived parties would become negative.
517 dl 1.1 */
518     public int arriveAndAwaitAdvance() {
519 dl 1.4 return awaitAdvance(arrive());
520 dl 1.1 }
521    
522     /**
523     * Awaits the phase of the barrier to advance from the given
524 dl 1.4 * value, or returns immediately if argument is negative or this
525     * barrier is terminated.
526 dl 1.1 * @param phase the phase on entry to this method
527     * @return the phase on exit from this method
528     */
529     public int awaitAdvance(int phase) {
530     if (phase < 0)
531     return phase;
532 dl 1.4 long s = getReconciledState();
533     int p = phaseOf(s);
534     if (p != phase)
535     return p;
536     if (unarrivedOf(s) == 0)
537     parent.awaitAdvance(phase);
538     // Fall here even if parent waited, to reconcile and help release
539     return untimedWait(phase);
540 dl 1.1 }
541    
542     /**
543     * Awaits the phase of the barrier to advance from the given
544 dl 1.4 * value, or returns immediately if argumet is negative or this
545     * barrier is terminated, or throws InterruptedException if
546     * interrupted while waiting.
547 dl 1.1 * @param phase the phase on entry to this method
548     * @return the phase on exit from this method
549     * @throws InterruptedException if thread interrupted while waiting
550     */
551     public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
552     if (phase < 0)
553     return phase;
554 dl 1.4 long s = getReconciledState();
555     int p = phaseOf(s);
556     if (p != phase)
557     return p;
558     if (unarrivedOf(s) != 0)
559     parent.awaitAdvanceInterruptibly(phase);
560     return interruptibleWait(phase);
561 dl 1.1 }
562    
563     /**
564     * Awaits the phase of the barrier to advance from the given value
565 dl 1.4 * or the given timeout elapses, or returns immediately if
566     * argument is negative or this barrier is terminated.
567 dl 1.1 * @param phase the phase on entry to this method
568     * @return the phase on exit from this method
569     * @throws InterruptedException if thread interrupted while waiting
570     * @throws TimeoutException if timed out while waiting
571     */
572 jsr166 1.3 public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
573 dl 1.1 throws InterruptedException, TimeoutException {
574     if (phase < 0)
575     return phase;
576 dl 1.4 long s = getReconciledState();
577     int p = phaseOf(s);
578     if (p != phase)
579     return p;
580     if (unarrivedOf(s) == 0)
581     parent.awaitAdvanceInterruptibly(phase, timeout, unit);
582     return timedWait(phase, unit.toNanos(timeout));
583 dl 1.1 }
584    
585     /**
586     * Forces this barrier to enter termination state. Counts of
587 dl 1.4 * arrived and registered parties are unaffected. If this phaser
588     * has a parent, it too is terminated. This method may be useful
589     * for coordinating recovery after one or more tasks encounter
590     * unexpected exceptions.
591 dl 1.1 */
592     public void forceTermination() {
593     for (;;) {
594 dl 1.4 long s = getReconciledState();
595 dl 1.1 int phase = phaseOf(s);
596     int parties = partiesOf(s);
597     int unarrived = unarrivedOf(s);
598     if (phase < 0 ||
599 dl 1.4 casState(s, stateFor(-1, parties, unarrived))) {
600     releaseWaiters(0);
601     releaseWaiters(1);
602     if (parent != null)
603     parent.forceTermination();
604 dl 1.1 return;
605     }
606     }
607     }
608    
609     /**
610 dl 1.4 * Returns the current phase number. The maximum phase number is
611 jsr166 1.7 * {@code Integer.MAX_VALUE}, after which it restarts at
612 dl 1.4 * zero. Upon termination, the phase number is negative.
613     * @return the phase number, or a negative value if terminated
614 dl 1.1 */
615 dl 1.4 public final int getPhase() {
616     return phaseOf(getReconciledState());
617 dl 1.1 }
618    
619     /**
620 dl 1.4 * Returns true if the current phase number equals the given phase.
621     * @param phase the phase
622     * @return true if the current phase number equals the given phase.
623 dl 1.1 */
624 dl 1.4 public final boolean hasPhase(int phase) {
625     return phaseOf(getReconciledState()) == phase;
626 dl 1.1 }
627    
628     /**
629     * Returns the number of parties registered at this barrier.
630     * @return the number of parties
631     */
632     public int getRegisteredParties() {
633 dl 1.4 return partiesOf(state);
634 dl 1.1 }
635    
636     /**
637     * Returns the number of parties that have arrived at the current
638     * phase of this barrier.
639     * @return the number of arrived parties
640     */
641     public int getArrivedParties() {
642 dl 1.4 return arrivedOf(state);
643 dl 1.1 }
644    
645     /**
646     * Returns the number of registered parties that have not yet
647     * arrived at the current phase of this barrier.
648     * @return the number of unarrived parties
649     */
650     public int getUnarrivedParties() {
651 dl 1.4 return unarrivedOf(state);
652     }
653    
654     /**
655     * Returns the parent of this phaser, or null if none.
656     * @return the parent of this phaser, or null if none.
657     */
658     public Phaser getParent() {
659     return parent;
660     }
661    
662     /**
663     * Returns the root ancestor of this phaser, which is the same as
664     * this phaser if it has no parent.
665     * @return the root ancestor of this phaser.
666     */
667     public Phaser getRoot() {
668     return root;
669 dl 1.1 }
670    
671     /**
672 jsr166 1.2 * Returns true if this barrier has been terminated.
673 dl 1.1 * @return true if this barrier has been terminated
674     */
675     public boolean isTerminated() {
676 dl 1.4 return getPhase() < 0;
677 dl 1.1 }
678    
679     /**
680     * Overridable method to perform an action upon phase advance, and
681     * to control termination. This method is invoked whenever the
682     * barrier is tripped (and thus all other waiting parties are
683     * dormant). If it returns true, then, rather than advance the
684     * phase number, this barrier will be set to a final termination
685 jsr166 1.7 * state, and subsequent calls to {@code isTerminated} will
686 dl 1.1 * return true.
687 jsr166 1.3 *
688 dl 1.1 * <p> The default version returns true when the number of
689     * registered parties is zero. Normally, overrides that arrange
690     * termination for other reasons should also preserve this
691     * property.
692     *
693 dl 1.4 * <p> You may override this method to perform an action with side
694     * effects visible to participating tasks, but it is in general
695     * only sensible to do so in designs where all parties register
696 jsr166 1.7 * before any arrive, and all {@code awaitAdvance} at each phase.
697 dl 1.4 * Otherwise, you cannot ensure lack of interference. In
698     * particular, this method may be invoked more than once per
699     * transition if other parties successfully register while the
700     * invocation of this method is in progress, thus postponing the
701     * transition until those parties also arrive, re-triggering this
702     * method.
703     *
704 dl 1.1 * @param phase the phase number on entering the barrier
705     * @param registeredParties the current number of registered
706     * parties.
707     * @return true if this barrier should terminate
708     */
709     protected boolean onAdvance(int phase, int registeredParties) {
710     return registeredParties <= 0;
711     }
712    
713     /**
714 dl 1.4 * Returns a string identifying this phaser, as well as its
715 dl 1.1 * state. The state, in brackets, includes the String {@code
716     * "phase ="} followed by the phase number, {@code "parties ="}
717     * followed by the number of registered parties, and {@code
718     * "arrived ="} followed by the number of arrived parties
719     *
720     * @return a string identifying this barrier, as well as its state
721     */
722     public String toString() {
723 dl 1.4 long s = getReconciledState();
724 dl 1.1 return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]";
725     }
726    
727 dl 1.4 // methods for waiting
728 dl 1.1
729     /** The number of CPUs, for spin control */
730     static final int NCPUS = Runtime.getRuntime().availableProcessors();
731    
732     /**
733     * The number of times to spin before blocking in timed waits.
734 jsr166 1.2 * The value is empirically derived.
735 dl 1.1 */
736 jsr166 1.3 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
737 dl 1.1
738     /**
739     * The number of times to spin before blocking in untimed waits.
740     * This is greater than timed value because untimed waits spin
741     * faster since they don't need to check times on each spin.
742     */
743     static final int maxUntimedSpins = maxTimedSpins * 32;
744    
745     /**
746     * The number of nanoseconds for which it is faster to spin
747     * rather than to use timed park. A rough estimate suffices.
748     */
749     static final long spinForTimeoutThreshold = 1000L;
750    
751     /**
752 dl 1.4 * Wait nodes for Treiber stack representing wait queue for non-FJ
753     * tasks.
754     */
755     static final class QNode {
756     QNode next;
757     volatile Thread thread; // nulled to cancel wait
758     QNode() {
759     thread = Thread.currentThread();
760     }
761     void signal() {
762     Thread t = thread;
763     if (t != null) {
764     thread = null;
765     LockSupport.unpark(t);
766     }
767     }
768     }
769    
770     /**
771     * Removes and signals waiting threads from wait queue
772     */
773     private void releaseWaiters(int phase) {
774     AtomicReference<QNode> head = queueFor(phase);
775     QNode q;
776     while ((q = head.get()) != null) {
777     if (head.compareAndSet(q, q.next))
778     q.signal();
779     }
780     }
781    
782     /**
783 dl 1.1 * Enqueues node and waits unless aborted or signalled.
784     */
785 dl 1.4 private int untimedWait(int phase) {
786     int spins = maxUntimedSpins;
787 dl 1.1 QNode node = null;
788 dl 1.4 boolean interrupted = false;
789 dl 1.1 boolean queued = false;
790 dl 1.4 int p;
791     while ((p = getPhase()) == phase) {
792     interrupted = Thread.interrupted();
793     if (node != null) {
794     if (!queued) {
795     AtomicReference<QNode> head = queueFor(phase);
796     queued = head.compareAndSet(node.next = head.get(), node);
797 dl 1.1 }
798 dl 1.4 else if (node.thread != null)
799     LockSupport.park(this);
800 dl 1.1 }
801 dl 1.4 else if (spins <= 0)
802     node = new QNode();
803     else
804     --spins;
805     }
806     if (node != null)
807     node.thread = null;
808     if (interrupted)
809     Thread.currentThread().interrupt();
810     releaseWaiters(phase);
811     return p;
812     }
813    
814     /**
815     * Messier interruptible version
816     */
817     private int interruptibleWait(int phase) throws InterruptedException {
818     int spins = maxUntimedSpins;
819     QNode node = null;
820     boolean queued = false;
821     boolean interrupted = false;
822     int p;
823     while ((p = getPhase()) == phase) {
824     if (interrupted = Thread.interrupted())
825     break;
826     if (node != null) {
827     if (!queued) {
828     AtomicReference<QNode> head = queueFor(phase);
829     queued = head.compareAndSet(node.next = head.get(), node);
830 dl 1.1 }
831 dl 1.4 else if (node.thread != null)
832     LockSupport.park(this);
833 dl 1.1 }
834     else if (spins <= 0)
835 dl 1.4 node = new QNode();
836 dl 1.1 else
837     --spins;
838     }
839     if (node != null)
840     node.thread = null;
841 dl 1.4 if (interrupted)
842     throw new InterruptedException();
843     releaseWaiters(phase);
844     return p;
845 dl 1.1 }
846    
847     /**
848 dl 1.4 * Even messier timeout version.
849 dl 1.1 */
850 dl 1.4 private int timedWait(int phase, long nanos)
851 dl 1.1 throws InterruptedException, TimeoutException {
852 dl 1.4 int p;
853     if ((p = getPhase()) == phase) {
854     long lastTime = System.nanoTime();
855     int spins = maxTimedSpins;
856     QNode node = null;
857     boolean queued = false;
858     boolean interrupted = false;
859     while ((p = getPhase()) == phase) {
860     if (interrupted = Thread.interrupted())
861     break;
862     long now = System.nanoTime();
863     if ((nanos -= now - lastTime) <= 0)
864 dl 1.1 break;
865 dl 1.4 lastTime = now;
866     if (node != null) {
867     if (!queued) {
868     AtomicReference<QNode> head = queueFor(phase);
869     queued = head.compareAndSet(node.next = head.get(), node);
870 dl 1.1 }
871 dl 1.4 else if (node.thread != null &&
872     nanos > spinForTimeoutThreshold) {
873     LockSupport.parkNanos(this, nanos);
874 dl 1.1 }
875     }
876 dl 1.4 else if (spins <= 0)
877     node = new QNode();
878 dl 1.1 else
879 dl 1.4 --spins;
880     }
881     if (node != null)
882     node.thread = null;
883     if (interrupted)
884     throw new InterruptedException();
885     if (p == phase && (p = getPhase()) == phase)
886     throw new TimeoutException();
887     }
888     releaseWaiters(phase);
889     return p;
890     }
891    
892     // Temporary Unsafe mechanics for preliminary release
893    
894     static final Unsafe _unsafe;
895     static final long stateOffset;
896    
897     static {
898     try {
899     if (Phaser.class.getClassLoader() != null) {
900     Field f = Unsafe.class.getDeclaredField("theUnsafe");
901     f.setAccessible(true);
902     _unsafe = (Unsafe)f.get(null);
903 dl 1.1 }
904     else
905 dl 1.4 _unsafe = Unsafe.getUnsafe();
906     stateOffset = _unsafe.objectFieldOffset
907     (Phaser.class.getDeclaredField("state"));
908     } catch (Exception e) {
909     throw new RuntimeException("Could not initialize intrinsics", e);
910 dl 1.1 }
911     }
912    
913 dl 1.4 final boolean casState(long cmp, long val) {
914     return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
915     }
916 dl 1.1 }