28 |
|
* <li> Each generation has an associated phase value, starting at |
29 |
|
* zero, and advancing when all parties reach the barrier (wrapping |
30 |
|
* around to zero after reaching <tt>Integer.MAX_VALUE</tt>). |
31 |
< |
* |
31 |
> |
* |
32 |
|
* <li> Like a CyclicBarrier, a Phaser may be repeatedly awaited. |
33 |
|
* Method <tt>arriveAndAwaitAdvance</tt> has effect analogous to |
34 |
|
* <tt>CyclicBarrier.await</tt>. However, Phasers separate two |
37 |
|
* <ul> |
38 |
|
* |
39 |
|
* <li> Arriving at a barrier. Methods <tt>arrive</tt> and |
40 |
< |
* <tt>arriveAndDeregister</tt> do not block, but return |
40 |
> |
* <tt>arriveAndDeregister</tt> do not block, but return |
41 |
|
* the phase value on entry to the method. |
42 |
|
* |
43 |
|
* <li> Awaiting others. Method <tt>awaitAdvance</tt> requires an |
60 |
|
* number reaches a threshold. Method <tt>forceTermination</tt> is |
61 |
|
* also available to assist recovery actions upon failure. |
62 |
|
* |
63 |
< |
* <li> Unlike most synchronizers, a Phaser may also be used with |
63 |
> |
* <li> Unlike most synchronizers, a Phaser may also be used with |
64 |
|
* ForkJoinTasks (as well as plain threads). |
65 |
< |
* |
65 |
> |
* |
66 |
|
* <li> By default, <tt>awaitAdvance</tt> continues to wait even if |
67 |
|
* the current thread is interrupted. And unlike the case in |
68 |
|
* CyclicBarriers, exceptions encountered while tasks wait |
72 |
|
* |
73 |
|
* </ul> |
74 |
|
* |
75 |
< |
* <p><b>Sample usage:</b> |
75 |
> |
* <p><b>Sample usage:</b> |
76 |
|
* |
77 |
|
* <p>[todo: non-FJ example] |
78 |
|
* |
114 |
|
* |
115 |
|
* <p><b>Implementation notes</b>: This implementation restricts the |
116 |
|
* maximum number of parties to 65535. Attempts to register |
117 |
< |
* additional parties result in IllegalStateExceptions. |
117 |
> |
* additional parties result in IllegalStateExceptions. |
118 |
|
*/ |
119 |
|
public class Phaser { |
120 |
|
/* |
126 |
|
/** |
127 |
|
* Barrier state representation. Conceptually, a barrier contains |
128 |
|
* four values: |
129 |
< |
* |
129 |
> |
* |
130 |
|
* * parties -- the number of parties to wait (16 bits) |
131 |
|
* * unarrived -- the number of parties yet to hit barrier (16 bits) |
132 |
|
* * phase -- the generation of the barrier (31 bits) |
307 |
|
if (phase < 0) |
308 |
|
return phase; |
309 |
|
Thread current = Thread.currentThread(); |
310 |
< |
if (current instanceof ForkJoinWorkerThread) |
310 |
> |
if (current instanceof ForkJoinWorkerThread) |
311 |
|
return helpingWait(phase); |
312 |
|
if (untimedWait(current, phase, false)) |
313 |
|
current.interrupt(); |
326 |
|
if (phase < 0) |
327 |
|
return phase; |
328 |
|
Thread current = Thread.currentThread(); |
329 |
< |
if (current instanceof ForkJoinWorkerThread) |
329 |
> |
if (current instanceof ForkJoinWorkerThread) |
330 |
|
return helpingWait(phase); |
331 |
|
else if (Thread.interrupted() || untimedWait(current, phase, true)) |
332 |
|
throw new InterruptedException(); |
343 |
|
* @throws InterruptedException if thread interrupted while waiting |
344 |
|
* @throws TimeoutException if timed out while waiting |
345 |
|
*/ |
346 |
< |
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) |
346 |
> |
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) |
347 |
|
throws InterruptedException, TimeoutException { |
348 |
|
if (phase < 0) |
349 |
|
return phase; |
350 |
|
long nanos = unit.toNanos(timeout); |
351 |
|
Thread current = Thread.currentThread(); |
352 |
< |
if (current instanceof ForkJoinWorkerThread) |
352 |
> |
if (current instanceof ForkJoinWorkerThread) |
353 |
|
return timedHelpingWait(phase, nanos); |
354 |
|
timedWait(current, phase, nanos); |
355 |
|
return phaseOf(state.get()); |
446 |
|
* phase number, this barrier will be set to a final termination |
447 |
|
* state, and subsequent calls to <tt>isTerminated</tt> will |
448 |
|
* return true. |
449 |
< |
* |
449 |
> |
* |
450 |
|
* <p> The default version returns true when the number of |
451 |
|
* registered parties is zero. Normally, overrides that arrange |
452 |
|
* termination for other reasons should also preserve this |
569 |
|
* The number of times to spin before blocking in timed waits. |
570 |
|
* The value is empirically derived. |
571 |
|
*/ |
572 |
< |
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
572 |
> |
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
573 |
|
|
574 |
|
/** |
575 |
|
* The number of times to spin before blocking in untimed waits. |
587 |
|
/** |
588 |
|
* Enqueues node and waits unless aborted or signalled. |
589 |
|
*/ |
590 |
< |
private boolean untimedWait(Thread thread, int currentPhase, |
590 |
> |
private boolean untimedWait(Thread thread, int currentPhase, |
591 |
|
boolean abortOnInterrupt) { |
592 |
|
final AtomicReference<QNode> head = this.head; |
593 |
|
final AtomicLong state = this.state; |
602 |
|
LockSupport.park(); |
603 |
|
if (Thread.interrupted()) { |
604 |
|
wasInterrupted = true; |
605 |
< |
if (abortOnInterrupt) |
605 |
> |
if (abortOnInterrupt) |
606 |
|
break; |
607 |
|
} |
608 |
|
} |
635 |
|
/** |
636 |
|
* Messier timeout version |
637 |
|
*/ |
638 |
< |
private void timedWait(Thread thread, int currentPhase, long nanos) |
638 |
> |
private void timedWait(Thread thread, int currentPhase, long nanos) |
639 |
|
throws InterruptedException, TimeoutException { |
640 |
|
final AtomicReference<QNode> head = this.head; |
641 |
|
final AtomicLong state = this.state; |
692 |
|
} |
693 |
|
|
694 |
|
} |
695 |
– |
|