root/jsr166/jsr166/src/main/java/util/concurrent/Phaser.java
Revision: 1.11
Committed: Wed Jul 7 20:41:24 2010 UTC (13 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.10: +1 -1 lines
Log Message:
Sync with jsr166y changes

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.concurrent.atomic.AtomicReference;
10     import java.util.concurrent.locks.LockSupport;
11    
12     /**
13 jsr166 1.10 * A reusable synchronization barrier, similar in functionality to
14 jsr166 1.1 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
15     * {@link java.util.concurrent.CountDownLatch CountDownLatch}
16     * but supporting more flexible usage.
17     *
18 jsr166 1.10 * <p> <b>Registration.</b> Unlike the case for other barriers, the
19     * number of parties <em>registered</em> to synchronize on a phaser
20     * may vary over time. Tasks may be registered at any time (using
21     * methods {@link #register}, {@link #bulkRegister}, or forms of
22     * constructors establishing initial numbers of parties), and
23     * optionally deregistered upon any arrival (using {@link
24     * #arriveAndDeregister}). As is the case with most basic
25     * synchronization constructs, registration and deregistration affect
26     * only internal counts; they do not establish any further internal
27     * bookkeeping, so tasks cannot query whether they are registered.
28     * (However, you can introduce such bookkeeping by subclassing this
29     * class.)
30     *
31     * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
32     * Phaser} may be repeatedly awaited. Method {@link
33     * #arriveAndAwaitAdvance} has effect analogous to {@link
34     * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
35     * generation of a {@code Phaser} has an associated phase number. The
36     * phase number starts at zero, and advances when all parties arrive
37     * at the barrier, wrapping around to zero after reaching {@code
38     * Integer.MAX_VALUE}. The use of phase numbers enables independent
39     * control of actions upon arrival at a barrier and upon awaiting
40     * others, via two kinds of methods that may be invoked by any
41     * registered party:
42     *
43 jsr166 1.1 * <ul>
44     *
45 jsr166 1.10 * <li> <b>Arrival.</b> Methods {@link #arrive} and
46     * {@link #arriveAndDeregister} record arrival at a
47     * barrier. These methods do not block, but return an associated
48     * <em>arrival phase number</em>; that is, the phase number of
49     * the barrier to which the arrival applied. When the final
50     * party for a given phase arrives, an optional barrier action
51     * is performed and the phase advances. Barrier actions,
52     * performed by the party triggering a phase advance, are
53     * arranged by overriding method {@link #onAdvance(int, int)},
54     * which also controls termination. Overriding this method is
55     * similar to, but more flexible than, providing a barrier
56     * action to a {@code CyclicBarrier}.
57     *
58     * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
59     * argument indicating an arrival phase number, and returns when
60     * the barrier advances to (or is already at) a different phase.
61     * Unlike similar constructions using {@code CyclicBarrier},
62     * method {@code awaitAdvance} continues to wait even if the
63     * waiting thread is interrupted. Interruptible and timeout
64     * versions are also available, but exceptions encountered while
65     * tasks wait interruptibly or with timeout do not change the
66     * state of the barrier. If necessary, you can perform any
67     * associated recovery within handlers of those exceptions,
68     * often after invoking {@code forceTermination}. Phasers may
69     * also be used by tasks executing in a {@link ForkJoinPool},
70     * which will ensure sufficient parallelism to execute tasks
71     * when others are blocked waiting for a phase to advance.
72 jsr166 1.1 *
73     * </ul>
74     *
75 jsr166 1.10 * <p> <b>Termination.</b> A {@code Phaser} may enter a
76     * <em>termination</em> state in which all synchronization methods
77     * immediately return without updating phaser state or waiting for
78     * advance, and indicating (via a negative phase value) that execution
79     * is complete. Termination is triggered when an invocation of {@code
80     * onAdvance} returns {@code true}. As illustrated below, when
81     * phasers control actions with a fixed number of iterations, it is
82 jsr166 1.7 * often convenient to override this method to cause termination when
83     * the current phase number reaches a threshold. Method {@link
84     * #forceTermination} is also available to abruptly release waiting
85     * threads and allow them to terminate.
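Both routes to termination can be observed from a single thread. The sketch below assumes the released `java.util.concurrent.Phaser` (Java 7 or later) rather than this exact revision; the class and method names are illustrative only.

```java
import java.util.concurrent.Phaser;

final class PhaserTerminationDemo {
    /** Deregistering the last party makes the default onAdvance return true. */
    static boolean terminatesWhenEmpty() {
        Phaser phaser = new Phaser(1);
        phaser.arriveAndDeregister();   // zero registered parties remain
        return phaser.isTerminated();
    }

    /** After forceTermination, arrival methods report a negative phase. */
    static int arriveAfterForcedTermination() {
        Phaser phaser = new Phaser(2);
        phaser.forceTermination();
        return phaser.arrive();         // returns at once, without updating state
    }

    public static void main(String[] args) {
        System.out.println("terminated when empty: " + terminatesWhenEmpty());
        System.out.println("arrive after forced termination: "
            + arriveAfterForcedTermination());
    }
}
```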
86 jsr166 1.1 *
87 jsr166 1.10 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
88     * in tree structures) to reduce contention. Phasers with large
89 jsr166 1.1 * numbers of parties that would otherwise experience heavy
90 jsr166 1.10 * synchronization contention costs may instead be set up so that
91     * groups of sub-phasers share a common parent. This may greatly
92     * increase throughput even though it incurs greater per-operation
93     * overhead.
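A small tree makes the tiering idea concrete. The sketch below assumes the released `java.util.concurrent.Phaser` (Java 7 or later), in which a child constructed with a nonzero party count registers itself with its parent; `PhaserTieringDemo` and `advanceThroughTree` are hypothetical names.

```java
import java.util.concurrent.Phaser;

final class PhaserTieringDemo {
    /**
     * Builds a root with two children of two parties each, lets every
     * party arrive, and returns the resulting phase of the root.
     */
    static int advanceThroughTree() {
        Phaser root = new Phaser();
        Phaser left = new Phaser(root, 2);   // counts as one party of root
        Phaser right = new Phaser(root, 2);  // counts as one party of root
        left.arrive();
        left.arrive();    // left is complete and arrives at root
        right.arrive();
        right.arrive();   // right is complete; root now advances
        return root.getPhase();
    }

    public static void main(String[] args) {
        System.out.println("root phase after all arrivals: " + advanceThroughTree());
    }
}
```

Parties contend only on their own sub-phaser; the root sees one arrival per child rather than one per party.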
94     *
95     * <p><b>Monitoring.</b> While synchronization methods may be invoked
96     * only by registered parties, the current state of a phaser may be
97     * monitored by any caller. At any given moment there are {@link
98     * #getRegisteredParties} parties in total, of which {@link
99     * #getArrivedParties} have arrived at the current phase ({@link
100     * #getPhase}). When the remaining ({@link #getUnarrivedParties})
101     * parties arrive, the phase advances. The values returned by these
102     * methods may reflect transient states and so are not in general
103     * useful for synchronization control. Method {@link #toString}
104     * returns snapshots of these state queries in a form convenient for
105     * informal monitoring.
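The monitoring accessors can be exercised in a single thread. This is a minimal sketch against the released `java.util.concurrent.Phaser` (Java 7 or later), not this exact revision; `PhaserMonitorDemo` and `snapshot` are illustrative names.

```java
import java.util.concurrent.Phaser;

final class PhaserMonitorDemo {
    /** Returns {phase, registered, arrived, unarrived} for the given phaser. */
    static int[] snapshot(Phaser phaser) {
        return new int[] {
            phaser.getPhase(),
            phaser.getRegisteredParties(),
            phaser.getArrivedParties(),
            phaser.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);  // three registered, unarrived parties
        phaser.arrive();                // one party arrives; this does not block
        int[] s = snapshot(phaser);
        System.out.println("phase=" + s[0] + " registered=" + s[1]
            + " arrived=" + s[2] + " unarrived=" + s[3]);
        System.out.println(phaser);     // toString gives a similar informal view
    }
}
```

With other threads active, the four values may come from different moments, which is why they suit monitoring rather than control.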
106 jsr166 1.1 *
107     * <p><b>Sample usages:</b>
108     *
109 jsr166 1.4 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
110     * to control a one-shot action serving a variable number of
111     * parties. The typical idiom is for the method setting this up to
112     * first register, then start the actions, then deregister, as in:
113 jsr166 1.1 *
114     * <pre> {@code
115 jsr166 1.8 * void runTasks(List<Runnable> tasks) {
116 jsr166 1.1 * final Phaser phaser = new Phaser(1); // "1" to register self
117 jsr166 1.7 * // create and start threads
118 jsr166 1.8 * for (final Runnable task : tasks) {

119 jsr166 1.1 * phaser.register();
120     * new Thread() {
121     * public void run() {
122     * phaser.arriveAndAwaitAdvance(); // await all creation
123 jsr166 1.8 * task.run();
124 jsr166 1.1 * }
125     * }.start();
126     * }
127     *
128 jsr166 1.7 * // allow threads to start and deregister self
129     * phaser.arriveAndDeregister();
130 jsr166 1.1 * }}</pre>
131     *
132     * <p>One way to cause a set of threads to repeatedly perform actions
133     * for a given number of iterations is to override {@code onAdvance}:
134     *
135     * <pre> {@code
136 jsr166 1.8 * void startTasks(List<Runnable> tasks, final int iterations) {
137 jsr166 1.1 * final Phaser phaser = new Phaser() {
138 jsr166 1.10 * protected boolean onAdvance(int phase, int registeredParties) {
139 jsr166 1.1 * return phase >= iterations - 1 || registeredParties == 0;
140     * }
141     * };
142     * phaser.register();
143 jsr166 1.10 * for (final Runnable task : tasks) {
144 jsr166 1.1 * phaser.register();
145     * new Thread() {
146     * public void run() {
147     * do {
148 jsr166 1.8 * task.run();
149 jsr166 1.1 * phaser.arriveAndAwaitAdvance();
150 jsr166 1.10 * } while (!phaser.isTerminated());
151 jsr166 1.1 * }
152     * }.start();
153     * }
154     * phaser.arriveAndDeregister(); // deregister self, don't wait
155     * }}</pre>
156     *
157 jsr166 1.10 * If the main task must later await termination, it
158     * may re-register and then execute a similar loop:
159     * <pre> {@code
160     * // ...
161     * phaser.register();
162     * while (!phaser.isTerminated())
163     * phaser.arriveAndAwaitAdvance();}</pre>
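A self-contained variant of the fixed-iteration idiom is sketched below. It assumes the released `java.util.concurrent.Phaser` and Java 8 lambdas, and it simplifies the setup: all workers are pre-registered through the constructor and the caller joins the threads instead of registering itself. Because phases are numbered from zero, the sketch terminates when `onAdvance` sees phase `iterations - 1`, so each worker runs exactly `iterations` times. `PhaserIterationDemo` and `runIterations` are hypothetical names.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

final class PhaserIterationDemo {
    /** Runs the given number of worker threads for the given number of phases; returns total task runs. */
    static int runIterations(int workers, final int iterations) {
        final AtomicInteger runs = new AtomicInteger();
        final Phaser phaser = new Phaser(workers) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Phases count from zero, so the last wanted phase is iterations - 1.
                return phase >= iterations - 1 || registeredParties == 0;
            }
        };
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                do {
                    runs.incrementAndGet();         // stands in for task.run()
                    phaser.arriveAndAwaitAdvance();
                } while (!phaser.isTerminated());
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("total runs: " + runIterations(3, 4)); // 3 workers, 4 iterations
    }
}
```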
164     *
165     * <p>Related constructions may be used to await particular phase numbers
166     * in contexts where you are sure that the phase will never wrap around
167     * {@code Integer.MAX_VALUE}. For example:
168     *
169     * <pre> {@code
170     * void awaitPhase(Phaser phaser, int phase) {
171     * int p = phaser.register(); // assumes caller not already registered
172     * while (p < phase) {
173     * if (phaser.isTerminated()) {
174     * // ... deal with unexpected termination (e.g., return or throw)
175     * } else
176     * p = phaser.arriveAndAwaitAdvance();
177     * }
178     * phaser.arriveAndDeregister();
179     * }}</pre>
180     *
181     *
182 jsr166 1.5 * <p>To create a set of tasks using a tree of phasers,
183 jsr166 1.1 * you could use code of the following form, assuming a
184 jsr166 1.4 * Task class with a constructor accepting a phaser that
185 jsr166 1.1 * it registers for upon construction:
186 jsr166 1.10 *
187 jsr166 1.1 * <pre> {@code
188 jsr166 1.10 * void build(Task[] actions, int lo, int hi, Phaser ph) {
189     * if (hi - lo > TASKS_PER_PHASER) {
190     * for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
191     * int j = Math.min(i + TASKS_PER_PHASER, hi);
192     * build(actions, i, j, new Phaser(ph));
193 jsr166 1.1 * }
194     * } else {
195     * for (int i = lo; i < hi; ++i)
196 jsr166 1.10 * actions[i] = new Task(ph);
197     * // assumes new Task(ph) performs ph.register()
198 jsr166 1.1 * }
199     * }
200     * // .. initially called, for n tasks via
201     * build(new Task[n], 0, n, new Phaser());}</pre>
202     *
203     * The best value of {@code TASKS_PER_PHASER} depends mainly on
204     * expected barrier synchronization rates. A value as low as four may
205     * be appropriate for extremely small per-barrier task bodies (thus
206     * high rates), or up to hundreds for extremely large ones.
209     *
210     * <p><b>Implementation notes</b>: This implementation restricts the
211     * maximum number of parties to 65535. Attempts to register additional
212 jsr166 1.8 * parties result in {@code IllegalStateException}. However, you can and
213 jsr166 1.1 * should create tiered phasers to accommodate arbitrarily large sets
214     * of participants.
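The party limit can be probed directly. The sketch below assumes the released `java.util.concurrent.Phaser`, which keeps the same 65535 limit; the constant and method names are illustrative.

```java
import java.util.concurrent.Phaser;

final class PhaserLimitsDemo {
    static final int MAX_PARTIES = 65535; // limit quoted in the implementation notes

    /** The constructor rejects a party count above the limit. */
    static boolean constructorRejectsTooMany() {
        try {
            new Phaser(MAX_PARTIES + 1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** Registering one party beyond the limit fails. */
    static boolean registerRejectsOverflow() {
        Phaser phaser = new Phaser(MAX_PARTIES);
        try {
            phaser.register();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("constructor rejects 65536: " + constructorRejectsTooMany());
        System.out.println("register rejects overflow: " + registerRejectsOverflow());
    }
}
```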
215     *
216     * @since 1.7
217     * @author Doug Lea
218     */
219     public class Phaser {
220     /*
221     * This class implements an extension of X10 "clocks". Thanks to
222     * Vijay Saraswat for the idea, and to Vivek Sarkar for
223     * enhancements to extend functionality.
224     */
225    
226     /**
227     * Barrier state representation. Conceptually, a barrier contains
228     * four values:
229     *
230     * * parties -- the number of parties to wait (16 bits)
231     * * unarrived -- the number of parties yet to hit barrier (16 bits)
232     * * phase -- the generation of the barrier (31 bits)
233     * * terminated -- set if barrier is terminated (1 bit)
234     *
235     * However, to efficiently maintain atomicity, these values are
236     * packed into a single (atomic) long. Termination uses the sign
237     * bit of 32 bit representation of phase, so phase is set to -1 on
238     * termination. Good performance relies on keeping state decoding
239     * and encoding simple, and keeping race windows short.
240     *
241     * Note: there are some cheats in arrive() that rely on unarrived
242     * count being lowest 16 bits.
243     */
244     private volatile long state;
245    
246     private static final int ushortMask = 0xffff;
247     private static final int phaseMask = 0x7fffffff;
248    
249     private static int unarrivedOf(long s) {
250     return (int) (s & ushortMask);
251     }
252    
253     private static int partiesOf(long s) {
254     return ((int) s) >>> 16;
255     }
256    
257     private static int phaseOf(long s) {
258     return (int) (s >>> 32);
259     }
260    
261     private static int arrivedOf(long s) {
262     return partiesOf(s) - unarrivedOf(s);
263     }
264    
265     private static long stateFor(int phase, int parties, int unarrived) {
266     return ((((long) phase) << 32) | (((long) parties) << 16) |
267     (long) unarrived);
268     }
269    
270     private static long trippedStateFor(int phase, int parties) {
271     long lp = (long) parties;
272     return (((long) phase) << 32) | (lp << 16) | lp;
273     }
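The packing scheme can be checked in isolation. The sketch below copies the shift-and-mask helpers above into a standalone class (the class name `PhaserStateLayoutDemo` is hypothetical) and shows why subtracting one from the packed value records a single arrival.

```java
final class PhaserStateLayoutDemo {
    static final int USHORT_MASK = 0xffff;

    /** Packs phase (high 32 bits), parties (bits 16-31) and unarrived (bits 0-15). */
    static long stateFor(int phase, int parties, int unarrived) {
        return (((long) phase) << 32) | (((long) parties) << 16) | (long) unarrived;
    }

    static int unarrivedOf(long s) { return (int) (s & USHORT_MASK); }

    static int partiesOf(long s) { return ((int) s) >>> 16; }

    static int phaseOf(long s) { return (int) (s >>> 32); }

    public static void main(String[] args) {
        long s = stateFor(5, 3, 2);
        System.out.println("phase=" + phaseOf(s) + " parties=" + partiesOf(s)
            + " unarrived=" + unarrivedOf(s));
        // unarrived sits in the lowest 16 bits, so s - 1 is one more arrival
        System.out.println("unarrived after s - 1: " + unarrivedOf(s - 1));
        // a phase of -1 sets the sign bit, which marks termination
        System.out.println("terminated phase: " + phaseOf(stateFor(-1, 3, 2)));
    }
}
```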
274    
275     /**
276     * Returns message string for bad bounds exceptions.
277     */
278     private static String badBounds(int parties, int unarrived) {
279     return ("Attempt to set " + unarrived +
280     " unarrived of " + parties + " parties");
281     }
282    
283     /**
284     * The parent of this phaser, or null if none
285     */
286     private final Phaser parent;
287    
288     /**
289 jsr166 1.4 * The root of phaser tree. Equals this if not in a tree. Used to
290 jsr166 1.1 * support faster state push-down.
291     */
292     private final Phaser root;
293    
294     // Wait queues
295    
296     /**
297     * Heads of Treiber stacks for waiting threads. To eliminate
298     * contention while releasing some threads while adding others, we
299     * use two of them, alternating across even and odd phases.
300     */
301     private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
302     private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
303    
304     private AtomicReference<QNode> queueFor(int phase) {
305     return ((phase & 1) == 0) ? evenQ : oddQ;
306     }
307    
308     /**
309     * Returns current state, first resolving lagged propagation from
310     * root if necessary.
311     */
312     private long getReconciledState() {
313     return (parent == null) ? state : reconcileState();
314     }
315    
316     /**
317     * Recursively resolves state.
318     */
319     private long reconcileState() {
320     Phaser p = parent;
321     long s = state;
322     if (p != null) {
323     while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
324     long parentState = p.getReconciledState();
325     int parentPhase = phaseOf(parentState);
326     int phase = phaseOf(s = state);
327     if (phase != parentPhase) {
328     long next = trippedStateFor(parentPhase, partiesOf(s));
329     if (casState(s, next)) {
330     releaseWaiters(phase);
331     s = next;
332     }
333     }
334     }
335     }
336     return s;
337     }
338    
339     /**
340 jsr166 1.4 * Creates a new phaser without any initially registered parties,
341 jsr166 1.1 * initial phase number 0, and no parent. Any thread using this
342 jsr166 1.4 * phaser will need to first register for it.
343 jsr166 1.1 */
344     public Phaser() {
345     this(null);
346     }
347    
348     /**
349 jsr166 1.4 * Creates a new phaser with the given numbers of registered
350 jsr166 1.1 * unarrived parties, initial phase number 0, and no parent.
351     *
352     * @param parties the number of parties required to trip barrier
353     * @throws IllegalArgumentException if parties less than zero
354     * or greater than the maximum number of parties supported
355     */
356     public Phaser(int parties) {
357     this(null, parties);
358     }
359    
360     /**
361 jsr166 1.4 * Creates a new phaser with the given parent, without any
362 jsr166 1.1 * initially registered parties. If parent is non-null this phaser
363     * is registered with the parent and its initial phase number is
364     * the same as that of parent phaser.
365     *
366     * @param parent the parent phaser
367     */
368     public Phaser(Phaser parent) {
369     int phase = 0;
370     this.parent = parent;
371     if (parent != null) {
372     this.root = parent.root;
373     phase = parent.register();
374     }
375     else
376     this.root = this;
377     this.state = trippedStateFor(phase, 0);
378     }
379    
380     /**
381 jsr166 1.4 * Creates a new phaser with the given parent and numbers of
382 jsr166 1.1 * registered unarrived parties. If parent is non-null, this phaser
383     * is registered with the parent and its initial phase number is
384     * the same as that of parent phaser.
385     *
386     * @param parent the parent phaser
387     * @param parties the number of parties required to trip barrier
388     * @throws IllegalArgumentException if parties less than zero
389     * or greater than the maximum number of parties supported
390     */
391     public Phaser(Phaser parent, int parties) {
392     if (parties < 0 || parties > ushortMask)
393     throw new IllegalArgumentException("Illegal number of parties");
394     int phase = 0;
395     this.parent = parent;
396     if (parent != null) {
397     this.root = parent.root;
398     phase = parent.register();
399     }
400     else
401     this.root = this;
402     this.state = trippedStateFor(phase, parties);
403     }
404    
405     /**
406     * Adds a new unarrived party to this phaser.
407     *
408 jsr166 1.10 * @return the arrival phase number to which this registration applied
409 jsr166 1.1 * @throws IllegalStateException if attempting to register more
410     * than the maximum supported number of parties
411     */
412     public int register() {
413     return doRegister(1);
414     }
415    
416     /**
417     * Adds the given number of new unarrived parties to this phaser.
418     *
419     * @param parties the number of additional parties required to trip barrier
420 jsr166 1.10 * @return the arrival phase number to which this registration applied
421 jsr166 1.1 * @throws IllegalStateException if attempting to register more
422     * than the maximum supported number of parties
423     */
424     public int bulkRegister(int parties) {
425     if (parties < 0)
426     throw new IllegalArgumentException();
427     if (parties == 0)
428     return getPhase();
429     return doRegister(parties);
430     }
431    
432     /**
433     * Shared code for register, bulkRegister
434     */
435     private int doRegister(int registrations) {
436     int phase;
437     for (;;) {
438     long s = getReconciledState();
439     phase = phaseOf(s);
440     int unarrived = unarrivedOf(s) + registrations;
441     int parties = partiesOf(s) + registrations;
442     if (phase < 0)
443     break;
444     if (parties > ushortMask || unarrived > ushortMask)
445     throw new IllegalStateException(badBounds(parties, unarrived));
446     if (phase == phaseOf(root.state) &&
447     casState(s, stateFor(phase, parties, unarrived)))
448     break;
449     }
450     return phase;
451     }
452    
453     /**
454     * Arrives at the barrier, but does not wait for others. (You can
455 jsr166 1.10 * in turn wait for others via {@link #awaitAdvance}). It is an
456     * unenforced usage error for an unregistered party to invoke this
457     * method.
458 jsr166 1.1 *
459 jsr166 1.10 * @return the arrival phase number, or a negative value if terminated
460 jsr166 1.1 * @throws IllegalStateException if not terminated and the number
461     * of unarrived parties would become negative
462     */
463     public int arrive() {
464     int phase;
465     for (;;) {
466     long s = state;
467     phase = phaseOf(s);
468     if (phase < 0)
469     break;
470     int parties = partiesOf(s);
471     int unarrived = unarrivedOf(s) - 1;
472     if (unarrived > 0) { // Not the last arrival
473     if (casState(s, s - 1)) // s-1 adds one arrival
474     break;
475     }
476     else if (unarrived == 0) { // the last arrival
477     Phaser par = parent;
478     if (par == null) { // directly trip
479     if (casState
480     (s,
481     trippedStateFor(onAdvance(phase, parties) ? -1 :
482     ((phase + 1) & phaseMask), parties))) {
483     releaseWaiters(phase);
484     break;
485     }
486     }
487     else { // cascade to parent
488     if (casState(s, s - 1)) { // zeroes unarrived
489     par.arrive();
490     reconcileState();
491     break;
492     }
493     }
494     }
495     else if (phase != phaseOf(root.state)) // or if unreconciled
496     reconcileState();
497     else
498     throw new IllegalStateException(badBounds(parties, unarrived));
499     }
500     return phase;
501     }
502    
503     /**
504 jsr166 1.7 * Arrives at the barrier and deregisters from it without waiting
505     * for others. Deregistration reduces the number of parties
506 jsr166 1.1 * required to trip the barrier in future phases. If this phaser
507     * has a parent, and deregistration causes this phaser to have
508 jsr166 1.7 * zero parties, this phaser also arrives at and is deregistered
509 jsr166 1.10 * from its parent. It is an unenforced usage error for an
510     * unregistered party to invoke this method.
511 jsr166 1.1 *
512 jsr166 1.10 * @return the arrival phase number, or a negative value if terminated
513 jsr166 1.1 * @throws IllegalStateException if not terminated and the number
514     * of registered or unarrived parties would become negative
515     */
516     public int arriveAndDeregister() {
517     // similar code to arrive, but too different to merge
518     Phaser par = parent;
519     int phase;
520     for (;;) {
521     long s = state;
522     phase = phaseOf(s);
523     if (phase < 0)
524     break;
525     int parties = partiesOf(s) - 1;
526     int unarrived = unarrivedOf(s) - 1;
527     if (parties >= 0) {
528     if (unarrived > 0 || (unarrived == 0 && par != null)) {
529     if (casState
530     (s,
531     stateFor(phase, parties, unarrived))) {
532     if (unarrived == 0) {
533     par.arriveAndDeregister();
534     reconcileState();
535     }
536     break;
537     }
538     continue;
539     }
540     if (unarrived == 0) {
541     if (casState
542     (s,
543     trippedStateFor(onAdvance(phase, parties) ? -1 :
544     ((phase + 1) & phaseMask), parties))) {
545     releaseWaiters(phase);
546     break;
547     }
548     continue;
549     }
550     if (par != null && phase != phaseOf(root.state)) {
551     reconcileState();
552     continue;
553     }
554     }
555     throw new IllegalStateException(badBounds(parties, unarrived));
556     }
557     return phase;
558     }
559    
560     /**
561     * Arrives at the barrier and awaits others. Equivalent in effect
562 jsr166 1.7 * to {@code awaitAdvance(arrive())}. If you need to await with
563     * interruption or timeout, you can arrange this with an analogous
564     * construction using one of the other forms of the awaitAdvance
565     * method. If instead you need to deregister upon arrival use
566 jsr166 1.10 * {@code arriveAndDeregister}. It is an unenforced usage error
567     * for an unregistered party to invoke this method.
568 jsr166 1.1 *
569 jsr166 1.10 * @return the arrival phase number, or a negative number if terminated
570 jsr166 1.1 * @throws IllegalStateException if not terminated and the number
571     * of unarrived parties would become negative
572     */
573     public int arriveAndAwaitAdvance() {
574     return awaitAdvance(arrive());
575     }
576    
577     /**
578 jsr166 1.7 * Awaits the phase of the barrier to advance from the given phase
579 jsr166 1.8 * value, returning immediately if the current phase of the
580     * barrier is not equal to the given phase value or this barrier
581 jsr166 1.10 * is terminated. It is an unenforced usage error for an
582     * unregistered party to invoke this method.
583 jsr166 1.1 *
584 jsr166 1.10 * @param phase an arrival phase number, or negative value if
585     * terminated; this argument is normally the value returned by a
586     * previous call to {@code arrive} or its variants
587     * @return the next arrival phase number, or a negative value
588     * if terminated or argument is negative
589 jsr166 1.1 */
590     public int awaitAdvance(int phase) {
591     if (phase < 0)
592     return phase;
593     long s = getReconciledState();
594     int p = phaseOf(s);
595     if (p != phase)
596     return p;
597     if (unarrivedOf(s) == 0 && parent != null)
598     parent.awaitAdvance(phase);
599     // Fall here even if parent waited, to reconcile and help release
600     return untimedWait(phase);
601     }
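The immediate-return cases of `awaitAdvance` can be seen without a second thread. This is a sketch against the released `java.util.concurrent.Phaser`; `PhaserAwaitDemo` and `arriveThenAwait` are illustrative names.

```java
import java.util.concurrent.Phaser;

final class PhaserAwaitDemo {
    /** Returns {arrival phase, phase reported by awaitAdvance, result for a negative argument}. */
    static int[] arriveThenAwait() {
        Phaser phaser = new Phaser(1);
        int arrivalPhase = phaser.arrive();                 // sole party, so the phase completes at once
        int nextPhase = phaser.awaitAdvance(arrivalPhase);  // already advanced, so no blocking
        int negative = phaser.awaitAdvance(-1);             // a negative argument is returned as is
        return new int[] { arrivalPhase, nextPhase, negative };
    }

    public static void main(String[] args) {
        int[] r = arriveThenAwait();
        System.out.println("arrived at " + r[0] + ", now at " + r[1]
            + ", negative argument gives " + r[2]);
    }
}
```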
602    
603     /**
604 jsr166 1.8 * Awaits the phase of the barrier to advance from the given phase
605 jsr166 1.10 * value, throwing {@code InterruptedException} if interrupted
606     * while waiting, or returning immediately if the current phase of
607     * the barrier is not equal to the given phase value or this
608     * barrier is terminated. It is an unenforced usage error for an
609     * unregistered party to invoke this method.
610     *
611     * @param phase an arrival phase number, or negative value if
612     * terminated; this argument is normally the value returned by a
613     * previous call to {@code arrive} or its variants
614     * @return the next arrival phase number, or a negative value
615     * if terminated or argument is negative
616 jsr166 1.1 * @throws InterruptedException if thread interrupted while waiting
617     */
618     public int awaitAdvanceInterruptibly(int phase)
619     throws InterruptedException {
620     if (phase < 0)
621     return phase;
622     long s = getReconciledState();
623     int p = phaseOf(s);
624     if (p != phase)
625     return p;
626     if (unarrivedOf(s) == 0 && parent != null)
627     parent.awaitAdvanceInterruptibly(phase);
628     return interruptibleWait(phase);
629     }
630    
631     /**
632 jsr166 1.8 * Awaits the phase of the barrier to advance from the given phase
633 jsr166 1.10 * value or the given timeout to elapse, throwing {@code
634     * InterruptedException} if interrupted while waiting, or
635     * returning immediately if the current phase of the barrier is
636     * not equal to the given phase value or this barrier is
637     * terminated. It is an unenforced usage error for an
638     * unregistered party to invoke this method.
639     *
640     * @param phase an arrival phase number, or negative value if
641     * terminated; this argument is normally the value returned by a
642     * previous call to {@code arrive} or its variants
643 jsr166 1.8 * @param timeout how long to wait before giving up, in units of
644     * {@code unit}
645     * @param unit a {@code TimeUnit} determining how to interpret the
646     * {@code timeout} parameter
647 jsr166 1.10 * @return the next arrival phase number, or a negative value
648     * if terminated or argument is negative
649 jsr166 1.1 * @throws InterruptedException if thread interrupted while waiting
650     * @throws TimeoutException if timed out while waiting
651     */
652     public int awaitAdvanceInterruptibly(int phase,
653     long timeout, TimeUnit unit)
654     throws InterruptedException, TimeoutException {
655     if (phase < 0)
656     return phase;
657     long s = getReconciledState();
658     int p = phaseOf(s);
659     if (p != phase)
660     return p;
661     if (unarrivedOf(s) == 0 && parent != null)
662     parent.awaitAdvanceInterruptibly(phase, timeout, unit);
663     return timedWait(phase, unit.toNanos(timeout));
664     }
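A timed wait that expires leaves the barrier untouched, as the class documentation states. The sketch below, written against the released `java.util.concurrent.Phaser`, registers two parties, lets only one arrive, and checks the state after the timeout; the 50 ms value and all names are arbitrary choices for illustration.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhaserTimeoutDemo {
    /** Returns true if the wait timed out and phaser state was unchanged. */
    static boolean timesOutWithoutChangingState() {
        Phaser phaser = new Phaser(2);
        int phase = phaser.arrive();   // one of two parties has arrived
        try {
            phaser.awaitAdvanceInterruptibly(phase, 50, TimeUnit.MILLISECONDS);
            return false;              // the second party never arrives, so this is not expected
        } catch (TimeoutException expected) {
            return phaser.getPhase() == phase
                && phaser.getUnarrivedParties() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out, state unchanged: "
            + timesOutWithoutChangingState());
    }
}
```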
665    
666     /**
667     * Forces this barrier to enter termination state. Counts of
668     * arrived and registered parties are unaffected. If this phaser
669     * has a parent, it too is terminated. This method may be useful
670     * for coordinating recovery after one or more tasks encounter
671     * unexpected exceptions.
672     */
673     public void forceTermination() {
674     for (;;) {
675     long s = getReconciledState();
676     int phase = phaseOf(s);
677     int parties = partiesOf(s);
678     int unarrived = unarrivedOf(s);
679     if (phase < 0 ||
680     casState(s, stateFor(-1, parties, unarrived))) {
681     releaseWaiters(0);
682     releaseWaiters(1);
683     if (parent != null)
684     parent.forceTermination();
685     return;
686     }
687     }
688     }
689    
690     /**
691     * Returns the current phase number. The maximum phase number is
692     * {@code Integer.MAX_VALUE}, after which it restarts at
693     * zero. Upon termination, the phase number is negative.
694     *
695     * @return the phase number, or a negative value if terminated
696     */
697     public final int getPhase() {
698     return phaseOf(getReconciledState());
699     }
700    
701     /**
702     * Returns the number of parties registered at this barrier.
703     *
704     * @return the number of parties
705     */
706     public int getRegisteredParties() {
707     return partiesOf(state);
708     }
709    
710     /**
711 jsr166 1.10 * Returns the number of registered parties that have arrived at
712     * the current phase of this barrier.
713 jsr166 1.1 *
714     * @return the number of arrived parties
715     */
716     public int getArrivedParties() {
717     return arrivedOf(state);
718     }
719    
720     /**
721     * Returns the number of registered parties that have not yet
722     * arrived at the current phase of this barrier.
723     *
724     * @return the number of unarrived parties
725     */
726     public int getUnarrivedParties() {
727     return unarrivedOf(state);
728     }
729    
730     /**
731 jsr166 1.4 * Returns the parent of this phaser, or {@code null} if none.
732 jsr166 1.1 *
733 jsr166 1.4 * @return the parent of this phaser, or {@code null} if none
734 jsr166 1.1 */
735     public Phaser getParent() {
736     return parent;
737     }
738    
739     /**
740     * Returns the root ancestor of this phaser, which is the same as
741     * this phaser if it has no parent.
742     *
743     * @return the root ancestor of this phaser
744     */
745     public Phaser getRoot() {
746     return root;
747     }
748    
749     /**
750     * Returns {@code true} if this barrier has been terminated.
751     *
752     * @return {@code true} if this barrier has been terminated
753     */
754     public boolean isTerminated() {
755     return getPhase() < 0;
756     }
757    
    /**
     * Overridable method to perform an action upon impending phase
     * advance, and to control termination. This method is invoked
     * upon arrival of the party tripping the barrier (when all other
     * waiting parties are dormant). If this method returns {@code
     * true}, then, rather than advancing the phase number, this barrier
     * will be set to a final termination state, and subsequent calls
     * to {@link #isTerminated} will return {@code true}. Any
     * (unchecked) Exception or Error thrown by an invocation of this
     * method is propagated to the party attempting to trip the
     * barrier, in which case no advance occurs.
     *
     * <p>The arguments to this method provide the state of the phaser
     * prevailing for the current transition. (When called from within
     * an implementation of {@code onAdvance}, the values returned by
     * methods such as {@code getPhase} may or may not reliably
     * indicate the state to which this transition applies.)
     *
     * <p>The default version returns {@code true} when the number of
     * registered parties is zero. Normally, overrides that arrange
     * termination for other reasons should also preserve this
     * property.
     *
     * <p>You may override this method to perform an action with side
     * effects visible to participating tasks, but it is only sensible
     * to do so in designs where all parties register before any
     * arrive, and all call {@link #awaitAdvance} at each phase.
     * Otherwise, you cannot ensure lack of interference from other
     * parties during the invocation of this method. Additionally,
     * method {@code onAdvance} may be invoked more than once per
     * transition if registrations are intermixed with arrivals.
     *
     * @param phase the phase number on entering the barrier
     * @param registeredParties the current number of registered parties
     * @return {@code true} if this barrier should terminate
     */
    protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties <= 0;
    }

    /**
     * Returns a string identifying this phaser, as well as its
     * state. The state, in brackets, includes the String {@code
     * "phase = "} followed by the phase number, {@code "parties = "}
     * followed by the number of registered parties, and {@code
     * "arrived = "} followed by the number of arrived parties.
     *
     * @return a string identifying this barrier, as well as its state
     */
    public String toString() {
        long s = getReconciledState();
        return super.toString() +
            "[phase = " + phaseOf(s) +
            " parties = " + partiesOf(s) +
            " arrived = " + arrivedOf(s) + "]";
    }

    // methods for waiting

    /**
     * Wait nodes for Treiber stack representing wait queue.
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final long startTime;
        final long nanos;
        final boolean timed;
        final boolean interruptible;
        volatile boolean wasInterrupted = false;
        volatile Thread thread; // nulled to cancel wait
        QNode next;
        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long startTime, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.timed = timed;
            this.interruptible = interruptible;
            this.startTime = startTime;
            this.nanos = nanos;
            thread = Thread.currentThread();
        }
        public boolean isReleasable() {
            return (thread == null ||
                    phaser.getPhase() != phase ||
                    (interruptible && wasInterrupted) ||
                    (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
        }
        public boolean block() {
            if (Thread.interrupted()) {
                wasInterrupted = true;
                if (interruptible)
                    return true;
            }
            if (!timed)
                LockSupport.park(this);
            else {
                long waitTime = nanos - (System.nanoTime() - startTime);
                if (waitTime <= 0)
                    return true;
                LockSupport.parkNanos(this, waitTime);
            }
            return isReleasable();
        }
        void signal() {
            Thread t = thread;
            if (t != null) {
                thread = null;
                LockSupport.unpark(t);
            }
        }
        boolean doWait() {
            if (thread != null) {
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                    // ignored; block() records interrupts in wasInterrupted
                }
            }
            return wasInterrupted;
        }
    }

    /**
     * Removes and signals waiting threads from wait queue.
     */
    private void releaseWaiters(int phase) {
        AtomicReference<QNode> head = queueFor(phase);
        QNode q;
        while ((q = head.get()) != null) {
            if (head.compareAndSet(q, q.next))
                q.signal();
        }
    }

    /**
     * Tries to enqueue given node in the appropriate wait queue.
     *
     * @return true if successful
     */
    private boolean tryEnqueue(QNode node) {
        AtomicReference<QNode> head = queueFor(node.phase);
        return head.compareAndSet(node.next = head.get(), node);
    }

    /**
     * Enqueues node and waits unless aborted or signalled.
     *
     * @return current phase
     */
    private int untimedWait(int phase) {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, false, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        releaseWaiters(phase);
        if (interrupted)
            Thread.currentThread().interrupt();
        return p;
    }

    /**
     * Interruptible version of {@code untimedWait}: throws
     * {@code InterruptedException} if the waiting thread is interrupted.
     *
     * @return current phase
     */
    private int interruptibleWait(int phase) throws InterruptedException {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase && !interrupted) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, true, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        if (p != phase || (p = getPhase()) != phase)
            releaseWaiters(phase);
        if (interrupted)
            throw new InterruptedException();
        return p;
    }

    /**
     * Timeout version: also throws {@code TimeoutException} if the
     * phase has not advanced within {@code nanos} nanoseconds.
     *
     * @return current phase
     */
    private int timedWait(int phase, long nanos)
        throws InterruptedException, TimeoutException {
        long startTime = System.nanoTime();
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase && !interrupted) {
            if (Thread.interrupted())
                interrupted = true;
            else if (nanos - (System.nanoTime() - startTime) <= 0)
                break;
            else if (node == null)
                node = new QNode(this, phase, true, true, startTime, nanos);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        if (p != phase || (p = getPhase()) != phase)
            releaseWaiters(phase);
        if (interrupted)
            throw new InterruptedException();
        if (p == phase)
            throw new TimeoutException();
        return p;
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long stateOffset =
        objectFieldOffset("state", Phaser.class);

    private final boolean casState(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
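
Usage sketch (not part of Phaser.java). The onAdvance contract documented above can be exercised with a small subclass. This is a minimal sketch, assuming the released java.util.concurrent.Phaser API (JDK 7 or later) rather than this exact revision; the class name OnAdvanceSketch and the method runPhases are hypothetical. All parties register at construction and each one arrives and waits at every phase, which is the kind of design the documentation describes as sensible for overrides with visible side effects.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Phaser.java.
final class OnAdvanceSketch {

    /**
     * Runs {@code parties} threads through a phaser that terminates after
     * {@code maxPhases} advances, and returns how many times onAdvance ran.
     */
    static int runPhases(int parties, final int maxPhases) {
        final AtomicInteger calls = new AtomicInteger();
        // All parties are registered up front, before any arrival.
        final Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                calls.incrementAndGet();
                // Terminate after the last wanted phase, and preserve the
                // default rule of terminating when no parties remain.
                return phase >= maxPhases - 1 || registeredParties == 0;
            }
        };
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    // Every party arrives and waits at each phase until
                    // onAdvance returns true and the phaser terminates.
                    while (!phaser.isTerminated())
                        phaser.arriveAndAwaitAdvance();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(runPhases(3, 4) + " onAdvance calls");
    }
}
```

Under those assumptions, runPhases(3, 4) observes four onAdvance calls (phases 0 through 3); after the last one returns true, isTerminated() reports true and the workers leave their loops.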