root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.9
Committed: Mon Jan 5 09:11:26 2009 UTC (15 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.8: +19 -16 lines
Log Message:
minor doc fixes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.9
9 dl 1.1 import java.util.concurrent.*;
10     import java.util.concurrent.atomic.*;
11     import java.util.concurrent.locks.LockSupport;
12 dl 1.4 import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14 dl 1.1
15     /**
16     * A reusable synchronization barrier, similar in functionality to a
17 jsr166 1.9 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
18     * {@link java.util.concurrent.CountDownLatch CountDownLatch}
19     * but supporting more flexible usage.
20 dl 1.1 *
21     * <ul>
22     *
23 dl 1.4 * <li> The number of parties synchronizing on a phaser may vary over
24     * time. A task may register to be a party at any time, and may
25     * deregister upon arriving at the barrier. As is the case with most
26     * basic synchronization constructs, registration and deregistration
27     * affect only internal counts; they do not establish any further
28     * internal bookkeeping, so tasks cannot query whether they are
29 jsr166 1.9 * registered. (However, you can introduce such bookkeeping by
30 dl 1.4 * subclassing this class.)
31 dl 1.1 *
32     * <li> Each generation has an associated phase value, starting at
33     * zero, and advancing when all parties reach the barrier (wrapping
34 jsr166 1.7 * around to zero after reaching {@code Integer.MAX_VALUE}).
35 jsr166 1.3 *
36 dl 1.1 * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
37 jsr166 1.7 * Method {@code arriveAndAwaitAdvance} has an effect analogous to
38     * {@code CyclicBarrier.await}. However, Phasers separate two
39 dl 1.4 * aspects of coordination, which may also be invoked independently:
40 dl 1.1 *
41     * <ul>
42     *
43 jsr166 1.7 * <li> Arriving at a barrier. Methods {@code arrive} and
44     * {@code arriveAndDeregister} do not block, but return
45 dl 1.4 * the phase value current upon entry to the method.
46 dl 1.1 *
47 jsr166 1.7 * <li> Awaiting others. Method {@code awaitAdvance} requires an
48 dl 1.1 * argument indicating the entry phase, and returns when the
49     * barrier advances to a new phase.
50     * </ul>
51     *
52     *
53     * <li> Barrier actions, performed by the task triggering a phase
54     * advance while others may be waiting, are arranged by overriding
55 jsr166 1.7 * method {@code onAdvance}, which also controls termination.
56 dl 1.5 * Overriding this method is similar to, but more flexible than,
57 dl 1.4 * providing a barrier action to a CyclicBarrier.
58 dl 1.1 *
59     * <li> Phasers may enter a <em>termination</em> state in which all
60     * await actions immediately return, indicating (via a negative phase
61     * value) that execution is complete. Termination is triggered by
62 jsr166 1.7 * executing the overridable {@code onAdvance} method that is invoked
63 dl 1.4 * each time the barrier is about to be tripped. When a Phaser is
64     * controlling an action with a fixed number of iterations, it is
65     * often convenient to override this method to cause termination when
66     * the current phase number reaches a threshold. Method
67 jsr166 1.7 * {@code forceTermination} is also available to abruptly release
68 dl 1.4 * waiting threads and allow them to terminate.
69     *
70     * <li> Phasers may be tiered to reduce contention. Phasers with large
71     * numbers of parties that would otherwise experience heavy
72     * synchronization contention costs may instead be arranged in trees.
73     * This will typically greatly increase throughput even though it
74     * incurs somewhat greater per-operation overhead.
75 jsr166 1.3 *
76 jsr166 1.7 * <li> By default, {@code awaitAdvance} continues to wait even if
77 dl 1.4 * the waiting thread is interrupted. And unlike the case in
78 dl 1.1 * CyclicBarriers, exceptions encountered while tasks wait
79     * interruptibly or with timeout do not change the state of the
80     * barrier. If necessary, you can perform any associated recovery
81 dl 1.4 * within handlers of those exceptions, often after invoking
82 jsr166 1.7 * {@code forceTermination}.
83 dl 1.1 *
84     * </ul>
85     *
86 dl 1.4 * <p><b>Sample usages:</b>
87     *
88 jsr166 1.8 * <p>A Phaser may be used instead of a {@code CountDownLatch} to control
89 dl 1.4 * a one-shot action serving a variable number of parties. The typical
90     * idiom is for the method setting this up to first register, then
91     * start the actions, then deregister, as in:
92 dl 1.1 *
93 dl 1.4 * <pre>
94     * void runTasks(List&lt;Runnable&gt; list) {
95     * final Phaser phaser = new Phaser(1); // "1" to register self
96     * for (final Runnable r : list) { // final: captured by the anonymous Thread
97     * phaser.register();
98     * new Thread() {
99     * public void run() {
100     * phaser.arriveAndAwaitAdvance(); // await all creation
101     * r.run();
102     * phaser.arriveAndDeregister(); // signal completion
103     * }
104     * }.start();
105     * }
106 dl 1.6 *
107     * doSomethingOnBehalfOfWorkers();
108 dl 1.4 * phaser.arrive(); // allow threads to start
109 dl 1.6 * int p = phaser.arriveAndDeregister(); // deregister self ...
110     * p = phaser.awaitAdvance(p); // ... and await arrival
111 dl 1.4 * otherActions(); // do other things while tasks execute
112 jsr166 1.8 * phaser.awaitAdvance(p); // await final completion
113 dl 1.4 * }
114     * </pre>
115 dl 1.1 *
116 dl 1.4 * <p>One way to cause a set of threads to repeatedly perform actions
117 jsr166 1.7 * for a given number of iterations is to override {@code onAdvance}:
118 dl 1.1 *
119     * <pre>
120 dl 1.4 * void startTasks(List&lt;Runnable&gt; list, final int iterations) {
121     * final Phaser phaser = new Phaser() {
122     * public boolean onAdvance(int phase, int registeredParties) {
123     * return phase &gt;= iterations - 1 || registeredParties == 0;
124     * }
125     * };
126     * phaser.register();
127     * for (final Runnable r : list) { // final: captured by the anonymous Thread
128     * phaser.register();
129     * new Thread() {
130     * public void run() {
131     * do {
132     * r.run();
133     * phaser.arriveAndAwaitAdvance();
134     * } while (!phaser.isTerminated());
135     * }
136     * }.start();
137 dl 1.1 * }
138 dl 1.4 * phaser.arriveAndDeregister(); // deregister self, don't wait
139 dl 1.1 * }
140     * </pre>
141     *
142 dl 1.4 * <p> To create a set of tasks using a tree of Phasers,
143     * you could use code of the following form, assuming a
144     * Task class with a constructor accepting a Phaser that
145     * it registers for upon construction:
146     * <pre>
147     * void build(Task[] actions, int lo, int hi, Phaser b) {
148     * int step = (hi - lo) / TASKS_PER_PHASER;
149     * if (step &gt; 1) {
150     * int i = lo;
151     * while (i &lt; hi) {
152     * int r = Math.min(i + step, hi);
153     * build(actions, i, r, new Phaser(b));
154     * i = r;
155     * }
156     * }
157     * else {
158     * for (int i = lo; i &lt; hi; ++i)
159     * actions[i] = new Task(b);
160     * // assumes new Task(b) performs b.register()
161     * }
162     * }
163     * // .. initially called, for n tasks via
164     * build(new Task[n], 0, n, new Phaser());
165     * </pre>
166     *
167 jsr166 1.7 * The best value of {@code TASKS_PER_PHASER} depends mainly on
168 dl 1.4 * expected barrier synchronization rates. A value as low as four may
169     * be appropriate for extremely small per-barrier task bodies (thus
170     * high rates), or up to hundreds for extremely large ones.
171     *
173     *
174 dl 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
175 dl 1.4 * maximum number of parties to 65535. Attempts to register additional
176     * parties result in IllegalStateExceptions. However, you can and
177     * should create tiered phasers to accommodate arbitrarily large sets
178     * of participants.
179 dl 1.1 */
180     public class Phaser {
181     /*
182     * This class implements an extension of X10 "clocks". Thanks to
183 dl 1.4 * Vijay Saraswat for the idea, and to Vivek Sarkar for
184     * enhancements to extend functionality.
185 dl 1.1 */
186    
187     /**
188     * Barrier state representation. Conceptually, a barrier contains
189     * four values:
190 jsr166 1.3 *
191 dl 1.1 * * parties -- the number of parties to wait (16 bits)
192     * * unarrived -- the number of parties yet to hit barrier (16 bits)
193     * * phase -- the generation of the barrier (31 bits)
194     * * terminated -- set if barrier is terminated (1 bit)
195     *
196     * However, to efficiently maintain atomicity, these values are
197 dl 1.4 * packed into a single (atomic) long. Termination uses the sign
198     * bit of 32 bit representation of phase, so phase is set to -1 on
199 jsr166 1.8 * termination. Good performance relies on keeping state decoding
200 dl 1.4 * and encoding simple, and keeping race windows short.
201     *
202     * Note: there are some cheats in arrive() that rely on unarrived
203     * being lowest 16 bits.
204 dl 1.1 */
205 dl 1.4 private volatile long state;
206 dl 1.1
207     private static final int ushortBits = 16;
208     private static final int ushortMask = (1 << ushortBits) - 1;
209     private static final int phaseMask = 0x7fffffff;
210    
211     private static int unarrivedOf(long s) {
212     return (int)(s & ushortMask);
213     }
214    
215     private static int partiesOf(long s) {
216     return (int)(s & (ushortMask << 16)) >>> 16;
217     }
218    
219     private static int phaseOf(long s) {
220     return (int)(s >>> 32);
221     }
222    
223     private static int arrivedOf(long s) {
224     return partiesOf(s) - unarrivedOf(s);
225     }
226    
227     private static long stateFor(int phase, int parties, int unarrived) {
228     return (((long)phase) << 32) | (((long)parties) << 16) | (long)unarrived; // widen first: no int sign extension into phase bits
229     }
230    
231 dl 1.4 private static long trippedStateFor(int phase, int parties) {
232     return (((long)phase) << 32) | (((long)parties) << 16) | (long)parties; // widened likewise
233     }
234    
235 dl 1.1 private static IllegalStateException badBounds(int parties, int unarrived) {
236 dl 1.4 return new IllegalStateException
237     ("Attempt to set " + unarrived +
238     " unarrived of " + parties + " parties");
239     }
240    
241     /**
242     * The parent of this phaser, or null if none
243     */
244     private final Phaser parent;
245    
246     /**
247     * The root of Phaser tree. Equals this if not in a tree. Used to
248     * support faster state push-down.
249     */
250     private final Phaser root;
251    
252     // Wait queues
253    
254     /**
255     * Heads of Treiber stacks for waiting nonFJ threads. To eliminate
256     * contention while releasing some threads while adding others, we
257     * use two of them, alternating across even and odd phases.
258     */
259     private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
260     private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
261    
262     private AtomicReference<QNode> queueFor(int phase) {
263     return (phase & 1) == 0? evenQ : oddQ;
264     }
265    
266     /**
267     * Returns current state, first resolving lagged propagation from
268     * root if necessary.
269     */
270     private long getReconciledState() {
271     return parent == null? state : reconcileState();
272     }
273    
274     /**
275     * Recursively resolves state.
276     */
277     private long reconcileState() {
278     Phaser p = parent;
279     long s = state;
280     if (p != null) {
281     while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
282     long parentState = p.getReconciledState();
283     int parentPhase = phaseOf(parentState);
284     int phase = phaseOf(s = state);
285     if (phase != parentPhase) {
286     long next = trippedStateFor(parentPhase, partiesOf(s));
287     if (casState(s, next)) {
288     releaseWaiters(phase);
289     s = next;
290     }
291     }
292     }
293     }
294     return s;
295 dl 1.1 }
296    
297     /**
298     * Creates a new Phaser without any initially registered parties,
299 dl 1.4 * initial phase number 0, and no parent.
300 dl 1.1 */
301     public Phaser() {
302 dl 1.4 this(null);
303 dl 1.1 }
304    
305     /**
306     * Creates a new Phaser with the given number of registered
307 dl 1.4 * unarrived parties, initial phase number 0, and no parent.
308 dl 1.1 * @param parties the number of parties required to trip barrier.
309     * @throws IllegalArgumentException if parties less than zero
310     * or greater than the maximum number of parties supported.
311     */
312     public Phaser(int parties) {
313 dl 1.4 this(null, parties);
314     }
315    
316     /**
317     * Creates a new Phaser with the given parent, without any
318     * initially registered parties. If parent is non-null this phaser
319     * is registered with the parent and its initial phase number is
320     * the same as that of parent phaser.
321     * @param parent the parent phaser.
322     */
323     public Phaser(Phaser parent) {
324     int phase = 0;
325     this.parent = parent;
326     if (parent != null) {
327     this.root = parent.root;
328     phase = parent.register();
329     }
330     else
331     this.root = this;
332     this.state = trippedStateFor(phase, 0);
333     }
334    
335     /**
336     * Creates a new Phaser with the given parent and number of
337     * registered unarrived parties. If parent is non-null this phaser
338     * is registered with the parent and its initial phase number is
339     * the same as that of parent phaser.
340     * @param parent the parent phaser.
341     * @param parties the number of parties required to trip barrier.
342     * @throws IllegalArgumentException if parties less than zero
343     * or greater than the maximum number of parties supported.
344     */
345     public Phaser(Phaser parent, int parties) {
346 dl 1.1 if (parties < 0 || parties > ushortMask)
347     throw new IllegalArgumentException("Illegal number of parties");
348 dl 1.4 int phase = 0;
349     this.parent = parent;
350     if (parent != null) {
351     this.root = parent.root;
352     phase = parent.register();
353     }
354     else
355     this.root = this;
356     this.state = trippedStateFor(phase, parties);
357 dl 1.1 }
358    
359     /**
360     * Adds a new unarrived party to this phaser.
361     * @return the current barrier phase number upon registration
362     * @throws IllegalStateException if attempting to register more
363     * than the maximum supported number of parties.
364     */
365 dl 1.4 public int register() {
366     return doRegister(1);
367     }
368    
369     /**
370     * Adds the given number of new unarrived parties to this phaser.
371     * @param parties the number of additional parties to register.
372     * @return the current barrier phase number upon registration
373     * @throws IllegalStateException if attempting to register more
374     * than the maximum supported number of parties.
375     */
376     public int bulkRegister(int parties) {
377     if (parties < 0)
378     throw new IllegalArgumentException();
379     if (parties == 0)
380     return getPhase();
381     return doRegister(parties);
382     }
383    
384     /**
385     * Shared code for register, bulkRegister
386     */
387     private int doRegister(int registrations) {
388     int phase;
389 dl 1.1 for (;;) {
390 dl 1.4 long s = getReconciledState();
391     phase = phaseOf(s);
392     int unarrived = unarrivedOf(s) + registrations;
393     int parties = partiesOf(s) + registrations;
394     if (phase < 0)
395     break;
396 dl 1.1 if (parties > ushortMask || unarrived > ushortMask)
397     throw badBounds(parties, unarrived);
398 dl 1.4 if (phase == phaseOf(root.state) &&
399     casState(s, stateFor(phase, parties, unarrived)))
400     break;
401 dl 1.1 }
402 dl 1.4 return phase;
403 dl 1.1 }
404    
405     /**
406     * Arrives at the barrier, but does not wait for others. (You can
407     * in turn wait for others via {@link #awaitAdvance}).
408     *
409 dl 1.4 * @return the barrier phase number upon entry to this method, or a
410     * negative value if terminated
411     * @throws IllegalStateException if not terminated and the number
412     * of unarrived parties would become negative.
413 dl 1.1 */
414 dl 1.4 public int arrive() {
415     int phase;
416 dl 1.1 for (;;) {
417 dl 1.4 long s = state;
418     phase = phaseOf(s);
419 dl 1.1 int parties = partiesOf(s);
420     int unarrived = unarrivedOf(s) - 1;
421 dl 1.4 if (unarrived > 0) { // Not the last arrival
422     if (casState(s, s - 1)) // s-1 adds one arrival
423     break;
424     }
425     else if (unarrived == 0) { // the last arrival
426     Phaser par = parent;
427     if (par == null) { // directly trip
428     if (casState
429     (s,
430     trippedStateFor(onAdvance(phase, parties)? -1 :
431     ((phase + 1) & phaseMask), parties))) {
432     releaseWaiters(phase);
433     break;
434     }
435     }
436     else { // cascade to parent
437     if (casState(s, s - 1)) { // zeroes unarrived
438     par.arrive();
439     reconcileState();
440     break;
441     }
442     }
443     }
444     else if (phase < 0) // Don't throw exception if terminated
445     break;
446     else if (phase != phaseOf(root.state)) // or if unreconciled
447     reconcileState();
448     else
449 dl 1.1 throw badBounds(parties, unarrived);
450     }
451 dl 1.4 return phase;
452 dl 1.1 }
453    
454     /**
455     * Arrives at the barrier, and deregisters from it, without
456 dl 1.4 * waiting for others. Deregistration reduces number of parties
457     * required to trip the barrier in future phases. If this phaser
458     * has a parent, and deregistration causes this phaser to have
459     * zero parties, this phaser is also deregistered from its parent.
460 dl 1.1 *
461     * @return the current barrier phase number upon entry to
462     * this method, or a negative value if terminated
463 dl 1.4 * @throws IllegalStateException if not terminated and the number
464     * of registered or unarrived parties would become negative.
465 dl 1.1 */
466 dl 1.4 public int arriveAndDeregister() {
467     // similar code to arrive, but too different to merge
468     Phaser par = parent;
469     int phase;
470 dl 1.1 for (;;) {
471 dl 1.4 long s = state;
472     phase = phaseOf(s);
473 dl 1.1 int parties = partiesOf(s) - 1;
474     int unarrived = unarrivedOf(s) - 1;
475 dl 1.4 if (parties >= 0) {
476     if (unarrived > 0 || (unarrived == 0 && par != null)) {
477     if (casState
478     (s,
479     stateFor(phase, parties, unarrived))) {
480     if (unarrived == 0) {
481     par.arriveAndDeregister();
482     reconcileState();
483     }
484     break;
485     }
486     continue;
487     }
488     if (unarrived == 0) {
489     if (casState
490     (s,
491     trippedStateFor(onAdvance(phase, parties)? -1 :
492     ((phase + 1) & phaseMask), parties))) {
493     releaseWaiters(phase);
494     break;
495     }
496     continue;
497     }
498     if (phase < 0)
499     break;
500     if (par != null && phase != phaseOf(root.state)) {
501     reconcileState();
502     continue;
503     }
504 dl 1.1 }
505 dl 1.4 throw badBounds(parties, unarrived);
506 dl 1.1 }
507 dl 1.4 return phase;
508 dl 1.1 }
509    
510     /**
511 dl 1.4 * Arrives at the barrier and awaits others. Equivalent in effect
512 jsr166 1.7 * to {@code awaitAdvance(arrive())}. If you instead need to
513 dl 1.4 * await with interruption or timeout, and/or deregister upon
514     * arrival, you can arrange them using analogous constructions.
515     * @return the phase on entry to this method
516     * @throws IllegalStateException if not terminated and the number
517     * of unarrived parties would become negative.
518 dl 1.1 */
519     public int arriveAndAwaitAdvance() {
520 dl 1.4 return awaitAdvance(arrive());
521 dl 1.1 }
522    
523     /**
524     * Awaits the phase of the barrier to advance from the given
525 dl 1.4 * value, or returns immediately if argument is negative or this
526     * barrier is terminated.
527 dl 1.1 * @param phase the phase on entry to this method
528     * @return the phase on exit from this method
529     */
530     public int awaitAdvance(int phase) {
531     if (phase < 0)
532     return phase;
533 dl 1.4 long s = getReconciledState();
534     int p = phaseOf(s);
535     if (p != phase)
536     return p;
537     if (unarrivedOf(s) == 0 && parent != null) // a root phaser has no parent to wait on
538     parent.awaitAdvance(phase);
539     // Fall here even if parent waited, to reconcile and help release
540     return untimedWait(phase);
541 dl 1.1 }
542    
543     /**
544     * Awaits the phase of the barrier to advance from the given
545 jsr166 1.8 * value, or returns immediately if argument is negative or this
546 dl 1.4 * barrier is terminated, or throws InterruptedException if
547     * interrupted while waiting.
548 dl 1.1 * @param phase the phase on entry to this method
549     * @return the phase on exit from this method
550     * @throws InterruptedException if thread interrupted while waiting
551     */
552     public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
553     if (phase < 0)
554     return phase;
555 dl 1.4 long s = getReconciledState();
556     int p = phaseOf(s);
557     if (p != phase)
558     return p;
559     if (unarrivedOf(s) == 0 && parent != null) // same condition as awaitAdvance
560     parent.awaitAdvanceInterruptibly(phase);
561     return interruptibleWait(phase);
562 dl 1.1 }
563    
564     /**
565     * Awaits the phase of the barrier to advance from the given value
566 dl 1.4 * or the given timeout elapses, or returns immediately if
567     * argument is negative or this barrier is terminated.
568 dl 1.1 * @param phase the phase on entry to this method
569     * @return the phase on exit from this method
570     * @throws InterruptedException if thread interrupted while waiting
571     * @throws TimeoutException if timed out while waiting
572     */
573 jsr166 1.3 public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
574 dl 1.1 throws InterruptedException, TimeoutException {
575     if (phase < 0)
576     return phase;
577 dl 1.4 long s = getReconciledState();
578     int p = phaseOf(s);
579     if (p != phase)
580     return p;
581     if (unarrivedOf(s) == 0 && parent != null) // a root phaser has no parent to wait on
582     parent.awaitAdvanceInterruptibly(phase, timeout, unit);
583     return timedWait(phase, unit.toNanos(timeout));
584 dl 1.1 }
585    
586     /**
587     * Forces this barrier to enter termination state. Counts of
588 dl 1.4 * arrived and registered parties are unaffected. If this phaser
589     * has a parent, it too is terminated. This method may be useful
590     * for coordinating recovery after one or more tasks encounter
591     * unexpected exceptions.
592 dl 1.1 */
593     public void forceTermination() {
594     for (;;) {
595 dl 1.4 long s = getReconciledState();
596 dl 1.1 int phase = phaseOf(s);
597     int parties = partiesOf(s);
598     int unarrived = unarrivedOf(s);
599     if (phase < 0 ||
600 dl 1.4 casState(s, stateFor(-1, parties, unarrived))) {
601     releaseWaiters(0);
602     releaseWaiters(1);
603     if (parent != null)
604     parent.forceTermination();
605 dl 1.1 return;
606     }
607     }
608     }
609    
610     /**
611 dl 1.4 * Returns the current phase number. The maximum phase number is
612 jsr166 1.7 * {@code Integer.MAX_VALUE}, after which it restarts at
613 dl 1.4 * zero. Upon termination, the phase number is negative.
614     * @return the phase number, or a negative value if terminated
615 dl 1.1 */
616 dl 1.4 public final int getPhase() {
617     return phaseOf(getReconciledState());
618 dl 1.1 }
619    
620     /**
621 jsr166 1.9 * Returns {@code true} if the current phase number equals the given phase.
622 dl 1.4 * @param phase the phase
623 jsr166 1.9 * @return {@code true} if the current phase number equals the given phase
624 dl 1.1 */
625 dl 1.4 public final boolean hasPhase(int phase) {
626     return phaseOf(getReconciledState()) == phase;
627 dl 1.1 }
628    
629     /**
630     * Returns the number of parties registered at this barrier.
631     * @return the number of parties
632     */
633     public int getRegisteredParties() {
634 dl 1.4 return partiesOf(state);
635 dl 1.1 }
636    
637     /**
638     * Returns the number of parties that have arrived at the current
639     * phase of this barrier.
640     * @return the number of arrived parties
641     */
642     public int getArrivedParties() {
643 dl 1.4 return arrivedOf(state);
644 dl 1.1 }
645    
646     /**
647     * Returns the number of registered parties that have not yet
648     * arrived at the current phase of this barrier.
649     * @return the number of unarrived parties
650     */
651     public int getUnarrivedParties() {
652 dl 1.4 return unarrivedOf(state);
653     }
654    
655     /**
656     * Returns the parent of this phaser, or null if none.
657 jsr166 1.9 * @return the parent of this phaser, or null if none
658 dl 1.4 */
659     public Phaser getParent() {
660     return parent;
661     }
662    
663     /**
664     * Returns the root ancestor of this phaser, which is the same as
665     * this phaser if it has no parent.
666 jsr166 1.9 * @return the root ancestor of this phaser
667 dl 1.4 */
668     public Phaser getRoot() {
669     return root;
670 dl 1.1 }
671    
672     /**
673 jsr166 1.9 * Returns {@code true} if this barrier has been terminated.
674     * @return {@code true} if this barrier has been terminated
675 dl 1.1 */
676     public boolean isTerminated() {
677 dl 1.4 return getPhase() < 0;
678 dl 1.1 }
679    
680     /**
681     * Overridable method to perform an action upon phase advance, and
682     * to control termination. This method is invoked whenever the
683     * barrier is tripped (and thus all other waiting parties are
684     * dormant). If it returns true, then, rather than advance the
685     * phase number, this barrier will be set to a final termination
686 jsr166 1.7 * state, and subsequent calls to {@code isTerminated} will
687 dl 1.1 * return true.
688 jsr166 1.3 *
689 dl 1.1 * <p> The default version returns true when the number of
690     * registered parties is zero. Normally, overrides that arrange
691     * termination for other reasons should also preserve this
692     * property.
693     *
694 dl 1.4 * <p> You may override this method to perform an action with side
695     * effects visible to participating tasks, but it is in general
696     * only sensible to do so in designs where all parties register
697 jsr166 1.7 * before any arrive, and all {@code awaitAdvance} at each phase.
698 dl 1.4 * Otherwise, you cannot ensure lack of interference. In
699     * particular, this method may be invoked more than once per
700     * transition if other parties successfully register while the
701     * invocation of this method is in progress, thus postponing the
702     * transition until those parties also arrive, re-triggering this
703     * method.
704     *
705 dl 1.1 * @param phase the phase number on entering the barrier
706 jsr166 1.9 * @param registeredParties the current number of registered parties
707     * @return {@code true} if this barrier should terminate
708 dl 1.1 */
709     protected boolean onAdvance(int phase, int registeredParties) {
710     return registeredParties <= 0;
711     }
712    
713     /**
714 dl 1.4 * Returns a string identifying this phaser, as well as its
715 dl 1.1 * state. The state, in brackets, includes the String {@code
716 jsr166 1.9 * "phase = "} followed by the phase number, {@code "parties = "}
717 dl 1.1 * followed by the number of registered parties, and {@code
718 jsr166 1.9 * "arrived = "} followed by the number of arrived parties.
719 dl 1.1 *
720     * @return a string identifying this barrier, as well as its state
721     */
722     public String toString() {
723 dl 1.4 long s = getReconciledState();
724 jsr166 1.9 return super.toString() +
725     "[phase = " + phaseOf(s) +
726     " parties = " + partiesOf(s) +
727     " arrived = " + arrivedOf(s) + "]";
728 dl 1.1 }
729    
730 dl 1.4 // methods for waiting
731 dl 1.1
732     /** The number of CPUs, for spin control */
733     static final int NCPUS = Runtime.getRuntime().availableProcessors();
734    
735     /**
736     * The number of times to spin before blocking in timed waits.
737 jsr166 1.2 * The value is empirically derived.
738 dl 1.1 */
739 jsr166 1.3 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
740 dl 1.1
741     /**
742     * The number of times to spin before blocking in untimed waits.
743     * This is greater than timed value because untimed waits spin
744     * faster since they don't need to check times on each spin.
745     */
746     static final int maxUntimedSpins = maxTimedSpins * 32;
747    
748     /**
749     * The number of nanoseconds for which it is faster to spin
750     * rather than to use timed park. A rough estimate suffices.
751     */
752     static final long spinForTimeoutThreshold = 1000L;
753    
754     /**
755 dl 1.4 * Wait nodes for Treiber stack representing wait queue for non-FJ
756     * tasks.
757     */
758     static final class QNode {
759     QNode next;
760     volatile Thread thread; // nulled to cancel wait
761     QNode() {
762     thread = Thread.currentThread();
763     }
764     void signal() {
765     Thread t = thread;
766     if (t != null) {
767     thread = null;
768     LockSupport.unpark(t);
769     }
770     }
771     }
772    
773     /**
774     * Removes and signals waiting threads from wait queue
775     */
776     private void releaseWaiters(int phase) {
777     AtomicReference<QNode> head = queueFor(phase);
778     QNode q;
779     while ((q = head.get()) != null) {
780     if (head.compareAndSet(q, q.next))
781     q.signal();
782     }
783     }
784    
785     /**
786 dl 1.1 * Enqueues node and waits unless aborted or signalled.
787     */
788 dl 1.4 private int untimedWait(int phase) {
789     int spins = maxUntimedSpins;
790 dl 1.1 QNode node = null;
791 dl 1.4 boolean interrupted = false;
792 dl 1.1 boolean queued = false;
793 dl 1.4 int p;
794     while ((p = getPhase()) == phase) {
795     if (Thread.interrupted()) interrupted = true; // keep an earlier interrupt across loop iterations
796     if (node != null) {
797     if (!queued) {
798     AtomicReference<QNode> head = queueFor(phase);
799     queued = head.compareAndSet(node.next = head.get(), node);
800 dl 1.1 }
801 dl 1.4 else if (node.thread != null)
802     LockSupport.park(this);
803 dl 1.1 }
804 dl 1.4 else if (spins <= 0)
805     node = new QNode();
806     else
807     --spins;
808     }
809     if (node != null)
810     node.thread = null;
811     if (interrupted)
812     Thread.currentThread().interrupt();
813     releaseWaiters(phase);
814     return p;
815     }
816    
817     /**
818     * Messier interruptible version
819     */
820     private int interruptibleWait(int phase) throws InterruptedException {
821     int spins = maxUntimedSpins;
822     QNode node = null;
823     boolean queued = false;
824     boolean interrupted = false;
825     int p;
826     while ((p = getPhase()) == phase) {
827     if (interrupted = Thread.interrupted())
828     break;
829     if (node != null) {
830     if (!queued) {
831     AtomicReference<QNode> head = queueFor(phase);
832     queued = head.compareAndSet(node.next = head.get(), node);
833 dl 1.1 }
834 dl 1.4 else if (node.thread != null)
835     LockSupport.park(this);
836 dl 1.1 }
837     else if (spins <= 0)
838 dl 1.4 node = new QNode();
839 dl 1.1 else
840     --spins;
841     }
842     if (node != null)
843     node.thread = null;
844 dl 1.4 if (interrupted)
845     throw new InterruptedException();
846     releaseWaiters(phase);
847     return p;
848 dl 1.1 }
849    
850     /**
851 dl 1.4 * Even messier timeout version.
852 dl 1.1 */
853 dl 1.4 private int timedWait(int phase, long nanos)
854 dl 1.1 throws InterruptedException, TimeoutException {
855 dl 1.4 int p;
856     if ((p = getPhase()) == phase) {
857     long lastTime = System.nanoTime();
858     int spins = maxTimedSpins;
859     QNode node = null;
860     boolean queued = false;
861     boolean interrupted = false;
862     while ((p = getPhase()) == phase) {
863     if (interrupted = Thread.interrupted())
864     break;
865     long now = System.nanoTime();
866     if ((nanos -= now - lastTime) <= 0)
867 dl 1.1 break;
868 dl 1.4 lastTime = now;
869     if (node != null) {
870     if (!queued) {
871     AtomicReference<QNode> head = queueFor(phase);
872     queued = head.compareAndSet(node.next = head.get(), node);
873 dl 1.1 }
874 dl 1.4 else if (node.thread != null &&
875     nanos > spinForTimeoutThreshold) {
876     LockSupport.parkNanos(this, nanos);
877 dl 1.1 }
878     }
879 dl 1.4 else if (spins <= 0)
880     node = new QNode();
881 dl 1.1 else
882 dl 1.4 --spins;
883     }
884     if (node != null)
885     node.thread = null;
886     if (interrupted)
887     throw new InterruptedException();
888     if (p == phase && (p = getPhase()) == phase)
889     throw new TimeoutException();
890     }
891     releaseWaiters(phase);
892     return p;
893     }
894    
895     // Temporary Unsafe mechanics for preliminary release
896    
897     static final Unsafe _unsafe;
898     static final long stateOffset;
899    
900     static {
901     try {
902     if (Phaser.class.getClassLoader() != null) {
903     Field f = Unsafe.class.getDeclaredField("theUnsafe");
904     f.setAccessible(true);
905     _unsafe = (Unsafe)f.get(null);
906 dl 1.1 }
907     else
908 dl 1.4 _unsafe = Unsafe.getUnsafe();
909     stateOffset = _unsafe.objectFieldOffset
910     (Phaser.class.getDeclaredField("state"));
911     } catch (Exception e) {
912     throw new RuntimeException("Could not initialize intrinsics", e);
913 dl 1.1 }
914     }
915    
916 dl 1.4 final boolean casState(long cmp, long val) {
917     return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
918     }
919 dl 1.1 }