ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.3
Committed: Fri Jul 25 18:11:53 2008 UTC (15 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.2: +16 -17 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import jsr166y.forkjoin.*;
9 import java.util.concurrent.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.concurrent.locks.LockSupport;
12
13 /**
14 * A reusable synchronization barrier, similar in functionality to a
15 * {@link java.util.concurrent.CyclicBarrier}, but supporting more
16 * flexible usage.
17 *
18 * <ul>
19 *
20 * <li> The number of parties synchronizing on the barrier may vary
21 * over time. A task may register to be a party in a barrier at any
22 * time, and may deregister upon arriving at the barrier. As is the
23 * case with most basic synchronization constructs, registration
24 * and deregistration affect only internal counts; they do not
25 * establish any further internal bookkeeping, so tasks cannot query
26 * whether they are registered.
27 *
28 * <li> Each generation has an associated phase value, starting at
29 * zero, and advancing when all parties reach the barrier (wrapping
30 * around to zero after reaching <tt>Integer.MAX_VALUE</tt>).
31 *
32 * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
33 * Method <tt>arriveAndAwaitAdvance</tt> has effect analogous to
34 * <tt>CyclicBarrier.await</tt>. However, Phasers separate two
35 * aspects of coordination, which may be invoked independently:
36 *
37 * <ul>
38 *
39 * <li> Arriving at a barrier. Methods <tt>arrive</tt> and
40 * <tt>arriveAndDeregister</tt> do not block, but return
41 * the phase value on entry to the method.
42 *
43 * <li> Awaiting others. Method <tt>awaitAdvance</tt> requires an
44 * argument indicating the entry phase, and returns when the
45 * barrier advances to a new phase.
46 * </ul>
47 *
48 *
49 * <li> Barrier actions, performed by the task triggering a phase
50 * advance while others may be waiting, are arranged by overriding
51 * method <tt>onAdvance</tt>, which also controls termination.
52 *
53 * <li> Phasers may enter a <em>termination</em> state in which all
54 * await actions immediately return, indicating (via a negative phase
55 * value) that execution is complete. Termination is triggered by
56 * executing the overridable <tt>onAdvance</tt> method that is invoked
57 * each time the barrier is tripped. When a Phaser is controlling an
58 * action with a fixed number of iterations, it is often convenient to
59 * override this method to cause termination when the current phase
60 * number reaches a threshold. Method <tt>forceTermination</tt> is
61 * also available to assist recovery actions upon failure.
62 *
63 * <li> Unlike most synchronizers, a Phaser may also be used with
64 * ForkJoinTasks (as well as plain threads).
65 *
66 * <li> By default, <tt>awaitAdvance</tt> continues to wait even if
67 * the current thread is interrupted. And unlike the case in
68 * CyclicBarriers, exceptions encountered while tasks wait
69 * interruptibly or with timeout do not change the state of the
70 * barrier. If necessary, you can perform any associated recovery
71 * within handlers of those exceptions.
72 *
73 * </ul>
74 *
75 * <p><b>Sample usage:</b>
76 *
77 * <p>[todo: non-FJ example]
78 *
79 * <p> A Phaser may be used to support a style of programming in
80 * which a task waits for others to complete, without otherwise
81 * needing to keep track of which tasks it is waiting for. This is
82 * similar to the "sync" construct in Cilk and "clocks" in X10.
83 * Special constructions based on such barriers are available using
84 * the <tt>LinkedAsyncAction</tt> and <tt>CyclicAction</tt> classes,
85 * but they can be useful in other contexts as well. For a simple
86 * (but not very useful) example, here is a variant of Fibonacci:
87 *
88 * <pre>
89 * class BarrierFibonacci extends RecursiveAction {
90 * int argument, result;
91 * final Phaser parentBarrier;
92 * BarrierFibonacci(int n, Phaser parentBarrier) {
93 * this.argument = n;
94 * this.parentBarrier = parentBarrier;
95 * parentBarrier.register();
96 * }
97 * protected void compute() {
98 * int n = argument;
99 * if (n &lt;= 1)
100 * result = n;
101 * else {
102 * Phaser childBarrier = new Phaser(1);
103 * BarrierFibonacci f1 = new BarrierFibonacci(n - 1, childBarrier);
104 * BarrierFibonacci f2 = new BarrierFibonacci(n - 2, childBarrier);
105 * f1.fork();
106 * f2.fork();
107 * childBarrier.arriveAndAwaitAdvance();
108 * result = f1.result + f2.result;
109 * }
110 * parentBarrier.arriveAndDeregister();
111 * }
112 * }
113 * </pre>
114 *
115 * <p><b>Implementation notes</b>: This implementation restricts the
116 * maximum number of parties to 65535. Attempts to register
117 * additional parties result in IllegalStateExceptions.
118 */
119 public class Phaser {
120 /*
121 * This class implements an extension of X10 "clocks". Thanks to
122 * Vijay Saraswat for the idea of applying it to ForkJoinTasks,
123 * and to Vivek Sarkar for enhancements to extend functionality.
124 */
125
126 /**
127 * Barrier state representation. Conceptually, a barrier contains
128 * four values:
129 *
130 * * parties -- the number of parties to wait (16 bits)
131 * * unarrived -- the number of parties yet to hit barrier (16 bits)
132 * * phase -- the generation of the barrier (31 bits)
133 * * terminated -- set if barrier is terminated (1 bit)
134 *
135 * However, to efficiently maintain atomicity, these values are
136 * packed into a single AtomicLong. Termination uses the sign bit
137 * of 32 bit representation of phase, so phase is set to -1 on
138 * termination.
139 */
140 private final AtomicLong state;
141
142 /**
143 * Head of Treiber stack for waiting nonFJ threads.
144 */
145 private final AtomicReference<QNode> head = new AtomicReference<QNode>();
146
147 private static final int ushortBits = 16;
148 private static final int ushortMask = (1 << ushortBits) - 1;
149 private static final int phaseMask = 0x7fffffff;
150
151 private static int unarrivedOf(long s) {
152 return (int)(s & ushortMask);
153 }
154
155 private static int partiesOf(long s) {
156 return (int)(s & (ushortMask << 16)) >>> 16;
157 }
158
159 private static int phaseOf(long s) {
160 return (int)(s >>> 32);
161 }
162
163 private static int arrivedOf(long s) {
164 return partiesOf(s) - unarrivedOf(s);
165 }
166
167 private static long stateFor(int phase, int parties, int unarrived) {
168 return (((long)phase) << 32) | ((parties << 16) | unarrived);
169 }
170
171 private static IllegalStateException badBounds(int parties, int unarrived) {
172 return new IllegalStateException("Attempt to set " + unarrived +
173 " unarrived of " + parties + " parties");
174 }
175
176 /**
177 * Creates a new Phaser without any initially registered parties,
178 * and initial phase number 0.
179 */
180 public Phaser() {
181 state = new AtomicLong(stateFor(0, 0, 0));
182 }
183
184 /**
185 * Creates a new Phaser with the given numbers of registered
186 * unarrived parties and initial phase number 0.
187 * @param parties the number of parties required to trip barrier.
188 * @throws IllegalArgumentException if parties less than zero
189 * or greater than the maximum number of parties supported.
190 */
191 public Phaser(int parties) {
192 if (parties < 0 || parties > ushortMask)
193 throw new IllegalArgumentException("Illegal number of parties");
194 state = new AtomicLong(stateFor(0, parties, parties));
195 }
196
197 /**
198 * Adds a new unarrived party to this phaser.
199 * @return the current barrier phase number upon registration
200 * @throws IllegalStateException if attempting to register more
201 * than the maximum supported number of parties.
202 */
203 public int register() { // increment both parties and unarrived
204 final AtomicLong state = this.state;
205 for (;;) {
206 long s = state.get();
207 int phase = phaseOf(s);
208 int parties = partiesOf(s) + 1;
209 int unarrived = unarrivedOf(s) + 1;
210 if (parties > ushortMask || unarrived > ushortMask)
211 throw badBounds(parties, unarrived);
212 if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
213 return phase;
214 }
215 }
216
217 /**
218 * Arrives at the barrier, but does not wait for others. (You can
219 * in turn wait for others via {@link #awaitAdvance}).
220 *
221 * @return the current barrier phase number upon entry to
222 * this method, or a negative value if terminated;
223 * @throws IllegalStateException if the number of unarrived
224 * parties would become negative.
225 */
226 public int arrive() { // decrement unarrived. If zero, trip
227 final AtomicLong state = this.state;
228 for (;;) {
229 long s = state.get();
230 int phase = phaseOf(s);
231 int parties = partiesOf(s);
232 int unarrived = unarrivedOf(s) - 1;
233 if (unarrived < 0)
234 throw badBounds(parties, unarrived);
235 if (unarrived == 0 && phase >= 0) {
236 trip(phase, parties);
237 return phase;
238 }
239 if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
240 return phase;
241 }
242 }
243
244 /**
245 * Arrives at the barrier, and deregisters from it, without
246 * waiting for others.
247 *
248 * @return the current barrier phase number upon entry to
249 * this method, or a negative value if terminated;
250 * @throws IllegalStateException if the number of registered or
251 * unarrived parties would become negative.
252 */
253 public int arriveAndDeregister() { // Same as arrive, plus decrement parties
254 final AtomicLong state = this.state;
255 for (;;) {
256 long s = state.get();
257 int phase = phaseOf(s);
258 int parties = partiesOf(s) - 1;
259 int unarrived = unarrivedOf(s) - 1;
260 if (parties < 0 || unarrived < 0)
261 throw badBounds(parties, unarrived);
262 if (unarrived == 0 && phase >= 0) {
263 trip(phase, parties);
264 return phase;
265 }
266 if (state.compareAndSet(s, stateFor(phase, parties, unarrived)))
267 return phase;
268 }
269 }
270
271 /**
272 * Arrives at the barrier and awaits others. Unlike other arrival
273 * methods, this method returns the arrival index of the
274 * caller. The caller tripping the barrier returns zero, the
275 * previous caller 1, and so on.
276 * @return the arrival index
277 * @throws IllegalStateException if the number of unarrived
278 * parties would become negative.
279 */
280 public int arriveAndAwaitAdvance() {
281 final AtomicLong state = this.state;
282 for (;;) {
283 long s = state.get();
284 int phase = phaseOf(s);
285 int parties = partiesOf(s);
286 int unarrived = unarrivedOf(s) - 1;
287 if (unarrived < 0)
288 throw badBounds(parties, unarrived);
289 if (unarrived == 0 && phase >= 0) {
290 trip(phase, parties);
291 return 0;
292 }
293 if (state.compareAndSet(s, stateFor(phase, parties, unarrived))) {
294 awaitAdvance(phase);
295 return unarrived;
296 }
297 }
298 }
299
300 /**
301 * Awaits the phase of the barrier to advance from the given
302 * value, or returns immediately if this barrier is terminated.
303 * @param phase the phase on entry to this method
304 * @return the phase on exit from this method
305 */
306 public int awaitAdvance(int phase) {
307 if (phase < 0)
308 return phase;
309 Thread current = Thread.currentThread();
310 if (current instanceof ForkJoinWorkerThread)
311 return helpingWait(phase);
312 if (untimedWait(current, phase, false))
313 current.interrupt();
314 return phaseOf(state.get());
315 }
316
317 /**
318 * Awaits the phase of the barrier to advance from the given
319 * value, or returns immediately if this barrier is terminated, or
320 * throws InterruptedException if interrupted while waiting.
321 * @param phase the phase on entry to this method
322 * @return the phase on exit from this method
323 * @throws InterruptedException if thread interrupted while waiting
324 */
325 public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
326 if (phase < 0)
327 return phase;
328 Thread current = Thread.currentThread();
329 if (current instanceof ForkJoinWorkerThread)
330 return helpingWait(phase);
331 else if (Thread.interrupted() || untimedWait(current, phase, true))
332 throw new InterruptedException();
333 else
334 return phaseOf(state.get());
335 }
336
337 /**
338 * Awaits the phase of the barrier to advance from the given value
339 * or the given timeout elapses, or returns immediately if this
340 * barrier is terminated.
341 * @param phase the phase on entry to this method
342 * @return the phase on exit from this method
343 * @throws InterruptedException if thread interrupted while waiting
344 * @throws TimeoutException if timed out while waiting
345 */
346 public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
347 throws InterruptedException, TimeoutException {
348 if (phase < 0)
349 return phase;
350 long nanos = unit.toNanos(timeout);
351 Thread current = Thread.currentThread();
352 if (current instanceof ForkJoinWorkerThread)
353 return timedHelpingWait(phase, nanos);
354 timedWait(current, phase, nanos);
355 return phaseOf(state.get());
356 }
357
358 /**
359 * Forces this barrier to enter termination state. Counts of
360 * arrived and registered parties are unaffected. This method may
361 * be useful for coordinating recovery after one or more tasks
362 * encounter unexpected exceptions.
363 */
364 public void forceTermination() {
365 final AtomicLong state = this.state;
366 for (;;) {
367 long s = state.get();
368 int phase = phaseOf(s);
369 int parties = partiesOf(s);
370 int unarrived = unarrivedOf(s);
371 if (phase < 0 ||
372 state.compareAndSet(s, stateFor(-1, parties, unarrived))) {
373 if (head.get() != null)
374 releaseWaiters(-1);
375 return;
376 }
377 }
378 }
379
380 /**
381 * Resets the barrier with the given numbers of registered unarrived
382 * parties and phase number 0. This method allows repeated reuse
383 * of this barrier, but only if it is somehow known not to be in
384 * use for other purposes.
385 * @param parties the number of parties required to trip barrier.
386 * @throws IllegalArgumentException if parties less than zero
387 * or greater than the maximum number of parties supported.
388 */
389 public void reset(int parties) {
390 if (parties < 0 || parties > ushortMask)
391 throw new IllegalArgumentException("Illegal number of parties");
392 state.set(stateFor(0, parties, parties));
393 if (head.get() != null)
394 releaseWaiters(0);
395 }
396
397 /**
398 * Returns the current phase number. The maximum phase number is
399 * <tt>Integer.MAX_VALUE</tt>, after which it restarts at
400 * zero. Upon termination, the phase number is negative.
401 * @return the phase number, or a negative value if terminated
402 */
403 public int getPhase() {
404 return phaseOf(state.get());
405 }
406
407 /**
408 * Returns the number of parties registered at this barrier.
409 * @return the number of parties
410 */
411 public int getRegisteredParties() {
412 return partiesOf(state.get());
413 }
414
415 /**
416 * Returns the number of parties that have arrived at the current
417 * phase of this barrier.
418 * @return the number of arrived parties
419 */
420 public int getArrivedParties() {
421 return arrivedOf(state.get());
422 }
423
424 /**
425 * Returns the number of registered parties that have not yet
426 * arrived at the current phase of this barrier.
427 * @return the number of unarrived parties
428 */
429 public int getUnarrivedParties() {
430 return unarrivedOf(state.get());
431 }
432
433 /**
434 * Returns true if this barrier has been terminated.
435 * @return true if this barrier has been terminated
436 */
437 public boolean isTerminated() {
438 return phaseOf(state.get()) < 0;
439 }
440
441 /**
442 * Overridable method to perform an action upon phase advance, and
443 * to control termination. This method is invoked whenever the
444 * barrier is tripped (and thus all other waiting parties are
445 * dormant). If it returns true, then, rather than advance the
446 * phase number, this barrier will be set to a final termination
447 * state, and subsequent calls to <tt>isTerminated</tt> will
448 * return true.
449 *
450 * <p> The default version returns true when the number of
451 * registered parties is zero. Normally, overrides that arrange
452 * termination for other reasons should also preserve this
453 * property.
454 *
455 * @param phase the phase number on entering the barrier
456 * @param registeredParties the current number of registered
457 * parties.
458 * @return true if this barrier should terminate
459 */
460 protected boolean onAdvance(int phase, int registeredParties) {
461 return registeredParties <= 0;
462 }
463
464 /**
465 * Returns a string identifying this barrier, as well as its
466 * state. The state, in brackets, includes the String {@code
467 * "phase ="} followed by the phase number, {@code "parties ="}
468 * followed by the number of registered parties, and {@code
469 * "arrived ="} followed by the number of arrived parties
470 *
471 * @return a string identifying this barrier, as well as its state
472 */
473 public String toString() {
474 long s = state.get();
475 return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]";
476 }
477
478 // methods for tripping and waiting
479
480 /**
481 * Advance the current phase (or terminate)
482 */
483 private void trip(int phase, int parties) {
484 int next = onAdvance(phase, parties)? -1 : ((phase + 1) & phaseMask);
485 state.set(stateFor(next, parties, parties));
486 if (head.get() != null)
487 releaseWaiters(next);
488 }
489
490 private int helpingWait(int phase) {
491 final AtomicLong state = this.state;
492 int p;
493 while ((p = phaseOf(state.get())) == phase) {
494 ForkJoinTask<?> t = ForkJoinWorkerThread.pollTask();
495 if (t != null) {
496 if ((p = phaseOf(state.get())) == phase)
497 t.exec();
498 else { // push task and exit if barrier advanced
499 t.fork();
500 break;
501 }
502 }
503 }
504 return p;
505 }
506
507 private int timedHelpingWait(int phase, long nanos) throws TimeoutException {
508 final AtomicLong state = this.state;
509 long lastTime = System.nanoTime();
510 int p;
511 while ((p = phaseOf(state.get())) == phase) {
512 long now = System.nanoTime();
513 nanos -= now - lastTime;
514 lastTime = now;
515 if (nanos <= 0) {
516 if ((p = phaseOf(state.get())) == phase)
517 throw new TimeoutException();
518 else
519 break;
520 }
521 ForkJoinTask<?> t = ForkJoinWorkerThread.pollTask();
522 if (t != null) {
523 if ((p = phaseOf(state.get())) == phase)
524 t.exec();
525 else { // push task and exit if barrier advanced
526 t.fork();
527 break;
528 }
529 }
530 }
531 return p;
532 }
533
534 /**
535 * Wait nodes for Treiber stack representing wait queue for non-FJ
536 * tasks. The waiting scheme is an adaptation of the one used in
537 * forkjoin.PoolBarrier.
538 */
539 static final class QNode {
540 QNode next;
541 volatile Thread thread; // nulled to cancel wait
542 final int phase;
543 QNode(Thread t, int c) {
544 thread = t;
545 phase = c;
546 }
547 }
548
549 private void releaseWaiters(int currentPhase) {
550 final AtomicReference<QNode> head = this.head;
551 QNode p;
552 while ((p = head.get()) != null && p.phase != currentPhase) {
553 if (head.compareAndSet(p, null)) {
554 do {
555 Thread t = p.thread;
556 if (t != null) {
557 p.thread = null;
558 LockSupport.unpark(t);
559 }
560 } while ((p = p.next) != null);
561 }
562 }
563 }
564
565 /** The number of CPUs, for spin control */
566 static final int NCPUS = Runtime.getRuntime().availableProcessors();
567
568 /**
569 * The number of times to spin before blocking in timed waits.
570 * The value is empirically derived.
571 */
572 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
573
574 /**
575 * The number of times to spin before blocking in untimed waits.
576 * This is greater than timed value because untimed waits spin
577 * faster since they don't need to check times on each spin.
578 */
579 static final int maxUntimedSpins = maxTimedSpins * 32;
580
581 /**
582 * The number of nanoseconds for which it is faster to spin
583 * rather than to use timed park. A rough estimate suffices.
584 */
585 static final long spinForTimeoutThreshold = 1000L;
586
587 /**
588 * Enqueues node and waits unless aborted or signalled.
589 */
590 private boolean untimedWait(Thread thread, int currentPhase,
591 boolean abortOnInterrupt) {
592 final AtomicReference<QNode> head = this.head;
593 final AtomicLong state = this.state;
594 boolean wasInterrupted = false;
595 QNode node = null;
596 boolean queued = false;
597 int spins = maxUntimedSpins;
598 while (phaseOf(state.get()) == currentPhase) {
599 QNode h;
600 if (node != null && queued) {
601 if (node.thread != null) {
602 LockSupport.park();
603 if (Thread.interrupted()) {
604 wasInterrupted = true;
605 if (abortOnInterrupt)
606 break;
607 }
608 }
609 }
610 else if ((h = head.get()) != null && h.phase != currentPhase) {
611 if (phaseOf(state.get()) == currentPhase) { // must recheck
612 if (head.compareAndSet(h, h.next)) {
613 Thread t = h.thread; // help clear out old waiters
614 if (t != null) {
615 h.thread = null;
616 LockSupport.unpark(t);
617 }
618 }
619 }
620 else
621 break;
622 }
623 else if (node != null)
624 queued = head.compareAndSet(node.next = h, node);
625 else if (spins <= 0)
626 node = new QNode(thread, currentPhase);
627 else
628 --spins;
629 }
630 if (node != null)
631 node.thread = null;
632 return wasInterrupted;
633 }
634
635 /**
636 * Messier timeout version
637 */
638 private void timedWait(Thread thread, int currentPhase, long nanos)
639 throws InterruptedException, TimeoutException {
640 final AtomicReference<QNode> head = this.head;
641 final AtomicLong state = this.state;
642 long lastTime = System.nanoTime();
643 QNode node = null;
644 boolean queued = false;
645 int spins = maxTimedSpins;
646 while (phaseOf(state.get()) == currentPhase) {
647 QNode h;
648 long now = System.nanoTime();
649 nanos -= now - lastTime;
650 lastTime = now;
651 if (nanos <= 0) {
652 if (node != null)
653 node.thread = null;
654 if (phaseOf(state.get()) == currentPhase)
655 throw new TimeoutException();
656 else
657 break;
658 }
659 else if (node != null && queued) {
660 if (node.thread != null &&
661 nanos > spinForTimeoutThreshold) {
662 // LockSupport.parkNanos(this, nanos);
663 LockSupport.parkNanos(nanos);
664 if (Thread.interrupted()) {
665 node.thread = null;
666 throw new InterruptedException();
667 }
668 }
669 }
670 else if ((h = head.get()) != null && h.phase != currentPhase) {
671 if (phaseOf(state.get()) == currentPhase) { // must recheck
672 if (head.compareAndSet(h, h.next)) {
673 Thread t = h.thread; // help clear out old waiters
674 if (t != null) {
675 h.thread = null;
676 LockSupport.unpark(t);
677 }
678 }
679 }
680 else
681 break;
682 }
683 else if (node != null)
684 queued = head.compareAndSet(node.next = h, node);
685 else if (spins <= 0)
686 node = new QNode(thread, currentPhase);
687 else
688 --spins;
689 }
690 if (node != null)
691 node.thread = null;
692 }
693
694 }