ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.61 by jsr166, Sun Nov 28 21:21:03 2010 UTC vs.
Revision 1.62 by dl, Mon Nov 29 00:52:28 2010 UTC

# Line 347 | Line 347 | public class Phaser {
347                      else {
348                          parent.doArrive((u == 0) ?
349                                          ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
350 <                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase ||
351 <                            ((int)(state >>> PHASE_SHIFT) != nextPhase &&
352 <                             !UNSAFE.compareAndSwapLong(this, stateOffset,
353 <                                                        s, next)))
350 >                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase)
351                              reconcileState();
352 +                        else if (state == s)
353 +                            UNSAFE.compareAndSwapLong(this, stateOffset, s,
354 +                                                      next);
355                      }
356                  }
357                  return phase;
# Line 413 | Line 413 | public class Phaser {
413              int phase, rPhase;
414              while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
415                     (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
416 <                if ((int)(par.state >>> PHASE_SHIFT) != rPhase)
416 >                if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase)
417                      par.reconcileState();
418                  else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
419                      long u = s & PARTIES_MASK; // reset unarrived to parties
# Line 587 | Line 587 | public class Phaser {
587       * of unarrived parties would become negative
588       */
589      public int arriveAndAwaitAdvance() {
590 <        return awaitAdvance(arrive());
590 >        return awaitAdvance(doArrive(ONE_ARRIVAL));
591      }
592  
593      /**
# Line 603 | Line 603 | public class Phaser {
603       * if terminated or argument is negative
604       */
605      public int awaitAdvance(int phase) {
606 <        Phaser r;
606 >        Phaser rt;
607          int p = (int)(state >>> PHASE_SHIFT);
608          if (phase < 0)
609              return phase;
610          if (p == phase &&
611 <            (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase)
612 <            return r.internalAwaitAdvance(phase, null);
611 >            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase)
612 >            return rt.internalAwaitAdvance(phase, null);
613          return p;
614      }
615  
# Line 629 | Line 629 | public class Phaser {
629       */
630      public int awaitAdvanceInterruptibly(int phase)
631          throws InterruptedException {
632 <        Phaser r;
632 >        Phaser rt;
633          int p = (int)(state >>> PHASE_SHIFT);
634          if (phase < 0)
635              return phase;
636          if (p == phase &&
637 <            (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) {
637 >            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
638              QNode node = new QNode(this, phase, true, false, 0L);
639 <            p = r.internalAwaitAdvance(phase, node);
639 >            p = rt.internalAwaitAdvance(phase, node);
640              if (node.wasInterrupted)
641                  throw new InterruptedException();
642          }
# Line 667 | Line 667 | public class Phaser {
667                                           long timeout, TimeUnit unit)
668          throws InterruptedException, TimeoutException {
669          long nanos = unit.toNanos(timeout);
670 <        Phaser r;
670 >        Phaser rt;
671          int p = (int)(state >>> PHASE_SHIFT);
672          if (phase < 0)
673              return phase;
674          if (p == phase &&
675 <            (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) {
675 >            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
676              QNode node = new QNode(this, phase, true, true, nanos);
677 <            p = r.internalAwaitAdvance(phase, node);
677 >            p = rt.internalAwaitAdvance(phase, node);
678              if (node.wasInterrupted)
679                  throw new InterruptedException();
680              else if (p == phase)
# Line 819 | Line 819 | public class Phaser {
819       * @return {@code true} if this barrier should terminate
820       */
821      protected boolean onAdvance(int phase, int registeredParties) {
822 <        return registeredParties <= 0;
822 >        return registeredParties == 0;
823      }
824  
825      /**
# Line 851 | Line 851 | public class Phaser {
851       * Removes and signals threads from queue for phase.
852       */
853      private void releaseWaiters(int phase) {
854 <        AtomicReference<QNode> head = queueFor(phase);
854 >        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
855          QNode q;
856          int p;
857          while ((q = head.get()) != null &&
# Line 873 | Line 873 | public class Phaser {
873       * avoid it when threads regularly arrive: When a thread in
874       * internalAwaitAdvance notices another arrival before blocking,
875       * and there appear to be enough CPUs available, it spins
876 <     * SPINS_PER_ARRIVAL more times before blocking. Plus, even on
877 <     * uniprocessors, there is at least one intervening Thread.yield
878 <     * before blocking. The value trades off good-citizenship vs big
879 <     * unnecessary slowdowns.
876 >     * SPINS_PER_ARRIVAL more times before blocking. The value trades
877 >     * off good-citizenship vs big unnecessary slowdowns.
878       */
879      static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
880  
# Line 897 | Line 895 | public class Phaser {
895          int p;
896          while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
897              int unarrived = (int)s & UNARRIVED_MASK;
898 <            if (unarrived != lastUnarrived) {
898 >            if (node != null && node.isReleasable()) {
899 >                p = (int)(state >>> PHASE_SHIFT);
900 >                break;               // done or aborted
901 >            }
902 >            else if (node == null && Thread.interrupted()) {
903 >                node = new QNode(this, phase, false, false, 0L);
904 >                node.wasInterrupted = true;
905 >            }
906 >            else if (unarrived != lastUnarrived) {
907                  if (lastUnarrived == -1) // ensure old queue clean
908                      releaseWaiters(phase-1);
909                  if ((lastUnarrived = unarrived) < NCPU)
910                      spins += SPINS_PER_ARRIVAL;
911              }
912 <            else if (spins > 0) {
913 <                if (--spins == (SPINS_PER_ARRIVAL >>> 1))
914 <                    Thread.yield();  // yield midway through spin
909 <            }
910 <            else if (node == null)   // must be noninterruptible
912 >            else if (spins > 0)
913 >                --spins;
914 >            else if (node == null)   // null if noninterruptible mode
915                  node = new QNode(this, phase, false, false, 0L);
912            else if (node.isReleasable()) {
913                p = (int)(state >>> PHASE_SHIFT);
914                break;               // aborted
915            }
916              else if (!queued) {      // push onto queue
917 <                AtomicReference<QNode> head = queueFor(phase);
917 >                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
918                  QNode q = head.get();
919                  if (q == null || q.phase == phase) {
920                      node.next = q;
921                      if ((p = (int)(state >>> PHASE_SHIFT)) != phase)
922                          break;       // recheck to avoid stale enqueue
923 <                    else
924 <                        queued = head.compareAndSet(q, node);
923 >                    queued = head.compareAndSet(q, node);
924                  }
925              }
926              else {
# Line 936 | Line 935 | public class Phaser {
935          if (node != null) {
936              if (node.thread != null)
937                  node.thread = null; // disable unpark() in node.signal
938 <            if (!node.interruptible && node.wasInterrupted)
938 >            if (node.wasInterrupted && !node.interruptible)
939                  Thread.currentThread().interrupt();
940          }
941          if (p != phase)
# Line 977 | Line 976 | public class Phaser {
976                  else {
977                      if (Thread.interrupted())
978                          wasInterrupted = true;
979 <                    if (interruptible && wasInterrupted)
979 >                    if (wasInterrupted && interruptible)
980                          t = null;
981                      else if (timed) {
982                          if (nanos > 0) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines