ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
Revision: 1.4
Committed: Sat Sep 6 13:19:17 2008 UTC (15 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.3: +573 -354 lines
Log Message:
Support tiering; un-unify FJ vs Thread waiting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.util.concurrent.*;
9 import java.util.concurrent.atomic.*;
10 import java.util.concurrent.locks.LockSupport;
11 import sun.misc.Unsafe;
12 import java.lang.reflect.*;
13
14 /**
15 * A reusable synchronization barrier, similar in functionality to a
16 * {@link java.util.concurrent.CyclicBarrier} and {@link
17 * java.util.concurrent.CountDownLatch} but supporting more flexible
18 * usage.
19 *
20 * <ul>
21 *
22 * <li> The number of parties synchronizing on a phaser may vary over
23 * time. A task may register to be a party at any time, and may
24 * deregister upon arriving at the barrier. As is the case with most
25 * basic synchronization constructs, registration and deregistration
26 * affect only internal counts; they do not establish any further
27 * internal bookkeeping, so tasks cannot query whether they are
28 * registered. (However, you can introduce such bookkeeping in by
29 * subclassing this class.)
30 *
31 * <li> Each generation has an associated phase value, starting at
32 * zero, and advancing when all parties reach the barrier (wrapping
33 * around to zero after reaching <tt>Integer.MAX_VALUE</tt>).
34 *
35 * <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited.
36 * Method <tt>arriveAndAwaitAdvance</tt> has effect analogous to
37 * <tt>CyclicBarrier.await</tt>. However, Phasers separate two
38 * aspects of coordination, that may also be invoked independently:
39 *
40 * <ul>
41 *
42 * <li> Arriving at a barrier. Methods <tt>arrive</tt> and
43 * <tt>arriveAndDeregister</tt> do not block, but return
44 * the phase value current upon entry to the method.
45 *
46 * <li> Awaiting others. Method <tt>awaitAdvance</tt> requires an
47 * argument indicating the entry phase, and returns when the
48 * barrier advances to a new phase.
49 * </ul>
50 *
51 *
52 * <li> Barrier actions, performed by the task triggering a phase
53 * advance while others may be waiting, are arranged by overriding
54 * method <tt>onAdvance</tt>, that also controls termination.
55 * Overriding this method may be used to similar but more flecible
56 * effect as providing a barrier action to a CyclicBarrier.
57 *
58 * <li> Phasers may enter a <em>termination</em> state in which all
59 * await actions immediately return, indicating (via a negative phase
60 * value) that execution is complete. Termination is triggered by
61 * executing the overridable <tt>onAdvance</tt> method that is invoked
62 * each time the barrier is about to be tripped. When a Phaser is
63 * controlling an action with a fixed number of iterations, it is
64 * often convenient to override this method to cause termination when
65 * the current phase number reaches a threshold. Method
66 * <tt>forceTermination</tt> is also available to abruptly release
67 * waiting threads and allow them to terminate.
68 *
69 * <li> Phasers may be tiered to reduce contention. Phasers with large
70 * numbers of parties that would otherwise experience heavy
71 * synchronization contention costs may instead be arranged in trees.
72 * This will typically greatly increase throughput even though it
73 * incurs somewhat greater per-operation overhead.
74 *
75 * <li> By default, <tt>awaitAdvance</tt> continues to wait even if
76 * the waiting thread is interrupted. And unlike the case in
77 * CyclicBarriers, exceptions encountered while tasks wait
78 * interruptibly or with timeout do not change the state of the
79 * barrier. If necessary, you can perform any associated recovery
80 * within handlers of those exceptions, often after invoking
81 * <tt>forceTermination</tt>.
82 *
83 * </ul>
84 *
85 * <p><b>Sample usages:</b>
86 *
87 * <p>A Phaser may be used instead of a <tt>CountdownLatch</tt> to control
88 * a one-shot action serving a variable number of parties. The typical
89 * idiom is for the method setting this up to first register, then
90 * start the actions, then deregister, as in:
91 *
92 * <pre>
93 * void runTasks(List&lt;Runnable&gt; list) {
94 * final Phaser phaser = new Phaser(1); // "1" to register self
95 * for (Runnable r : list) {
96 * phaser.register();
97 * new Thread() {
98 * public void run() {
99 * phaser.arriveAndAwaitAdvance(); // await all creation
100 * r.run();
101 * phaser.arriveAndDeregister(); // signal completion
102 * }
103 * }.start();
104 * }
105 * phaser.arrive(); // allow threads to start
106 * int p = phaser.arriveAndDeregister(); // deregister self
107 * otherActions(); // do other things while tasks execute
108 * phaser.awaitAdvance(p); // wait for all tasks to arrive
109 * }
110 * </pre>
111 *
112 * <p>One way to cause a set of threads to repeatedly perform actions
113 * for a given number of iterations is to override <tt>onAdvance</tt>:
114 *
115 * <pre>
116 * void startTasks(List&lt;Runnable&gt; list, final int iterations) {
117 * final Phaser phaser = new Phaser() {
118 * public boolean onAdvance(int phase, int registeredParties) {
119 * return phase &gt;= iterations || registeredParties == 0;
120 * }
121 * };
122 * phaser.register();
123 * for (Runnable r : list) {
124 * phaser.register();
125 * new Thread() {
126 * public void run() {
127 * do {
128 * r.run();
129 * phaser.arriveAndAwaitAdvance();
130 * } while(!phaser.isTerminated();
131 * }
132 * }.start();
133 * }
134 * phaser.arriveAndDeregister(); // deregister self, don't wait
135 * }
136 * </pre>
137 *
138 * <p> To create a set of tasks using a tree of Phasers,
139 * you could use code of the following form, assuming a
140 * Task class with a constructor accepting a Phaser that
141 * it registers for upon construction:
142 * <pre>
143 * void build(Task[] actions, int lo, int hi, Phaser b) {
144 * int step = (hi - lo) / TASKS_PER_PHASER;
145 * if (step &gt; 1) {
146 * int i = lo;
147 * while (i &lt; hi) {
148 * int r = Math.min(i + step, hi);
149 * build(actions, i, r, new Phaser(b));
150 * i = r;
151 * }
152 * }
153 * else {
154 * for (int i = lo; i &lt; hi; ++i)
155 * actions[i] = new Task(b);
156 * // assumes new Task(b) performs b.register()
157 * }
158 * }
159 * // .. initially called, for n tasks via
160 * build(new Task[n], 0, n, new Phaser());
161 * </pre>
162 *
163 * The best value of <tt>TASKS_PER_PHASER</tt> depends mainly on
164 * expected barrier synchronization rates. A value as low as four may
165 * be appropriate for extremely small per-barrier task bodies (thus
166 * high rates), or up to hundreds for extremely large ones.
167 *
168 * </pre>
169 *
170 * <p><b>Implementation notes</b>: This implementation restricts the
171 * maximum number of parties to 65535. Attempts to register additional
172 * parties result in IllegalStateExceptions. However, you can and
173 * should create tiered phasers to accommodate arbitrarily large sets
174 * of participants.
175 */
176 public class Phaser {
177 /*
178 * This class implements an extension of X10 "clocks". Thanks to
179 * Vijay Saraswat for the idea, and to Vivek Sarkar for
180 * enhancements to extend functionality.
181 */
182
183 /**
184 * Barrier state representation. Conceptually, a barrier contains
185 * four values:
186 *
187 * * parties -- the number of parties to wait (16 bits)
188 * * unarrived -- the number of parties yet to hit barrier (16 bits)
189 * * phase -- the generation of the barrier (31 bits)
190 * * terminated -- set if barrier is terminated (1 bit)
191 *
192 * However, to efficiently maintain atomicity, these values are
193 * packed into a single (atomic) long. Termination uses the sign
194 * bit of 32 bit representation of phase, so phase is set to -1 on
195 * termination. Good performace relies on keeping state decoding
196 * and encoding simple, and keeping race windows short.
197 *
198 * Note: there are some cheats in arrive() that rely on unarrived
199 * being lowest 16 bits.
200 */
201 private volatile long state;
202
203 private static final int ushortBits = 16;
204 private static final int ushortMask = (1 << ushortBits) - 1;
205 private static final int phaseMask = 0x7fffffff;
206
207 private static int unarrivedOf(long s) {
208 return (int)(s & ushortMask);
209 }
210
211 private static int partiesOf(long s) {
212 return (int)(s & (ushortMask << 16)) >>> 16;
213 }
214
215 private static int phaseOf(long s) {
216 return (int)(s >>> 32);
217 }
218
219 private static int arrivedOf(long s) {
220 return partiesOf(s) - unarrivedOf(s);
221 }
222
223 private static long stateFor(int phase, int parties, int unarrived) {
224 return (((long)phase) << 32) | ((parties << 16) | unarrived);
225 }
226
227 private static long trippedStateFor(int phase, int parties) {
228 return (((long)phase) << 32) | ((parties << 16) | parties);
229 }
230
231 private static IllegalStateException badBounds(int parties, int unarrived) {
232 return new IllegalStateException
233 ("Attempt to set " + unarrived +
234 " unarrived of " + parties + " parties");
235 }
236
237 /**
238 * The parent of this phaser, or null if none
239 */
240 private final Phaser parent;
241
242 /**
243 * The root of Phaser tree. Equals this if not in a tree. Used to
244 * support faster state push-down.
245 */
246 private final Phaser root;
247
248 // Wait queues
249
250 /**
251 * Heads of Treiber stacks waiting for nonFJ threads. To eliminate
252 * contention while releasing some threads while adding others, we
253 * use two of them, alternating across even and odd phases.
254 */
255 private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
256 private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
257
258 private AtomicReference<QNode> queueFor(int phase) {
259 return (phase & 1) == 0? evenQ : oddQ;
260 }
261
262 /**
263 * Returns current state, first resolving lagged propagation from
264 * root if necessary.
265 */
266 private long getReconciledState() {
267 return parent == null? state : reconcileState();
268 }
269
270 /**
271 * Recursively resolves state.
272 */
273 private long reconcileState() {
274 Phaser p = parent;
275 long s = state;
276 if (p != null) {
277 while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
278 long parentState = p.getReconciledState();
279 int parentPhase = phaseOf(parentState);
280 int phase = phaseOf(s = state);
281 if (phase != parentPhase) {
282 long next = trippedStateFor(parentPhase, partiesOf(s));
283 if (casState(s, next)) {
284 releaseWaiters(phase);
285 s = next;
286 }
287 }
288 }
289 }
290 return s;
291 }
292
293 /**
294 * Creates a new Phaser without any initially registered parties,
295 * initial phase number 0, and no parent.
296 */
297 public Phaser() {
298 this(null);
299 }
300
301 /**
302 * Creates a new Phaser with the given numbers of registered
303 * unarrived parties, initial phase number 0, and no parent.
304 * @param parties the number of parties required to trip barrier.
305 * @throws IllegalArgumentException if parties less than zero
306 * or greater than the maximum number of parties supported.
307 */
308 public Phaser(int parties) {
309 this(null, parties);
310 }
311
312 /**
313 * Creates a new Phaser with the given parent, without any
314 * initially registered parties. If parent is non-null this phaser
315 * is registered with the parent and its initial phase number is
316 * the same as that of parent phaser.
317 * @param parent the parent phaser.
318 */
319 public Phaser(Phaser parent) {
320 int phase = 0;
321 this.parent = parent;
322 if (parent != null) {
323 this.root = parent.root;
324 phase = parent.register();
325 }
326 else
327 this.root = this;
328 this.state = trippedStateFor(phase, 0);
329 }
330
331 /**
332 * Creates a new Phaser with the given parent and numbers of
333 * registered unarrived parties. If parent is non-null this phaser
334 * is registered with the parent and its initial phase number is
335 * the same as that of parent phaser.
336 * @param parent the parent phaser.
337 * @param parties the number of parties required to trip barrier.
338 * @throws IllegalArgumentException if parties less than zero
339 * or greater than the maximum number of parties supported.
340 */
341 public Phaser(Phaser parent, int parties) {
342 if (parties < 0 || parties > ushortMask)
343 throw new IllegalArgumentException("Illegal number of parties");
344 int phase = 0;
345 this.parent = parent;
346 if (parent != null) {
347 this.root = parent.root;
348 phase = parent.register();
349 }
350 else
351 this.root = this;
352 this.state = trippedStateFor(phase, parties);
353 }
354
355 /**
356 * Adds a new unarrived party to this phaser.
357 * @return the current barrier phase number upon registration
358 * @throws IllegalStateException if attempting to register more
359 * than the maximum supported number of parties.
360 */
361 public int register() {
362 return doRegister(1);
363 }
364
365 /**
366 * Adds the given number of new unarrived parties to this phaser.
367 * @param parties the number of parties required to trip barrier.
368 * @return the current barrier phase number upon registration
369 * @throws IllegalStateException if attempting to register more
370 * than the maximum supported number of parties.
371 */
372 public int bulkRegister(int parties) {
373 if (parties < 0)
374 throw new IllegalArgumentException();
375 if (parties == 0)
376 return getPhase();
377 return doRegister(parties);
378 }
379
380 /**
381 * Shared code for register, bulkRegister
382 */
383 private int doRegister(int registrations) {
384 int phase;
385 for (;;) {
386 long s = getReconciledState();
387 phase = phaseOf(s);
388 int unarrived = unarrivedOf(s) + registrations;
389 int parties = partiesOf(s) + registrations;
390 if (phase < 0)
391 break;
392 if (parties > ushortMask || unarrived > ushortMask)
393 throw badBounds(parties, unarrived);
394 if (phase == phaseOf(root.state) &&
395 casState(s, stateFor(phase, parties, unarrived)))
396 break;
397 }
398 return phase;
399 }
400
401 /**
402 * Arrives at the barrier, but does not wait for others. (You can
403 * in turn wait for others via {@link #awaitAdvance}).
404 *
405 * @return the barrier phase number upon entry to this method, or a
406 * negative value if terminated;
407 * @throws IllegalStateException if not terminated and the number
408 * of unarrived parties would become negative.
409 */
410 public int arrive() {
411 int phase;
412 for (;;) {
413 long s = state;
414 phase = phaseOf(s);
415 int parties = partiesOf(s);
416 int unarrived = unarrivedOf(s) - 1;
417 if (unarrived > 0) { // Not the last arrival
418 if (casState(s, s - 1)) // s-1 adds one arrival
419 break;
420 }
421 else if (unarrived == 0) { // the last arrival
422 Phaser par = parent;
423 if (par == null) { // directly trip
424 if (casState
425 (s,
426 trippedStateFor(onAdvance(phase, parties)? -1 :
427 ((phase + 1) & phaseMask), parties))) {
428 releaseWaiters(phase);
429 break;
430 }
431 }
432 else { // cascade to parent
433 if (casState(s, s - 1)) { // zeroes unarrived
434 par.arrive();
435 reconcileState();
436 break;
437 }
438 }
439 }
440 else if (phase < 0) // Don't throw exception if terminated
441 break;
442 else if (phase != phaseOf(root.state)) // or if unreconciled
443 reconcileState();
444 else
445 throw badBounds(parties, unarrived);
446 }
447 return phase;
448 }
449
450 /**
451 * Arrives at the barrier, and deregisters from it, without
452 * waiting for others. Deregistration reduces number of parties
453 * required to trip the barrier in future phases. If this phaser
454 * has a parent, and deregistration causes this phaser to have
455 * zero parties, this phaser is also deregistered from its parent.
456 *
457 * @return the current barrier phase number upon entry to
458 * this method, or a negative value if terminated;
459 * @throws IllegalStateException if not terminated and the number
460 * of registered or unarrived parties would become negative.
461 */
462 public int arriveAndDeregister() {
463 // similar code to arrive, but too different to merge
464 Phaser par = parent;
465 int phase;
466 for (;;) {
467 long s = state;
468 phase = phaseOf(s);
469 int parties = partiesOf(s) - 1;
470 int unarrived = unarrivedOf(s) - 1;
471 if (parties >= 0) {
472 if (unarrived > 0 || (unarrived == 0 && par != null)) {
473 if (casState
474 (s,
475 stateFor(phase, parties, unarrived))) {
476 if (unarrived == 0) {
477 par.arriveAndDeregister();
478 reconcileState();
479 }
480 break;
481 }
482 continue;
483 }
484 if (unarrived == 0) {
485 if (casState
486 (s,
487 trippedStateFor(onAdvance(phase, parties)? -1 :
488 ((phase + 1) & phaseMask), parties))) {
489 releaseWaiters(phase);
490 break;
491 }
492 continue;
493 }
494 if (phase < 0)
495 break;
496 if (par != null && phase != phaseOf(root.state)) {
497 reconcileState();
498 continue;
499 }
500 }
501 throw badBounds(parties, unarrived);
502 }
503 return phase;
504 }
505
506 /**
507 * Arrives at the barrier and awaits others. Equivalent in effect
508 * to <tt>awaitAdvance(arrive())</tt>. If you instead need to
509 * await with interruption of timeout, and/or deregister upon
510 * arrival, you can arrange them using analogous constructions.
511 * @return the phase on entry to this method
512 * @throws IllegalStateException if not terminated and the number
513 * of unarrived parties would become negative.
514 */
515 public int arriveAndAwaitAdvance() {
516 return awaitAdvance(arrive());
517 }
518
519 /**
520 * Awaits the phase of the barrier to advance from the given
521 * value, or returns immediately if argument is negative or this
522 * barrier is terminated.
523 * @param phase the phase on entry to this method
524 * @return the phase on exit from this method
525 */
526 public int awaitAdvance(int phase) {
527 if (phase < 0)
528 return phase;
529 long s = getReconciledState();
530 int p = phaseOf(s);
531 if (p != phase)
532 return p;
533 if (unarrivedOf(s) == 0)
534 parent.awaitAdvance(phase);
535 // Fall here even if parent waited, to reconcile and help release
536 return untimedWait(phase);
537 }
538
539 /**
540 * Awaits the phase of the barrier to advance from the given
541 * value, or returns immediately if argumet is negative or this
542 * barrier is terminated, or throws InterruptedException if
543 * interrupted while waiting.
544 * @param phase the phase on entry to this method
545 * @return the phase on exit from this method
546 * @throws InterruptedException if thread interrupted while waiting
547 */
548 public int awaitAdvanceInterruptibly(int phase) throws InterruptedException {
549 if (phase < 0)
550 return phase;
551 long s = getReconciledState();
552 int p = phaseOf(s);
553 if (p != phase)
554 return p;
555 if (unarrivedOf(s) != 0)
556 parent.awaitAdvanceInterruptibly(phase);
557 return interruptibleWait(phase);
558 }
559
560 /**
561 * Awaits the phase of the barrier to advance from the given value
562 * or the given timeout elapses, or returns immediately if
563 * argument is negative or this barrier is terminated.
564 * @param phase the phase on entry to this method
565 * @return the phase on exit from this method
566 * @throws InterruptedException if thread interrupted while waiting
567 * @throws TimeoutException if timed out while waiting
568 */
569 public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
570 throws InterruptedException, TimeoutException {
571 if (phase < 0)
572 return phase;
573 long s = getReconciledState();
574 int p = phaseOf(s);
575 if (p != phase)
576 return p;
577 if (unarrivedOf(s) == 0)
578 parent.awaitAdvanceInterruptibly(phase, timeout, unit);
579 return timedWait(phase, unit.toNanos(timeout));
580 }
581
582 /**
583 * Forces this barrier to enter termination state. Counts of
584 * arrived and registered parties are unaffected. If this phaser
585 * has a parent, it too is terminated. This method may be useful
586 * for coordinating recovery after one or more tasks encounter
587 * unexpected exceptions.
588 */
589 public void forceTermination() {
590 for (;;) {
591 long s = getReconciledState();
592 int phase = phaseOf(s);
593 int parties = partiesOf(s);
594 int unarrived = unarrivedOf(s);
595 if (phase < 0 ||
596 casState(s, stateFor(-1, parties, unarrived))) {
597 releaseWaiters(0);
598 releaseWaiters(1);
599 if (parent != null)
600 parent.forceTermination();
601 return;
602 }
603 }
604 }
605
606 /**
607 * Returns the current phase number. The maximum phase number is
608 * <tt>Integer.MAX_VALUE</tt>, after which it restarts at
609 * zero. Upon termination, the phase number is negative.
610 * @return the phase number, or a negative value if terminated
611 */
612 public final int getPhase() {
613 return phaseOf(getReconciledState());
614 }
615
616 /**
617 * Returns true if the current phase number equals the given phase.
618 * @param phase the phase
619 * @return true if the current phase number equals the given phase.
620 */
621 public final boolean hasPhase(int phase) {
622 return phaseOf(getReconciledState()) == phase;
623 }
624
625 /**
626 * Returns the number of parties registered at this barrier.
627 * @return the number of parties
628 */
629 public int getRegisteredParties() {
630 return partiesOf(state);
631 }
632
633 /**
634 * Returns the number of parties that have arrived at the current
635 * phase of this barrier.
636 * @return the number of arrived parties
637 */
638 public int getArrivedParties() {
639 return arrivedOf(state);
640 }
641
642 /**
643 * Returns the number of registered parties that have not yet
644 * arrived at the current phase of this barrier.
645 * @return the number of unarrived parties
646 */
647 public int getUnarrivedParties() {
648 return unarrivedOf(state);
649 }
650
651 /**
652 * Returns the parent of this phaser, or null if none.
653 * @return the parent of this phaser, or null if none.
654 */
655 public Phaser getParent() {
656 return parent;
657 }
658
659 /**
660 * Returns the root ancestor of this phaser, which is the same as
661 * this phaser if it has no parent.
662 * @return the root ancestor of this phaser.
663 */
664 public Phaser getRoot() {
665 return root;
666 }
667
668 /**
669 * Returns true if this barrier has been terminated.
670 * @return true if this barrier has been terminated
671 */
672 public boolean isTerminated() {
673 return getPhase() < 0;
674 }
675
676 /**
677 * Overridable method to perform an action upon phase advance, and
678 * to control termination. This method is invoked whenever the
679 * barrier is tripped (and thus all other waiting parties are
680 * dormant). If it returns true, then, rather than advance the
681 * phase number, this barrier will be set to a final termination
682 * state, and subsequent calls to <tt>isTerminated</tt> will
683 * return true.
684 *
685 * <p> The default version returns true when the number of
686 * registered parties is zero. Normally, overrides that arrange
687 * termination for other reasons should also preserve this
688 * property.
689 *
690 * <p> You may override this method to perform an action with side
691 * effects visible to participating tasks, but it is in general
692 * only sensible to do so in designs where all parties register
693 * before any arrive, and all <tt>awaitAdvance</tt> at each phase.
694 * Otherwise, you cannot ensure lack of interference. In
695 * particular, this method may be invoked more than once per
696 * transition if other parties successfully register while the
697 * invocation of this method is in progress, thus postponing the
698 * transition until those parties also arrive, re-triggering this
699 * method.
700 *
701 * @param phase the phase number on entering the barrier
702 * @param registeredParties the current number of registered
703 * parties.
704 * @return true if this barrier should terminate
705 */
706 protected boolean onAdvance(int phase, int registeredParties) {
707 return registeredParties <= 0;
708 }
709
710 /**
711 * Returns a string identifying this phaser, as well as its
712 * state. The state, in brackets, includes the String {@code
713 * "phase ="} followed by the phase number, {@code "parties ="}
714 * followed by the number of registered parties, and {@code
715 * "arrived ="} followed by the number of arrived parties
716 *
717 * @return a string identifying this barrier, as well as its state
718 */
719 public String toString() {
720 long s = getReconciledState();
721 return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]";
722 }
723
724 // methods for waiting
725
726 /** The number of CPUs, for spin control */
727 static final int NCPUS = Runtime.getRuntime().availableProcessors();
728
729 /**
730 * The number of times to spin before blocking in timed waits.
731 * The value is empirically derived.
732 */
733 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
734
735 /**
736 * The number of times to spin before blocking in untimed waits.
737 * This is greater than timed value because untimed waits spin
738 * faster since they don't need to check times on each spin.
739 */
740 static final int maxUntimedSpins = maxTimedSpins * 32;
741
742 /**
743 * The number of nanoseconds for which it is faster to spin
744 * rather than to use timed park. A rough estimate suffices.
745 */
746 static final long spinForTimeoutThreshold = 1000L;
747
748 /**
749 * Wait nodes for Treiber stack representing wait queue for non-FJ
750 * tasks.
751 */
752 static final class QNode {
753 QNode next;
754 volatile Thread thread; // nulled to cancel wait
755 QNode() {
756 thread = Thread.currentThread();
757 }
758 void signal() {
759 Thread t = thread;
760 if (t != null) {
761 thread = null;
762 LockSupport.unpark(t);
763 }
764 }
765 }
766
767 /**
768 * Removes and signals waiting threads from wait queue
769 */
770 private void releaseWaiters(int phase) {
771 AtomicReference<QNode> head = queueFor(phase);
772 QNode q;
773 while ((q = head.get()) != null) {
774 if (head.compareAndSet(q, q.next))
775 q.signal();
776 }
777 }
778
779 /**
780 * Enqueues node and waits unless aborted or signalled.
781 */
782 private int untimedWait(int phase) {
783 int spins = maxUntimedSpins;
784 QNode node = null;
785 boolean interrupted = false;
786 boolean queued = false;
787 int p;
788 while ((p = getPhase()) == phase) {
789 interrupted = Thread.interrupted();
790 if (node != null) {
791 if (!queued) {
792 AtomicReference<QNode> head = queueFor(phase);
793 queued = head.compareAndSet(node.next = head.get(), node);
794 }
795 else if (node.thread != null)
796 LockSupport.park(this);
797 }
798 else if (spins <= 0)
799 node = new QNode();
800 else
801 --spins;
802 }
803 if (node != null)
804 node.thread = null;
805 if (interrupted)
806 Thread.currentThread().interrupt();
807 releaseWaiters(phase);
808 return p;
809 }
810
811 /**
812 * Messier interruptible version
813 */
814 private int interruptibleWait(int phase) throws InterruptedException {
815 int spins = maxUntimedSpins;
816 QNode node = null;
817 boolean queued = false;
818 boolean interrupted = false;
819 int p;
820 while ((p = getPhase()) == phase) {
821 if (interrupted = Thread.interrupted())
822 break;
823 if (node != null) {
824 if (!queued) {
825 AtomicReference<QNode> head = queueFor(phase);
826 queued = head.compareAndSet(node.next = head.get(), node);
827 }
828 else if (node.thread != null)
829 LockSupport.park(this);
830 }
831 else if (spins <= 0)
832 node = new QNode();
833 else
834 --spins;
835 }
836 if (node != null)
837 node.thread = null;
838 if (interrupted)
839 throw new InterruptedException();
840 releaseWaiters(phase);
841 return p;
842 }
843
844 /**
845 * Even messier timeout version.
846 */
847 private int timedWait(int phase, long nanos)
848 throws InterruptedException, TimeoutException {
849 int p;
850 if ((p = getPhase()) == phase) {
851 long lastTime = System.nanoTime();
852 int spins = maxTimedSpins;
853 QNode node = null;
854 boolean queued = false;
855 boolean interrupted = false;
856 while ((p = getPhase()) == phase) {
857 if (interrupted = Thread.interrupted())
858 break;
859 long now = System.nanoTime();
860 if ((nanos -= now - lastTime) <= 0)
861 break;
862 lastTime = now;
863 if (node != null) {
864 if (!queued) {
865 AtomicReference<QNode> head = queueFor(phase);
866 queued = head.compareAndSet(node.next = head.get(), node);
867 }
868 else if (node.thread != null &&
869 nanos > spinForTimeoutThreshold) {
870 LockSupport.parkNanos(this, nanos);
871 }
872 }
873 else if (spins <= 0)
874 node = new QNode();
875 else
876 --spins;
877 }
878 if (node != null)
879 node.thread = null;
880 if (interrupted)
881 throw new InterruptedException();
882 if (p == phase && (p = getPhase()) == phase)
883 throw new TimeoutException();
884 }
885 releaseWaiters(phase);
886 return p;
887 }
888
889 // Temporary Unsafe mechanics for preliminary release
890
891 static final Unsafe _unsafe;
892 static final long stateOffset;
893
894 static {
895 try {
896 if (Phaser.class.getClassLoader() != null) {
897 Field f = Unsafe.class.getDeclaredField("theUnsafe");
898 f.setAccessible(true);
899 _unsafe = (Unsafe)f.get(null);
900 }
901 else
902 _unsafe = Unsafe.getUnsafe();
903 stateOffset = _unsafe.objectFieldOffset
904 (Phaser.class.getDeclaredField("state"));
905 } catch (Exception e) {
906 throw new RuntimeException("Could not initialize intrinsics", e);
907 }
908 }
909
910 final boolean casState(long cmp, long val) {
911 return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val);
912 }
913 }