ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.2
Committed: Fri Jul 25 18:10:41 2008 UTC (15 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +4 -4 lines
Log Message:
typos

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import jsr166y.forkjoin.*;
9     import java.util.concurrent.*;
10     import java.util.concurrent.atomic.*;
11     import java.util.concurrent.locks.LockSupport;
12    
13     /**
14     * A reusable synchronization barrier, similar in functionality to a
15     * {@link java.util.concurrent.CyclicBarrier}, but supporting more
16     * flexible usage.
17     *
18     * <ul>
19     *
20     * <li> The number of parties synchronizing on the barrier may vary
21     * over time. A task may register to be a party in a barrier at any
22     * time, and may deregister upon arriving at the barrier. As is the
23     * case with most basic synchronization constructs, registration
24     * and deregistration affect only internal counts; they do not
25     * establish any further internal bookkeeping, so tasks cannot query
26     * whether they are registered.
27     *
28     * <li> Each generation has an associated phase value, starting at
29     * zero, and advancing when all parties reach the barrier (wrapping
30     * around to zero after reaching <tt>Integer.MAX_VALUE</tt>).
31     *
32     * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
33     * Method <tt>arriveAndAwaitAdvance</tt> has effect analogous to
34     * <tt>CyclicBarrier.await</tt>. However, Phasers separate two
35     * aspects of coordination, that may be invoked independently:
36     *
37     * <ul>
38     *
39     * <li> Arriving at a barrier. Methods <tt>arrive</tt> and
40     * <tt>arriveAndDeregister</tt> do not block, but return
41     * the phase value on entry to the method.
42     *
43     * <li> Awaiting others. Method <tt>awaitAdvance</tt> requires an
44     * argument indicating the entry phase, and returns when the
45     * barrier advances to a new phase.
46     * </ul>
47     *
48     *
49     * <li> Barrier actions, performed by the task triggering a phase
50     * advance while others may be waiting, are arranged by overriding
51     * method <tt>onAdvance</tt>, that also controls termination.
52     *
53     * <li> Phasers may enter a <em>termination</em> state in which all
54     * await actions immediately return, indicating (via a negative phase
55     * value) that execution is complete. Termination is triggered by
56     * executing the overridable <tt>onAdvance</tt> method that is invoked
57     * each time the barrier is tripped. When a Phaser is controlling an
58     * action with a fixed number of iterations, it is often convenient to
59     * override this method to cause termination when the current phase
60     * number reaches a threshold. Method <tt>forceTermination</tt> is
61     * also available to assist recovery actions upon failure.
62     *
63     * <li> Unlike most synchronizers, a Phaser may also be used with
64     * ForkJoinTasks (as well as plain threads).
65     *
66     * <li> By default, <tt>awaitAdvance</tt> continues to wait even if
67     * the current thread is interrupted. And unlike the case in
68     * CyclicBarriers, exceptions encountered while tasks wait
69     * interruptibly or with timeout do not change the state of the
70     * barrier. If necessary, you can perform any associated recovery
71     * within handlers of those exceptions.
72     *
73     * </ul>
74     *
75     * <p><b>Sample usage:</b>
76     *
77     * <p>[todo: non-FJ example]
78     *
79     * <p> A Phaser may be used to support a style of programming in
80     * which a task waits for others to complete, without otherwise
81     * needing to keep track of which tasks it is waiting for. This is
82     * similar to the "sync" construct in Cilk and "clocks" in X10.
83     * Special constructions based on such barriers are available using
84     * the <tt>LinkedAsyncAction</tt> and <tt>CyclicAction</tt> classes,
85     * but they can be useful in other contexts as well. For a simple
86     * (but not very useful) example, here is a variant of Fibonacci:
87     *
88     * <pre>
89     * class BarrierFibonacci extends RecursiveAction {
90     * int argument, result;
91     * final Phaser parentBarrier;
92     * BarrierFibonacci(int n, Phaser parentBarrier) {
93     * this.argument = n;
94     * this.parentBarrier = parentBarrier;
95     * parentBarrier.register();
96     * }
97     * protected void compute() {
98     * int n = argument;
99     * if (n &lt;= 1)
100     * result = n;
101     * else {
102     * Phaser childBarrier = new Phaser(1);
103     * BarrierFibonacci f1 = new BarrierFibonacci(n - 1, childBarrier);
104     * BarrierFibonacci f2 = new BarrierFibonacci(n - 2, childBarrier);
105     * f1.fork();
106     * f2.fork();
107     * childBarrier.arriveAndAwait();
108     * result = f1.result + f2.result;
109     * }
110     * parentBarrier.arriveAndDeregister();
111     * }
112     * }
113     * </pre>
114     *
115     * <p><b>Implementation notes</b>: This implementation restricts the
116     * maximum number of parties to 65535. Attempts to register
117     * additional parties result in IllegalStateExceptions.
118     */
119     public class Phaser {
120     /*
121     * This class implements an extension of X10 "clocks". Thanks to
122     * Vijay Saraswat for the idea of applying it to ForkJoinTasks,
123     * and to Vivek Sarkar for enhancements to extend functionality.
124     */
125    
126     /**
127     * Barrier state representation. Conceptually, a barrier contains
128     * four values:
129     *
130     * * parties -- the number of parties to wait (16 bits)
131     * * unarrived -- the number of parties yet to hit barrier (16 bits)
132     * * phase -- the generation of the barrier (31 bits)
133     * * terminated -- set if barrier is terminated (1 bit)
134     *
135     * However, to efficiently maintain atomicity, these values are
136     * packed into a single AtomicLong. Termination uses the sign bit
137     * of 32 bit representation of phase, so phase is set to -1 on
138     * termination.
139     */
140     private final AtomicLong state;
141    
142     /**
143     * Head of Treiber stack for waiting nonFJ threads.
144     */
145     private final AtomicReference<QNode> head = new AtomicReference<QNode>();
146    
147     private static final int ushortBits = 16;
148     private static final int ushortMask = (1 << ushortBits) - 1;
149     private static final int phaseMask = 0x7fffffff;
150    
151     private static int unarrivedOf(long s) {
152     return (int)(s & ushortMask);
153     }
154    
155     private static int partiesOf(long s) {
156     return (int)(s & (ushortMask << 16)) >>> 16;
157     }
158    
159     private static int phaseOf(long s) {
160     return (int)(s >>> 32);
161     }
162    
163     private static int arrivedOf(long s) {
164     return partiesOf(s) - unarrivedOf(s);
165     }
166    
167     private static long stateFor(int phase, int parties, int unarrived) {
168     return (((long)phase) << 32) | ((parties << 16) | unarrived);
169     }
170    
171     private static IllegalStateException badBounds(int parties, int unarrived) {
172     return new IllegalStateException("Attempt to set " + unarrived +
173     " unarrived of " + parties + " parties");
174     }
175    
176     /**
177     * Creates a new Phaser without any initially registered parties,
178     * and initial phase number 0.
179     */
180     public Phaser() {
181     state = new AtomicLong(stateFor(0, 0, 0));
182     }
183    
184     /**
185     * Creates a new Phaser with the given numbers of registered
186     * unarrived parties and initial phase number 0.
187     * @param parties the number of parties required to trip barrier.
188     * @throws IllegalArgumentException if parties less than zero
189     * or greater than the maximum number of parties supported.
190     */
191     public Phaser(int parties) {
192     if (parties < 0 || parties > ushortMask)
193     throw new IllegalArgumentException("Illegal number of parties");
194     state = new AtomicLong(stateFor(0, parties, parties));
195     }
196    
197     /**
198     * Adds a new unarrived party to this phaser.
199     * @return the current barrier phase number upon registration
200     * @throws IllegalStateException if attempting to register more
201     * than the maximum supported number of parties.
202     */
203     public int register() { // increment both parties and unarrived
204     final AtomicLong state = this.state;
205     for (;;) {
206     long s = state.get();
207     int phase = phaseOf(s);
208     int parties = partiesOf(s) + 1;
209     int unarrived = unarrivedOf(s) + 1;
210     if (parties > ushortMask || unarrived > ushortMask)
211     throw badBounds(parties, unarrived);
212     if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
213     return phase;
214     }
215     }
216    
217     /**
218     * Arrives at the barrier, but does not wait for others. (You can
219     * in turn wait for others via {@link #awaitAdvance}).
220     *
221     * @return the current barrier phase number upon entry to
222     * this method, or a negative value if terminated;
223     * @throws IllegalStateException if the number of unarrived
224     * parties would become negative.
225     */
226     public int arrive() { // decrement unarrived. If zero, trip
227     final AtomicLong state = this.state;
228     for (;;) {
229     long s = state.get();
230     int phase = phaseOf(s);
231     int parties = partiesOf(s);
232     int unarrived = unarrivedOf(s) - 1;
233     if (unarrived < 0)
234     throw badBounds(parties, unarrived);
235     if (unarrived == 0 && phase >= 0) {
236     trip(phase, parties);
237     return phase;
238     }
239     if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
240     return phase;
241     }
242     }
243    
244     /**
245     * Arrives at the barrier, and deregisters from it, without
246     * waiting for others.
247     *
248     * @return the current barrier phase number upon entry to
249     * this method, or a negative value if terminated;
250     * @throws IllegalStateException if the number of registered or
251     * unarrived parties would become negative.
252     */
253     public int arriveAndDeregister() { // Same as arrive, plus decrement parties
254     final AtomicLong state = this.state;
255     for (;;) {
256     long s = state.get();
257     int phase = phaseOf(s);
258     int parties = partiesOf(s) - 1;
259     int unarrived = unarrivedOf(s) - 1;
260     if (parties < 0 || unarrived < 0)
261     throw badBounds(parties, unarrived);
262     if (unarrived == 0 && phase >= 0) {
263     trip(phase, parties);
264     return phase;
265     }
266     if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
267     return phase;
268     }
269     }
270    
271     /**
272     * Arrives at the barrier and awaits others. Unlike other arrival
273     * methods, this method returns the arrival index of the
274     * caller. The caller tripping the barrier returns zero, the
275     * previous caller 1, and so on.
276     * @return the arrival index
277     * @throws IllegalStateException if the number of unarrived
278     * parties would become negative.
279     */
280     public int arriveAndAwaitAdvance() {
281     final AtomicLong state = this.state;
282     for (;;) {
283     long s = state.get();
284     int phase = phaseOf(s);
285     int parties = partiesOf(s);
286     int unarrived = unarrivedOf(s) - 1;
287     if (unarrived < 0)
288     throw badBounds(parties, unarrived);
289     if (unarrived == 0 && phase >= 0) {
290     trip(phase, parties);
291     return 0;
292     }
293     if (state.compareAndSet(s, stateFor(phase, parties, unarrived))) {
294     awaitAdvance(phase);
295     return unarrived;
296     }
297     }
298     }
299    
300     /**
301     * Awaits the phase of the barrier to advance from the given
302 jsr166 1.2 * value, or returns immediately if this barrier is terminated.
303 dl 1.1 * @param phase the phase on entry to this method
304     * @return the phase on exit from this method
305     */
306     public int awaitAdvance(int phase) {
307     if (phase < 0)
308     return phase;
309     Thread current = Thread.currentThread();
310     if (current instanceof ForkJoinWorkerThread)
311     return helpingWait(phase);
312     if (untimedWait(current, phase, false))
313     current.interrupt();
314     return phaseOf(state.get());
315     }
316    
317     /**
318     * Awaits the phase of the barrier to advance from the given
319     * value, or returns immediately if this barrier is terminated, or
320     * throws InterruptedException if interrupted while waiting.
321     * @param phase the phase on entry to this method
322     * @return the phase on exit from this method
323     * @throws InterruptedException if thread interrupted while waiting
324     */
325     public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
326     if (phase < 0)
327     return phase;
328     Thread current = Thread.currentThread();
329     if (current instanceof ForkJoinWorkerThread)
330     return helpingWait(phase);
331     else if (Thread.interrupted() || untimedWait(current, phase, true))
332     throw new InterruptedException();
333     else
334     return phaseOf(state.get());
335     }
336    
337     /**
338     * Awaits the phase of the barrier to advance from the given value
339     * or the given timeout elapses, or returns immediately if this
340 jsr166 1.2 * barrier is terminated.
341 dl 1.1 * @param phase the phase on entry to this method
342     * @return the phase on exit from this method
343     * @throws InterruptedException if thread interrupted while waiting
344     * @throws TimeoutException if timed out while waiting
345     */
346     public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
347     throws InterruptedException, TimeoutException {
348     if (phase < 0)
349     return phase;
350     long nanos = unit.toNanos(timeout);
351     Thread current = Thread.currentThread();
352     if (current instanceof ForkJoinWorkerThread)
353     return timedHelpingWait(phase, nanos);
354     timedWait(current, phase, nanos);
355     return phaseOf(state.get());
356     }
357    
358     /**
359     * Forces this barrier to enter termination state. Counts of
360     * arrived and registered parties are unaffected. This method may
361     * be useful for coordinating recovery after one or more tasks
362     * encounter unexpected exceptions.
363     */
364     public void forceTermination() {
365     final AtomicLong state = this.state;
366     for (;;) {
367     long s = state.get();
368     int phase = phaseOf(s);
369     int parties = partiesOf(s);
370     int unarrived = unarrivedOf(s);
371     if (phase < 0 ||
372     state.compareAndSet(s, stateFor(-1, parties, unarrived))) {
373     if (head.get() != null)
374     releaseWaiters(-1);
375     return;
376     }
377     }
378     }
379    
380     /**
381     * Resets the barrier with the given numbers of registered unarrived
382     * parties and phase number 0. This method allows repeated reuse
383     * of this barrier, but only if it is somehow known not to be in
384     * use for other purposes.
385     * @param parties the number of parties required to trip barrier.
386     * @throws IllegalArgumentException if parties less than zero
387     * or greater than the maximum number of parties supported.
388     */
389     public void reset(int parties) {
390     if (parties < 0 || parties > ushortMask)
391     throw new IllegalArgumentException("Illegal number of parties");
392     state.set(stateFor(0, parties, parties));
393     if (head.get() != null)
394     releaseWaiters(0);
395     }
396    
397     /**
398     * Returns the current phase number. The maximum phase number is
399     * <tt>Integer.MAX_VALUE</tt>, after which it restarts at
400     * zero. Upon termination, the phase number is negative.
401     * @return the phase number, or a negative value if terminated
402     */
403     public int getPhase() {
404     return phaseOf(state.get());
405     }
406    
407     /**
408     * Returns the number of parties registered at this barrier.
409     * @return the number of parties
410     */
411     public int getRegisteredParties() {
412     return partiesOf(state.get());
413     }
414    
415     /**
416     * Returns the number of parties that have arrived at the current
417     * phase of this barrier.
418     * @return the number of arrived parties
419     */
420     public int getArrivedParties() {
421     return arrivedOf(state.get());
422     }
423    
424     /**
425     * Returns the number of registered parties that have not yet
426     * arrived at the current phase of this barrier.
427     * @return the number of unarrived parties
428     */
429     public int getUnarrivedParties() {
430     return unarrivedOf(state.get());
431     }
432    
433     /**
434 jsr166 1.2 * Returns true if this barrier has been terminated.
435 dl 1.1 * @return true if this barrier has been terminated
436     */
437     public boolean isTerminated() {
438     return phaseOf(state.get()) < 0;
439     }
440    
441     /**
442     * Overridable method to perform an action upon phase advance, and
443     * to control termination. This method is invoked whenever the
444     * barrier is tripped (and thus all other waiting parties are
445     * dormant). If it returns true, then, rather than advance the
446     * phase number, this barrier will be set to a final termination
447     * state, and subsequent calls to <tt>isTerminated</tt> will
448     * return true.
449     *
450     * <p> The default version returns true when the number of
451     * registered parties is zero. Normally, overrides that arrange
452     * termination for other reasons should also preserve this
453     * property.
454     *
455     * @param phase the phase number on entering the barrier
456     * @param registeredParties the current number of registered
457     * parties.
458     * @return true if this barrier should terminate
459     */
460     protected boolean onAdvance(int phase, int registeredParties) {
461     return registeredParties <= 0;
462     }
463    
464     /**
465     * Returns a string identifying this barrier, as well as its
466     * state. The state, in brackets, includes the String {@code
467     * "phase ="} followed by the phase number, {@code "parties ="}
468     * followed by the number of registered parties, and {@code
469     * "arrived ="} followed by the number of arrived parties
470     *
471     * @return a string identifying this barrier, as well as its state
472     */
473     public String toString() {
474     long s = state.get();
475     return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]";
476     }
477    
478     // methods for tripping and waiting
479    
480     /**
481     * Advance the current phase (or terminate)
482     */
483     private void trip(int phase, int parties) {
484     int next = onAdvance(phase, parties)? -1 : ((phase + 1) & phaseMask);
485     state.set(stateFor(next, parties, parties));
486     if (head.get() != null)
487     releaseWaiters(next);
488     }
489    
490     private int helpingWait(int phase) {
491     final AtomicLong state = this.state;
492     int p;
493     while ((p = phaseOf(state.get())) == phase) {
494     ForkJoinTask<?> t = ForkJoinWorkerThread.pollTask();
495     if (t != null) {
496     if ((p = phaseOf(state.get())) == phase)
497     t.exec();
498     else { // push task and exit if barrier advanced
499     t.fork();
500     break;
501     }
502     }
503     }
504     return p;
505     }
506    
507     private int timedHelpingWait(int phase, long nanos) throws TimeoutException {
508     final AtomicLong state = this.state;
509     long lastTime = System.nanoTime();
510     int p;
511     while ((p = phaseOf(state.get())) == phase) {
512     long now = System.nanoTime();
513     nanos -= now - lastTime;
514     lastTime = now;
515     if (nanos <= 0) {
516     if ((p = phaseOf(state.get())) == phase)
517     throw new TimeoutException();
518     else
519     break;
520     }
521     ForkJoinTask<?> t = ForkJoinWorkerThread.pollTask();
522     if (t != null) {
523     if ((p = phaseOf(state.get())) == phase)
524     t.exec();
525     else { // push task and exit if barrier advanced
526     t.fork();
527     break;
528     }
529     }
530     }
531     return p;
532     }
533    
534     /**
535     * Wait nodes for Treiber stack representing wait queue for non-FJ
536     * tasks. The waiting scheme is an adaptation of the one used in
537     * forkjoin.PoolBarrier.
538     */
539     static final class QNode {
540     QNode next;
541     volatile Thread thread; // nulled to cancel wait
542     final int phase;
543     QNode(Thread t, int c) {
544     thread = t;
545     phase = c;
546     }
547     }
548    
549     private void releaseWaiters(int currentPhase) {
550     final AtomicReference<QNode> head = this.head;
551     QNode p;
552     while ((p = head.get()) != null && p.phase != currentPhase) {
553     if (head.compareAndSet(p, null)) {
554     do {
555     Thread t = p.thread;
556     if (t != null) {
557     p.thread = null;
558     LockSupport.unpark(t);
559     }
560     } while ((p = p.next) != null);
561     }
562     }
563     }
564    
565     /** The number of CPUs, for spin control */
566     static final int NCPUS = Runtime.getRuntime().availableProcessors();
567    
568     /**
569     * The number of times to spin before blocking in timed waits.
570 jsr166 1.2 * The value is empirically derived.
571 dl 1.1 */
572     static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
573    
574     /**
575     * The number of times to spin before blocking in untimed waits.
576     * This is greater than timed value because untimed waits spin
577     * faster since they don't need to check times on each spin.
578     */
579     static final int maxUntimedSpins = maxTimedSpins * 32;
580    
581     /**
582     * The number of nanoseconds for which it is faster to spin
583     * rather than to use timed park. A rough estimate suffices.
584     */
585     static final long spinForTimeoutThreshold = 1000L;
586    
587     /**
588     * Enqueues node and waits unless aborted or signalled.
589     */
590     private boolean untimedWait(Thread thread, int currentPhase,
591     boolean abortOnInterrupt) {
592     final AtomicReference<QNode> head = this.head;
593     final AtomicLong state = this.state;
594     boolean wasInterrupted = false;
595     QNode node = null;
596     boolean queued = false;
597     int spins = maxUntimedSpins;
598     while (phaseOf(state.get()) == currentPhase) {
599     QNode h;
600     if (node != null && queued) {
601     if (node.thread != null) {
602     LockSupport.park();
603     if (Thread.interrupted()) {
604     wasInterrupted = true;
605     if (abortOnInterrupt)
606     break;
607     }
608     }
609     }
610     else if ((h = head.get()) != null && h.phase != currentPhase) {
611     if (phaseOf(state.get()) == currentPhase) { // must recheck
612     if (head.compareAndSet(h, h.next)) {
613     Thread t = h.thread; // help clear out old waiters
614     if (t != null) {
615     h.thread = null;
616     LockSupport.unpark(t);
617     }
618     }
619     }
620     else
621     break;
622     }
623     else if (node != null)
624     queued = head.compareAndSet(node.next = h, node);
625     else if (spins <= 0)
626     node = new QNode(thread, currentPhase);
627     else
628     --spins;
629     }
630     if (node != null)
631     node.thread = null;
632     return wasInterrupted;
633     }
634    
635     /**
636     * Messier timeout version
637     */
638     private void timedWait(Thread thread, int currentPhase, long nanos)
639     throws InterruptedException, TimeoutException {
640     final AtomicReference<QNode> head = this.head;
641     final AtomicLong state = this.state;
642     long lastTime = System.nanoTime();
643     QNode node = null;
644     boolean queued = false;
645     int spins = maxTimedSpins;
646     while (phaseOf(state.get()) == currentPhase) {
647     QNode h;
648     long now = System.nanoTime();
649     nanos -= now - lastTime;
650     lastTime = now;
651     if (nanos <= 0) {
652     if (node != null)
653     node.thread = null;
654     if (phaseOf(state.get()) == currentPhase)
655     throw new TimeoutException();
656     else
657     break;
658     }
659     else if (node != null && queued) {
660     if (node.thread != null &&
661     nanos > spinForTimeoutThreshold) {
662     // LockSupport.parkNanos(this, nanos);
663     LockSupport.parkNanos(nanos);
664     if (Thread.interrupted()) {
665     node.thread = null;
666     throw new InterruptedException();
667     }
668     }
669     }
670     else if ((h = head.get()) != null && h.phase != currentPhase) {
671     if (phaseOf(state.get()) == currentPhase) { // must recheck
672     if (head.compareAndSet(h, h.next)) {
673     Thread t = h.thread; // help clear out old waiters
674     if (t != null) {
675     h.thread = null;
676     LockSupport.unpark(t);
677     }
678     }
679     }
680     else
681     break;
682     }
683     else if (node != null)
684     queued = head.compareAndSet(node.next = h, node);
685     else if (spins <= 0)
686     node = new QNode(thread, currentPhase);
687     else
688     --spins;
689     }
690     if (node != null)
691     node.thread = null;
692     }
693    
694     }
695