ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.37
Committed: Mon Aug 24 00:48:52 2009 UTC (14 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.36: +4 -4 lines
Log Message:
cosmetic changes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.9
9 dl 1.1 import java.util.concurrent.*;
10 jsr166 1.20
11     import java.util.concurrent.atomic.AtomicReference;
12 dl 1.1 import java.util.concurrent.locks.LockSupport;
13    
14     /**
15 jsr166 1.37 * A reusable synchronization barrier, similar in functionality to
16 jsr166 1.9 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
17     * {@link java.util.concurrent.CountDownLatch CountDownLatch}
18     * but supporting more flexible usage.
19 dl 1.1 *
20     * <ul>
21     *
22 dl 1.35 * <li> The number of parties <em>registered</em> to synchronize on a
23     * phaser may vary over time. Tasks may be registered at any time
24     * (using methods {@link #register}, {@link #bulkRegister}, or forms
25     * of constructors establishing initial numbers of parties), and may
26     * optionally be deregistered upon any arrival (using {@link
27     * #arriveAndDeregister}). As is the case with most basic
28     * synchronization constructs, registration and deregistration affect
29     * only internal counts; they do not establish any further internal
30 jsr166 1.37 * bookkeeping, so tasks cannot query whether they are registered.
31     * (However, you can introduce such bookkeeping by subclassing this
32     * class.)
33 dl 1.1 *
34 dl 1.36 * <li> Each generation has an associated phase number. The phase
35     * number starts at zero, amd advances when all parties arrive at the
36     * barrier, wrapping around to zero after reaching {@code
37     * Integer.MAX_VALUE}.
38 jsr166 1.3 *
39 jsr166 1.26 * <li> Like a {@code CyclicBarrier}, a phaser may be repeatedly
40 jsr166 1.24 * awaited. Method {@link #arriveAndAwaitAdvance} has effect
41     * analogous to {@link java.util.concurrent.CyclicBarrier#await
42     * CyclicBarrier.await}. However, phasers separate two aspects of
43 dl 1.27 * coordination, which may also be invoked independently:
44 dl 1.1 *
45     * <ul>
46     *
47 jsr166 1.24 * <li> Arriving at a barrier. Methods {@link #arrive} and
48     * {@link #arriveAndDeregister} do not block, but return
49 dl 1.35 * an associated <em>arrival phase number</em>;
50     * that is, the phase number of the barrier to which the
51     * arrival applied.
52 dl 1.1 *
53 jsr166 1.24 * <li> Awaiting others. Method {@link #awaitAdvance} requires an
54 dl 1.35 * argument indicating an arrival phase number, and returns
55     * when the barrier advances to a new phase.
56 dl 1.1 * </ul>
57     *
58     * <li> Barrier actions, performed by the task triggering a phase
59 dl 1.27 * advance, are arranged by overriding method {@link #onAdvance(int,
60     * int)}, which also controls termination. Overriding this method is
61     * similar to, but more flexible than, providing a barrier action to a
62     * {@code CyclicBarrier}.
63 dl 1.1 *
64     * <li> Phasers may enter a <em>termination</em> state in which all
65 dl 1.10 * actions immediately return without updating phaser state or waiting
66     * for advance, and indicating (via a negative phase value) that
67 dl 1.27 * execution is complete. Termination is triggered when an invocation
68     * of {@code onAdvance} returns {@code true}. When a phaser is
69     * controlling an action with a fixed number of iterations, it is
70     * often convenient to override this method to cause termination when
71     * the current phase number reaches a threshold. Method {@link
72     * #forceTermination} is also available to abruptly release waiting
73     * threads and allow them to terminate.
74 dl 1.4 *
75     * <li> Phasers may be tiered to reduce contention. Phasers with large
76     * numbers of parties that would otherwise experience heavy
77     * synchronization contention costs may instead be arranged in trees.
78     * This will typically greatly increase throughput even though it
79     * incurs somewhat greater per-operation overhead.
80 jsr166 1.3 *
81 jsr166 1.7 * <li> By default, {@code awaitAdvance} continues to wait even if
82 dl 1.4 * the waiting thread is interrupted. And unlike the case in
83 jsr166 1.24 * {@code CyclicBarrier}, exceptions encountered while tasks wait
84 dl 1.1 * interruptibly or with timeout do not change the state of the
85     * barrier. If necessary, you can perform any associated recovery
86 dl 1.4 * within handlers of those exceptions, often after invoking
87 jsr166 1.7 * {@code forceTermination}.
88 dl 1.1 *
89 dl 1.27 * <li>Phasers may be used to coordinate tasks executing in a {@link
90     * ForkJoinPool}, which will ensure sufficient parallelism to execute
91     * tasks when others are blocked waiting for a phase to advance.
92 dl 1.10 *
93 dl 1.36 * <li>The current state of a phaser may be monitored. At any given
94     * moment there are {@link #getRegisteredParties}, where {@link
95     * #getArrivedParties} have arrived at the current phase ({@link
96     * #getPhase}). When the remaining {@link #getUnarrivedParties})
97     * arrive, the phase advances. Method {@link #toString} returns
98     * snapshots of these state queries in a form convenient for
99     * informal monitoring.
100     *
101 dl 1.1 * </ul>
102     *
103 dl 1.4 * <p><b>Sample usages:</b>
104     *
105 jsr166 1.24 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
106     * to control a one-shot action serving a variable number of
107     * parties. The typical idiom is for the method setting this up to
108     * first register, then start the actions, then deregister, as in:
109 dl 1.1 *
110 jsr166 1.13 * <pre> {@code
111 jsr166 1.33 * void runTasks(List<Runnable> tasks) {
112 jsr166 1.13 * final Phaser phaser = new Phaser(1); // "1" to register self
113 dl 1.27 * // create and start threads
114 jsr166 1.33 * for (Runnable task : tasks) {
115 jsr166 1.13 * phaser.register();
116     * new Thread() {
117     * public void run() {
118     * phaser.arriveAndAwaitAdvance(); // await all creation
119 jsr166 1.33 * task.run();
120 jsr166 1.13 * }
121     * }.start();
122 dl 1.4 * }
123 dl 1.6 *
124 dl 1.27 * // allow threads to start and deregister self
125     * phaser.arriveAndDeregister();
126 jsr166 1.13 * }}</pre>
127 dl 1.1 *
128 dl 1.4 * <p>One way to cause a set of threads to repeatedly perform actions
129 jsr166 1.7 * for a given number of iterations is to override {@code onAdvance}:
130 dl 1.1 *
131 jsr166 1.13 * <pre> {@code
132 jsr166 1.33 * void startTasks(List<Runnable> tasks, final int iterations) {
133 jsr166 1.13 * final Phaser phaser = new Phaser() {
134     * public boolean onAdvance(int phase, int registeredParties) {
135     * return phase >= iterations || registeredParties == 0;
136     * }
137     * };
138     * phaser.register();
139 jsr166 1.33 * for (Runnable task : tasks) {
140 jsr166 1.13 * phaser.register();
141     * new Thread() {
142     * public void run() {
143     * do {
144 jsr166 1.33 * task.run();
145 jsr166 1.13 * phaser.arriveAndAwaitAdvance();
146     * } while(!phaser.isTerminated();
147 dl 1.4 * }
148 jsr166 1.13 * }.start();
149 dl 1.1 * }
150 dl 1.4 * phaser.arriveAndDeregister(); // deregister self, don't wait
151 jsr166 1.13 * }}</pre>
152 dl 1.1 *
153 jsr166 1.25 * <p>To create a set of tasks using a tree of phasers,
154 dl 1.4 * you could use code of the following form, assuming a
155 jsr166 1.24 * Task class with a constructor accepting a phaser that
156 dl 1.4 * it registers for upon construction:
157 jsr166 1.13 * <pre> {@code
158     * void build(Task[] actions, int lo, int hi, Phaser b) {
159     * int step = (hi - lo) / TASKS_PER_PHASER;
160     * if (step > 1) {
161     * int i = lo;
162     * while (i < hi) {
163     * int r = Math.min(i + step, hi);
164     * build(actions, i, r, new Phaser(b));
165     * i = r;
166     * }
167     * } else {
168     * for (int i = lo; i < hi; ++i)
169     * actions[i] = new Task(b);
170     * // assumes new Task(b) performs b.register()
171     * }
172     * }
173     * // .. initially called, for n tasks via
174     * build(new Task[n], 0, n, new Phaser());}</pre>
175 dl 1.4 *
176 jsr166 1.7 * The best value of {@code TASKS_PER_PHASER} depends mainly on
177 dl 1.4 * expected barrier synchronization rates. A value as low as four may
178     * be appropriate for extremely small per-barrier task bodies (thus
179     * high rates), or up to hundreds for extremely large ones.
180     *
181     * </pre>
182     *
183 dl 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
184 dl 1.4 * maximum number of parties to 65535. Attempts to register additional
185 jsr166 1.32 * parties result in {@code IllegalStateException}. However, you can and
186 dl 1.4 * should create tiered phasers to accommodate arbitrarily large sets
187     * of participants.
188 jsr166 1.16 *
189     * @since 1.7
190     * @author Doug Lea
191 dl 1.1 */
192     public class Phaser {
193     /*
194     * This class implements an extension of X10 "clocks". Thanks to
195 dl 1.4 * Vijay Saraswat for the idea, and to Vivek Sarkar for
196     * enhancements to extend functionality.
197 dl 1.1 */
198    
199     /**
200     * Barrier state representation. Conceptually, a barrier contains
201     * four values:
202 jsr166 1.3 *
203 dl 1.1 * * parties -- the number of parties to wait (16 bits)
204     * * unarrived -- the number of parties yet to hit barrier (16 bits)
205     * * phase -- the generation of the barrier (31 bits)
206     * * terminated -- set if barrier is terminated (1 bit)
207     *
208     * However, to efficiently maintain atomicity, these values are
209 dl 1.4 * packed into a single (atomic) long. Termination uses the sign
210     * bit of 32 bit representation of phase, so phase is set to -1 on
211 jsr166 1.8 * termination. Good performance relies on keeping state decoding
212 dl 1.4 * and encoding simple, and keeping race windows short.
213     *
214     * Note: there are some cheats in arrive() that rely on unarrived
215 dl 1.10 * count being lowest 16 bits.
216 dl 1.1 */
217 dl 1.4 private volatile long state;
218 dl 1.1
219     private static final int ushortBits = 16;
220 dl 1.10 private static final int ushortMask = 0xffff;
221     private static final int phaseMask = 0x7fffffff;
222 dl 1.1
223     private static int unarrivedOf(long s) {
224 jsr166 1.18 return (int) (s & ushortMask);
225 dl 1.1 }
226    
227     private static int partiesOf(long s) {
228 jsr166 1.17 return ((int) s) >>> 16;
229 dl 1.1 }
230    
231     private static int phaseOf(long s) {
232 jsr166 1.17 return (int) (s >>> 32);
233 dl 1.1 }
234    
235     private static int arrivedOf(long s) {
236     return partiesOf(s) - unarrivedOf(s);
237     }
238    
239     private static long stateFor(int phase, int parties, int unarrived) {
240 jsr166 1.17 return ((((long) phase) << 32) | (((long) parties) << 16) |
241     (long) unarrived);
242 dl 1.1 }
243    
244 dl 1.4 private static long trippedStateFor(int phase, int parties) {
245 jsr166 1.17 long lp = (long) parties;
246     return (((long) phase) << 32) | (lp << 16) | lp;
247 dl 1.4 }
248    
249 dl 1.10 /**
250 jsr166 1.14 * Returns message string for bad bounds exceptions.
251 dl 1.10 */
252     private static String badBounds(int parties, int unarrived) {
253     return ("Attempt to set " + unarrived +
254     " unarrived of " + parties + " parties");
255 dl 1.4 }
256    
257     /**
258     * The parent of this phaser, or null if none
259     */
260     private final Phaser parent;
261    
262     /**
263 jsr166 1.24 * The root of phaser tree. Equals this if not in a tree. Used to
264 dl 1.4 * support faster state push-down.
265     */
266     private final Phaser root;
267    
268     // Wait queues
269    
270     /**
271 dl 1.10 * Heads of Treiber stacks for waiting threads. To eliminate
272 dl 1.4 * contention while releasing some threads while adding others, we
273     * use two of them, alternating across even and odd phases.
274     */
275     private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
276     private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
277    
278     private AtomicReference<QNode> queueFor(int phase) {
279 jsr166 1.18 return ((phase & 1) == 0) ? evenQ : oddQ;
280 dl 1.4 }
281    
282     /**
283     * Returns current state, first resolving lagged propagation from
284     * root if necessary.
285     */
286     private long getReconciledState() {
287 jsr166 1.18 return (parent == null) ? state : reconcileState();
288 dl 1.4 }
289    
290     /**
291     * Recursively resolves state.
292     */
293     private long reconcileState() {
294     Phaser p = parent;
295     long s = state;
296     if (p != null) {
297     while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
298     long parentState = p.getReconciledState();
299     int parentPhase = phaseOf(parentState);
300     int phase = phaseOf(s = state);
301     if (phase != parentPhase) {
302     long next = trippedStateFor(parentPhase, partiesOf(s));
303     if (casState(s, next)) {
304     releaseWaiters(phase);
305     s = next;
306     }
307     }
308     }
309     }
310     return s;
311 dl 1.1 }
312    
313     /**
314 jsr166 1.24 * Creates a new phaser without any initially registered parties,
315 dl 1.10 * initial phase number 0, and no parent. Any thread using this
316 jsr166 1.24 * phaser will need to first register for it.
317 dl 1.1 */
318     public Phaser() {
319 dl 1.4 this(null);
320 dl 1.1 }
321    
322     /**
323 jsr166 1.24 * Creates a new phaser with the given numbers of registered
324 dl 1.4 * unarrived parties, initial phase number 0, and no parent.
325 jsr166 1.14 *
326     * @param parties the number of parties required to trip barrier
327 dl 1.1 * @throws IllegalArgumentException if parties less than zero
328 jsr166 1.14 * or greater than the maximum number of parties supported
329 dl 1.1 */
330     public Phaser(int parties) {
331 dl 1.4 this(null, parties);
332     }
333    
334     /**
335 jsr166 1.24 * Creates a new phaser with the given parent, without any
336 dl 1.4 * initially registered parties. If parent is non-null this phaser
337     * is registered with the parent and its initial phase number is
338     * the same as that of parent phaser.
339 jsr166 1.14 *
340     * @param parent the parent phaser
341 dl 1.4 */
342     public Phaser(Phaser parent) {
343     int phase = 0;
344     this.parent = parent;
345     if (parent != null) {
346     this.root = parent.root;
347     phase = parent.register();
348     }
349     else
350     this.root = this;
351     this.state = trippedStateFor(phase, 0);
352     }
353    
354     /**
355 jsr166 1.24 * Creates a new phaser with the given parent and numbers of
356 jsr166 1.14 * registered unarrived parties. If parent is non-null, this phaser
357 dl 1.4 * is registered with the parent and its initial phase number is
358     * the same as that of parent phaser.
359 jsr166 1.14 *
360     * @param parent the parent phaser
361     * @param parties the number of parties required to trip barrier
362 dl 1.4 * @throws IllegalArgumentException if parties less than zero
363 jsr166 1.14 * or greater than the maximum number of parties supported
364 dl 1.4 */
365     public Phaser(Phaser parent, int parties) {
366 dl 1.1 if (parties < 0 || parties > ushortMask)
367     throw new IllegalArgumentException("Illegal number of parties");
368 dl 1.4 int phase = 0;
369     this.parent = parent;
370     if (parent != null) {
371     this.root = parent.root;
372     phase = parent.register();
373     }
374     else
375     this.root = this;
376     this.state = trippedStateFor(phase, parties);
377 dl 1.1 }
378    
379     /**
380     * Adds a new unarrived party to this phaser.
381 jsr166 1.14 *
382 dl 1.35 * @return the arrival phase number to which this registration applied
383 dl 1.1 * @throws IllegalStateException if attempting to register more
384 jsr166 1.14 * than the maximum supported number of parties
385 dl 1.1 */
386 dl 1.4 public int register() {
387     return doRegister(1);
388     }
389    
390     /**
391     * Adds the given number of new unarrived parties to this phaser.
392 jsr166 1.14 *
393     * @param parties the number of parties required to trip barrier
394 dl 1.35 * @return the arrival phase number to which this registration applied
395 dl 1.4 * @throws IllegalStateException if attempting to register more
396 jsr166 1.14 * than the maximum supported number of parties
397 dl 1.4 */
398     public int bulkRegister(int parties) {
399     if (parties < 0)
400     throw new IllegalArgumentException();
401     if (parties == 0)
402     return getPhase();
403     return doRegister(parties);
404     }
405    
406     /**
407     * Shared code for register, bulkRegister
408     */
409     private int doRegister(int registrations) {
410     int phase;
411 dl 1.1 for (;;) {
412 dl 1.4 long s = getReconciledState();
413     phase = phaseOf(s);
414     int unarrived = unarrivedOf(s) + registrations;
415     int parties = partiesOf(s) + registrations;
416 jsr166 1.12 if (phase < 0)
417 dl 1.4 break;
418 dl 1.1 if (parties > ushortMask || unarrived > ushortMask)
419 dl 1.10 throw new IllegalStateException(badBounds(parties, unarrived));
420 dl 1.4 if (phase == phaseOf(root.state) &&
421     casState(s, stateFor(phase, parties, unarrived)))
422     break;
423 dl 1.1 }
424 dl 1.4 return phase;
425 dl 1.1 }
426    
427     /**
428     * Arrives at the barrier, but does not wait for others. (You can
429     * in turn wait for others via {@link #awaitAdvance}).
430     *
431 dl 1.35 * @return the arrival phase number, or a negative value if terminated
432 dl 1.4 * @throws IllegalStateException if not terminated and the number
433 jsr166 1.14 * of unarrived parties would become negative
434 dl 1.1 */
435 dl 1.4 public int arrive() {
436     int phase;
437 dl 1.1 for (;;) {
438 dl 1.4 long s = state;
439     phase = phaseOf(s);
440 dl 1.10 if (phase < 0)
441     break;
442 dl 1.1 int parties = partiesOf(s);
443     int unarrived = unarrivedOf(s) - 1;
444 dl 1.4 if (unarrived > 0) { // Not the last arrival
445     if (casState(s, s - 1)) // s-1 adds one arrival
446     break;
447     }
448     else if (unarrived == 0) { // the last arrival
449     Phaser par = parent;
450     if (par == null) { // directly trip
451     if (casState
452     (s,
453 jsr166 1.18 trippedStateFor(onAdvance(phase, parties) ? -1 :
454 dl 1.4 ((phase + 1) & phaseMask), parties))) {
455     releaseWaiters(phase);
456     break;
457     }
458     }
459     else { // cascade to parent
460     if (casState(s, s - 1)) { // zeroes unarrived
461     par.arrive();
462     reconcileState();
463     break;
464     }
465     }
466     }
467     else if (phase != phaseOf(root.state)) // or if unreconciled
468     reconcileState();
469     else
470 dl 1.10 throw new IllegalStateException(badBounds(parties, unarrived));
471 dl 1.1 }
472 dl 1.4 return phase;
473 dl 1.1 }
474    
475     /**
476 dl 1.27 * Arrives at the barrier and deregisters from it without waiting
477     * for others. Deregistration reduces the number of parties
478 dl 1.4 * required to trip the barrier in future phases. If this phaser
479     * has a parent, and deregistration causes this phaser to have
480 dl 1.27 * zero parties, this phaser also arrives at and is deregistered
481     * from its parent.
482 dl 1.1 *
483 dl 1.35 * @return the arrival phase number, or a negative value if terminated
484 dl 1.4 * @throws IllegalStateException if not terminated and the number
485 jsr166 1.14 * of registered or unarrived parties would become negative
486 dl 1.1 */
487 dl 1.4 public int arriveAndDeregister() {
488     // similar code to arrive, but too different to merge
489     Phaser par = parent;
490     int phase;
491 dl 1.1 for (;;) {
492 dl 1.4 long s = state;
493     phase = phaseOf(s);
494 dl 1.10 if (phase < 0)
495     break;
496 dl 1.1 int parties = partiesOf(s) - 1;
497     int unarrived = unarrivedOf(s) - 1;
498 dl 1.4 if (parties >= 0) {
499     if (unarrived > 0 || (unarrived == 0 && par != null)) {
500     if (casState
501     (s,
502     stateFor(phase, parties, unarrived))) {
503     if (unarrived == 0) {
504     par.arriveAndDeregister();
505     reconcileState();
506     }
507     break;
508     }
509     continue;
510     }
511     if (unarrived == 0) {
512     if (casState
513     (s,
514 jsr166 1.18 trippedStateFor(onAdvance(phase, parties) ? -1 :
515 dl 1.4 ((phase + 1) & phaseMask), parties))) {
516     releaseWaiters(phase);
517     break;
518     }
519     continue;
520     }
521     if (par != null && phase != phaseOf(root.state)) {
522     reconcileState();
523     continue;
524     }
525 dl 1.1 }
526 dl 1.10 throw new IllegalStateException(badBounds(parties, unarrived));
527 dl 1.1 }
528 dl 1.4 return phase;
529 dl 1.1 }
530    
531     /**
532 dl 1.4 * Arrives at the barrier and awaits others. Equivalent in effect
533 dl 1.27 * to {@code awaitAdvance(arrive())}. If you need to await with
534     * interruption or timeout, you can arrange this with an analogous
535     * construction using one of the other forms of the awaitAdvance
536     * method. If instead you need to deregister upon arrival use
537     * {@code arriveAndDeregister}.
538 jsr166 1.14 *
539 dl 1.35 * @return the arrival phase number, or a negative number if terminated
540 dl 1.4 * @throws IllegalStateException if not terminated and the number
541 jsr166 1.14 * of unarrived parties would become negative
542 dl 1.1 */
543     public int arriveAndAwaitAdvance() {
544 dl 1.4 return awaitAdvance(arrive());
545 dl 1.1 }
546    
547     /**
548 dl 1.27 * Awaits the phase of the barrier to advance from the given phase
549 dl 1.30 * value, returning immediately if the current phase of the
550     * barrier is not equal to the given phase value or this barrier
551     * is terminated.
552 jsr166 1.14 *
553 dl 1.35 * @param phase an arrival phase number, or negative value if
554     * terminated; this argument is normally the value returned by a
555     * previous call to {@code arrive} or its variants
556     * @return the next arrival phase number, or a negative value
557     * if terminated or argument is negative
558 dl 1.1 */
559     public int awaitAdvance(int phase) {
560     if (phase < 0)
561     return phase;
562 dl 1.4 long s = getReconciledState();
563     int p = phaseOf(s);
564     if (p != phase)
565     return p;
566 dl 1.10 if (unarrivedOf(s) == 0 && parent != null)
567 dl 1.4 parent.awaitAdvance(phase);
568     // Fall here even if parent waited, to reconcile and help release
569     return untimedWait(phase);
570 dl 1.1 }
571    
572     /**
573 dl 1.30 * Awaits the phase of the barrier to advance from the given phase
574 jsr166 1.32 * value, throwing {@code InterruptedException} if interrupted while
575 dl 1.30 * waiting, or returning immediately if the current phase of the
576     * barrier is not equal to the given phase value or this barrier
577 jsr166 1.32 * is terminated.
578 jsr166 1.14 *
579 dl 1.35 * @param phase an arrival phase number, or negative value if
580     * terminated; this argument is normally the value returned by a
581     * previous call to {@code arrive} or its variants
582     * @return the next arrival phase number, or a negative value
583     * if terminated or argument is negative
584 dl 1.1 * @throws InterruptedException if thread interrupted while waiting
585     */
586 jsr166 1.12 public int awaitAdvanceInterruptibly(int phase)
587 dl 1.10 throws InterruptedException {
588 dl 1.1 if (phase < 0)
589     return phase;
590 dl 1.4 long s = getReconciledState();
591     int p = phaseOf(s);
592     if (p != phase)
593     return p;
594 dl 1.10 if (unarrivedOf(s) == 0 && parent != null)
595 dl 1.4 parent.awaitAdvanceInterruptibly(phase);
596     return interruptibleWait(phase);
597 dl 1.1 }
598    
599     /**
600 dl 1.30 * Awaits the phase of the barrier to advance from the given phase
601 jsr166 1.32 * value or the given timeout to elapse, throwing
602     * {@code InterruptedException} if interrupted while waiting, or
603     * returning immediately if the current phase of the barrier is not
604     * equal to the given phase value or this barrier is terminated.
605 jsr166 1.14 *
606 dl 1.35 * @param phase an arrival phase number, or negative value if
607     * terminated; this argument is normally the value returned by a
608     * previous call to {@code arrive} or its variants
609 dl 1.31 * @param timeout how long to wait before giving up, in units of
610     * {@code unit}
611     * @param unit a {@code TimeUnit} determining how to interpret the
612     * {@code timeout} parameter
613 dl 1.35 * @return the next arrival phase number, or a negative value
614     * if terminated or argument is negative
615 dl 1.1 * @throws InterruptedException if thread interrupted while waiting
616     * @throws TimeoutException if timed out while waiting
617     */
618 jsr166 1.18 public int awaitAdvanceInterruptibly(int phase,
619     long timeout, TimeUnit unit)
620 dl 1.1 throws InterruptedException, TimeoutException {
621     if (phase < 0)
622     return phase;
623 dl 1.4 long s = getReconciledState();
624     int p = phaseOf(s);
625     if (p != phase)
626     return p;
627 dl 1.10 if (unarrivedOf(s) == 0 && parent != null)
628 dl 1.4 parent.awaitAdvanceInterruptibly(phase, timeout, unit);
629     return timedWait(phase, unit.toNanos(timeout));
630 dl 1.1 }
631    
632     /**
633     * Forces this barrier to enter termination state. Counts of
634 dl 1.4 * arrived and registered parties are unaffected. If this phaser
635     * has a parent, it too is terminated. This method may be useful
636     * for coordinating recovery after one or more tasks encounter
637     * unexpected exceptions.
638 dl 1.1 */
639     public void forceTermination() {
640     for (;;) {
641 dl 1.4 long s = getReconciledState();
642 dl 1.1 int phase = phaseOf(s);
643     int parties = partiesOf(s);
644     int unarrived = unarrivedOf(s);
645     if (phase < 0 ||
646 dl 1.4 casState(s, stateFor(-1, parties, unarrived))) {
647     releaseWaiters(0);
648     releaseWaiters(1);
649     if (parent != null)
650     parent.forceTermination();
651 dl 1.1 return;
652     }
653     }
654     }
655    
656     /**
657 dl 1.4 * Returns the current phase number. The maximum phase number is
658 jsr166 1.7 * {@code Integer.MAX_VALUE}, after which it restarts at
659 dl 1.4 * zero. Upon termination, the phase number is negative.
660 jsr166 1.14 *
661 dl 1.4 * @return the phase number, or a negative value if terminated
662 dl 1.1 */
663 dl 1.4 public final int getPhase() {
664     return phaseOf(getReconciledState());
665 dl 1.1 }
666    
667     /**
668     * Returns the number of parties registered at this barrier.
669 jsr166 1.14 *
670 dl 1.1 * @return the number of parties
671     */
672     public int getRegisteredParties() {
673 dl 1.4 return partiesOf(state);
674 dl 1.1 }
675    
676     /**
677 dl 1.36 * Returns the number of registered parties that have arrived at
678     * the current phase of this barrier.
679 jsr166 1.14 *
680 dl 1.1 * @return the number of arrived parties
681     */
682     public int getArrivedParties() {
683 dl 1.4 return arrivedOf(state);
684 dl 1.1 }
685    
686     /**
687     * Returns the number of registered parties that have not yet
688     * arrived at the current phase of this barrier.
689 jsr166 1.14 *
690 dl 1.1 * @return the number of unarrived parties
691     */
692     public int getUnarrivedParties() {
693 dl 1.4 return unarrivedOf(state);
694     }
695    
696     /**
697 jsr166 1.23 * Returns the parent of this phaser, or {@code null} if none.
698 jsr166 1.14 *
699 jsr166 1.23 * @return the parent of this phaser, or {@code null} if none
700 dl 1.4 */
701     public Phaser getParent() {
702     return parent;
703     }
704    
705     /**
706     * Returns the root ancestor of this phaser, which is the same as
707     * this phaser if it has no parent.
708 jsr166 1.14 *
709 jsr166 1.9 * @return the root ancestor of this phaser
710 dl 1.4 */
711     public Phaser getRoot() {
712     return root;
713 dl 1.1 }
714    
715     /**
716 jsr166 1.9 * Returns {@code true} if this barrier has been terminated.
717 jsr166 1.14 *
718 jsr166 1.9 * @return {@code true} if this barrier has been terminated
719 dl 1.1 */
720     public boolean isTerminated() {
721 dl 1.4 return getPhase() < 0;
722 dl 1.1 }
723    
724     /**
725     * Overridable method to perform an action upon phase advance, and
726     * to control termination. This method is invoked whenever the
727     * barrier is tripped (and thus all other waiting parties are
728 jsr166 1.23 * dormant). If it returns {@code true}, then, rather than advance
729     * the phase number, this barrier will be set to a final
730     * termination state, and subsequent calls to {@link #isTerminated}
731     * will return true.
732 jsr166 1.3 *
733 jsr166 1.25 * <p>The default version returns {@code true} when the number of
734 dl 1.1 * registered parties is zero. Normally, overrides that arrange
735     * termination for other reasons should also preserve this
736     * property.
737     *
738 jsr166 1.25 * <p>You may override this method to perform an action with side
739 dl 1.4 * effects visible to participating tasks, but it is in general
740     * only sensible to do so in designs where all parties register
741 jsr166 1.24 * before any arrive, and all {@link #awaitAdvance} at each phase.
742 dl 1.27 * Otherwise, you cannot ensure lack of interference from other
743 jsr166 1.29 * parties during the invocation of this method.
744 dl 1.4 *
745 dl 1.1 * @param phase the phase number on entering the barrier
746 jsr166 1.9 * @param registeredParties the current number of registered parties
747     * @return {@code true} if this barrier should terminate
748 dl 1.1 */
749     protected boolean onAdvance(int phase, int registeredParties) {
750     return registeredParties <= 0;
751     }
752    
753     /**
754 dl 1.4 * Returns a string identifying this phaser, as well as its
755 dl 1.1 * state. The state, in brackets, includes the String {@code
756 jsr166 1.9 * "phase = "} followed by the phase number, {@code "parties = "}
757 dl 1.1 * followed by the number of registered parties, and {@code
758 jsr166 1.9 * "arrived = "} followed by the number of arrived parties.
759 dl 1.1 *
760     * @return a string identifying this barrier, as well as its state
761     */
762     public String toString() {
763 dl 1.4 long s = getReconciledState();
764 jsr166 1.9 return super.toString() +
765     "[phase = " + phaseOf(s) +
766     " parties = " + partiesOf(s) +
767     " arrived = " + arrivedOf(s) + "]";
768 dl 1.1 }
769    
770 dl 1.4 // methods for waiting
771 dl 1.1
772     /**
773 dl 1.10 * Wait nodes for Treiber stack representing wait queue
774 dl 1.1 */
775 dl 1.10 static final class QNode implements ForkJoinPool.ManagedBlocker {
776     final Phaser phaser;
777     final int phase;
778     final long startTime;
779     final long nanos;
780     final boolean timed;
781     final boolean interruptible;
782     volatile boolean wasInterrupted = false;
783     volatile Thread thread; // nulled to cancel wait
784 dl 1.4 QNode next;
785 dl 1.10 QNode(Phaser phaser, int phase, boolean interruptible,
786     boolean timed, long startTime, long nanos) {
787     this.phaser = phaser;
788     this.phase = phase;
789     this.timed = timed;
790     this.interruptible = interruptible;
791     this.startTime = startTime;
792     this.nanos = nanos;
793 dl 1.4 thread = Thread.currentThread();
794     }
795 dl 1.10 public boolean isReleasable() {
796     return (thread == null ||
797     phaser.getPhase() != phase ||
798     (interruptible && wasInterrupted) ||
799     (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
800     }
801     public boolean block() {
802     if (Thread.interrupted()) {
803     wasInterrupted = true;
804     if (interruptible)
805     return true;
806     }
807     if (!timed)
808     LockSupport.park(this);
809     else {
810     long waitTime = nanos - (System.nanoTime() - startTime);
811     if (waitTime <= 0)
812     return true;
813     LockSupport.parkNanos(this, waitTime);
814     }
815     return isReleasable();
816     }
817 dl 1.4 void signal() {
818     Thread t = thread;
819     if (t != null) {
820     thread = null;
821     LockSupport.unpark(t);
822     }
823     }
824 dl 1.10 boolean doWait() {
825     if (thread != null) {
826     try {
827     ForkJoinPool.managedBlock(this, false);
828     } catch (InterruptedException ie) {
829 jsr166 1.12 }
830 dl 1.10 }
831     return wasInterrupted;
832     }
833    
834 dl 1.4 }
835    
836     /**
837 jsr166 1.14 * Removes and signals waiting threads from wait queue.
838 dl 1.4 */
839     private void releaseWaiters(int phase) {
840     AtomicReference<QNode> head = queueFor(phase);
841     QNode q;
842     while ((q = head.get()) != null) {
843     if (head.compareAndSet(q, q.next))
844     q.signal();
845     }
846     }
847    
848     /**
849 jsr166 1.14 * Tries to enqueue given node in the appropriate wait queue.
850     *
851 dl 1.10 * @return true if successful
852     */
853     private boolean tryEnqueue(QNode node) {
854     AtomicReference<QNode> head = queueFor(node.phase);
855     return head.compareAndSet(node.next = head.get(), node);
856     }
857    
858     /**
859 dl 1.1 * Enqueues node and waits unless aborted or signalled.
860 jsr166 1.14 *
861 dl 1.10 * @return current phase
862 dl 1.1 */
863 dl 1.4 private int untimedWait(int phase) {
864 dl 1.1 QNode node = null;
865 dl 1.10 boolean queued = false;
866 dl 1.4 boolean interrupted = false;
867     int p;
868     while ((p = getPhase()) == phase) {
869 dl 1.10 if (Thread.interrupted())
870     interrupted = true;
871     else if (node == null)
872     node = new QNode(this, phase, false, false, 0, 0);
873     else if (!queued)
874     queued = tryEnqueue(node);
875 dl 1.4 else
876 dl 1.10 interrupted = node.doWait();
877 dl 1.4 }
878     if (node != null)
879     node.thread = null;
880 dl 1.10 releaseWaiters(phase);
881 dl 1.4 if (interrupted)
882     Thread.currentThread().interrupt();
883     return p;
884     }
885    
886     /**
887 dl 1.10 * Interruptible version
888     * @return current phase
889 dl 1.4 */
890     private int interruptibleWait(int phase) throws InterruptedException {
891     QNode node = null;
892     boolean queued = false;
893     boolean interrupted = false;
894     int p;
895 dl 1.10 while ((p = getPhase()) == phase && !interrupted) {
896     if (Thread.interrupted())
897     interrupted = true;
898     else if (node == null)
899     node = new QNode(this, phase, true, false, 0, 0);
900     else if (!queued)
901     queued = tryEnqueue(node);
902 dl 1.1 else
903 dl 1.10 interrupted = node.doWait();
904 dl 1.1 }
905     if (node != null)
906     node.thread = null;
907 dl 1.10 if (p != phase || (p = getPhase()) != phase)
908     releaseWaiters(phase);
909 dl 1.4 if (interrupted)
910     throw new InterruptedException();
911     return p;
912 dl 1.1 }
913    
914     /**
915 dl 1.10 * Timeout version.
916     * @return current phase
917 dl 1.1 */
918 dl 1.4 private int timedWait(int phase, long nanos)
919 dl 1.1 throws InterruptedException, TimeoutException {
920 dl 1.10 long startTime = System.nanoTime();
921     QNode node = null;
922     boolean queued = false;
923     boolean interrupted = false;
924 dl 1.4 int p;
925 dl 1.10 while ((p = getPhase()) == phase && !interrupted) {
926     if (Thread.interrupted())
927     interrupted = true;
928     else if (nanos - (System.nanoTime() - startTime) <= 0)
929     break;
930     else if (node == null)
931     node = new QNode(this, phase, true, true, startTime, nanos);
932     else if (!queued)
933     queued = tryEnqueue(node);
934     else
935     interrupted = node.doWait();
936 dl 1.4 }
937 dl 1.10 if (node != null)
938     node.thread = null;
939     if (p != phase || (p = getPhase()) != phase)
940     releaseWaiters(phase);
941     if (interrupted)
942     throw new InterruptedException();
943     if (p == phase)
944     throw new TimeoutException();
945 dl 1.4 return p;
946     }
947    
948 jsr166 1.22 // Unsafe mechanics
949    
950     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
951     private static final long stateOffset =
952     objectFieldOffset("state", Phaser.class);
953    
954     private final boolean casState(long cmp, long val) {
955     return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
956     }
957    
958     private static long objectFieldOffset(String field, Class<?> klazz) {
959     try {
960     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
961     } catch (NoSuchFieldException e) {
962     // Convert Exception to corresponding Error
963     NoSuchFieldError error = new NoSuchFieldError(field);
964     error.initCause(e);
965     throw error;
966     }
967     }
968    
969     /**
970     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
971     * Replace with a simple call to Unsafe.getUnsafe when integrating
972     * into a jdk.
973     *
974     * @return a sun.misc.Unsafe
975     */
976 jsr166 1.19 private static sun.misc.Unsafe getUnsafe() {
977 jsr166 1.11 try {
978 jsr166 1.19 return sun.misc.Unsafe.getUnsafe();
979 jsr166 1.11 } catch (SecurityException se) {
980     try {
981     return java.security.AccessController.doPrivileged
982 jsr166 1.22 (new java.security
983     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
984 jsr166 1.19 public sun.misc.Unsafe run() throws Exception {
985 jsr166 1.22 java.lang.reflect.Field f = sun.misc
986     .Unsafe.class.getDeclaredField("theUnsafe");
987     f.setAccessible(true);
988     return (sun.misc.Unsafe) f.get(null);
989 jsr166 1.11 }});
990     } catch (java.security.PrivilegedActionException e) {
991 jsr166 1.19 throw new RuntimeException("Could not initialize intrinsics",
992     e.getCause());
993 jsr166 1.11 }
994     }
995     }
996 dl 1.1 }