347 |
|
else { |
348 |
|
parent.doArrive((u == 0) ? |
349 |
|
ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL); |
350 |
< |
if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase || |
351 |
< |
((int)(state >>> PHASE_SHIFT) != nextPhase && |
352 |
< |
!UNSAFE.compareAndSwapLong(this, stateOffset, |
353 |
< |
s, next))) |
350 |
> |
if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase) |
351 |
|
reconcileState(); |
352 |
+ |
else if (state == s) |
353 |
+ |
UNSAFE.compareAndSwapLong(this, stateOffset, s, |
354 |
+ |
next); |
355 |
|
} |
356 |
|
} |
357 |
|
return phase; |
413 |
|
int phase, rPhase; |
414 |
|
while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 && |
415 |
|
(rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) { |
416 |
< |
if ((int)(par.state >>> PHASE_SHIFT) != rPhase) |
416 |
> |
if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase) |
417 |
|
par.reconcileState(); |
418 |
|
else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) { |
419 |
|
long u = s & PARTIES_MASK; // reset unarrived to parties |
587 |
|
* of unarrived parties would become negative |
588 |
|
*/ |
589 |
|
public int arriveAndAwaitAdvance() { |
590 |
< |
return awaitAdvance(arrive()); |
590 |
> |
return awaitAdvance(doArrive(ONE_ARRIVAL)); |
591 |
|
} |
592 |
|
|
593 |
|
/** |
603 |
|
* if terminated or argument is negative |
604 |
|
*/ |
605 |
|
public int awaitAdvance(int phase) { |
606 |
< |
Phaser r; |
606 |
> |
Phaser rt; |
607 |
|
int p = (int)(state >>> PHASE_SHIFT); |
608 |
|
if (phase < 0) |
609 |
|
return phase; |
610 |
|
if (p == phase && |
611 |
< |
(p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) |
612 |
< |
return r.internalAwaitAdvance(phase, null); |
611 |
> |
(p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) |
612 |
> |
return rt.internalAwaitAdvance(phase, null); |
613 |
|
return p; |
614 |
|
} |
615 |
|
|
629 |
|
*/ |
630 |
|
public int awaitAdvanceInterruptibly(int phase) |
631 |
|
throws InterruptedException { |
632 |
< |
Phaser r; |
632 |
> |
Phaser rt; |
633 |
|
int p = (int)(state >>> PHASE_SHIFT); |
634 |
|
if (phase < 0) |
635 |
|
return phase; |
636 |
|
if (p == phase && |
637 |
< |
(p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) { |
637 |
> |
(p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { |
638 |
|
QNode node = new QNode(this, phase, true, false, 0L); |
639 |
< |
p = r.internalAwaitAdvance(phase, node); |
639 |
> |
p = rt.internalAwaitAdvance(phase, node); |
640 |
|
if (node.wasInterrupted) |
641 |
|
throw new InterruptedException(); |
642 |
|
} |
667 |
|
long timeout, TimeUnit unit) |
668 |
|
throws InterruptedException, TimeoutException { |
669 |
|
long nanos = unit.toNanos(timeout); |
670 |
< |
Phaser r; |
670 |
> |
Phaser rt; |
671 |
|
int p = (int)(state >>> PHASE_SHIFT); |
672 |
|
if (phase < 0) |
673 |
|
return phase; |
674 |
|
if (p == phase && |
675 |
< |
(p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) { |
675 |
> |
(p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { |
676 |
|
QNode node = new QNode(this, phase, true, true, nanos); |
677 |
< |
p = r.internalAwaitAdvance(phase, node); |
677 |
> |
p = rt.internalAwaitAdvance(phase, node); |
678 |
|
if (node.wasInterrupted) |
679 |
|
throw new InterruptedException(); |
680 |
|
else if (p == phase) |
819 |
|
* @return {@code true} if this barrier should terminate |
820 |
|
*/ |
821 |
|
protected boolean onAdvance(int phase, int registeredParties) { |
822 |
< |
return registeredParties <= 0; |
822 |
> |
return registeredParties == 0; |
823 |
|
} |
824 |
|
|
825 |
|
/** |
851 |
|
* Removes and signals threads from queue for phase. |
852 |
|
*/ |
853 |
|
private void releaseWaiters(int phase) { |
854 |
< |
AtomicReference<QNode> head = queueFor(phase); |
854 |
> |
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; |
855 |
|
QNode q; |
856 |
|
int p; |
857 |
|
while ((q = head.get()) != null && |
873 |
|
* avoid it when threads regularly arrive: When a thread in |
874 |
|
* internalAwaitAdvance notices another arrival before blocking, |
875 |
|
* and there appear to be enough CPUs available, it spins |
876 |
< |
* SPINS_PER_ARRIVAL more times before blocking. Plus, even on |
877 |
< |
* uniprocessors, there is at least one intervening Thread.yield |
878 |
< |
* before blocking. The value trades off good-citizenship vs big |
879 |
< |
* unnecessary slowdowns. |
876 |
> |
* SPINS_PER_ARRIVAL more times before blocking. The value trades |
877 |
> |
* off good-citizenship vs big unnecessary slowdowns. |
878 |
|
*/ |
879 |
|
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; |
880 |
|
|
895 |
|
int p; |
896 |
|
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { |
897 |
|
int unarrived = (int)s & UNARRIVED_MASK; |
898 |
< |
if (unarrived != lastUnarrived) { |
898 |
> |
if (node != null && node.isReleasable()) { |
899 |
> |
p = (int)(state >>> PHASE_SHIFT); |
900 |
> |
break; // done or aborted |
901 |
> |
} |
902 |
> |
else if (node == null && Thread.interrupted()) { |
903 |
> |
node = new QNode(this, phase, false, false, 0L); |
904 |
> |
node.wasInterrupted = true; |
905 |
> |
} |
906 |
> |
else if (unarrived != lastUnarrived) { |
907 |
|
if (lastUnarrived == -1) // ensure old queue clean |
908 |
|
releaseWaiters(phase-1); |
909 |
|
if ((lastUnarrived = unarrived) < NCPU) |
910 |
|
spins += SPINS_PER_ARRIVAL; |
911 |
|
} |
912 |
< |
else if (spins > 0) { |
913 |
< |
if (--spins == (SPINS_PER_ARRIVAL >>> 1)) |
914 |
< |
Thread.yield(); // yield midway through spin |
909 |
< |
} |
910 |
< |
else if (node == null) // must be noninterruptible |
912 |
> |
else if (spins > 0) |
913 |
> |
--spins; |
914 |
> |
else if (node == null) // null if noninterruptible mode |
915 |
|
node = new QNode(this, phase, false, false, 0L); |
912 |
- |
else if (node.isReleasable()) { |
913 |
- |
p = (int)(state >>> PHASE_SHIFT); |
914 |
- |
break; // aborted |
915 |
- |
} |
916 |
|
else if (!queued) { // push onto queue |
917 |
< |
AtomicReference<QNode> head = queueFor(phase); |
917 |
> |
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; |
918 |
|
QNode q = head.get(); |
919 |
|
if (q == null || q.phase == phase) { |
920 |
|
node.next = q; |
921 |
|
if ((p = (int)(state >>> PHASE_SHIFT)) != phase) |
922 |
|
break; // recheck to avoid stale enqueue |
923 |
< |
else |
924 |
< |
queued = head.compareAndSet(q, node); |
923 |
> |
queued = head.compareAndSet(q, node); |
924 |
|
} |
925 |
|
} |
926 |
|
else { |
935 |
|
if (node != null) { |
936 |
|
if (node.thread != null) |
937 |
|
node.thread = null; // disable unpark() in node.signal |
938 |
< |
if (!node.interruptible && node.wasInterrupted) |
938 |
> |
if (node.wasInterrupted && !node.interruptible) |
939 |
|
Thread.currentThread().interrupt(); |
940 |
|
} |
941 |
|
if (p != phase) |
976 |
|
else { |
977 |
|
if (Thread.interrupted()) |
978 |
|
wasInterrupted = true; |
979 |
< |
if (interruptible && wasInterrupted) |
979 |
> |
if (wasInterrupted && interruptible) |
980 |
|
t = null; |
981 |
|
else if (timed) { |
982 |
|
if (nanos > 0) { |