ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.56 by dl, Wed Nov 17 10:48:59 2010 UTC vs.
Revision 1.57 by dl, Fri Nov 19 16:03:24 2010 UTC

# Line 245 | Line 245 | public class Phaser {
245      private static final int  PARTIES_SHIFT  = 16;
246      private static final int  PHASE_SHIFT    = 32;
247      private static final int  UNARRIVED_MASK = 0xffff;
248 <    private static final int  PARTIES_MASK   = 0xffff0000;
249 <    private static final long LPARTIES_MASK  = 0xffff0000L; // long version
248 >    private static final long PARTIES_MASK   = 0xffff0000L; // for masking long
249      private static final long ONE_ARRIVAL    = 1L;
250      private static final long ONE_PARTY      = 1L << PARTIES_SHIFT;
251      private static final long TERMINATION_PHASE  = -1L << PHASE_SHIFT;
# Line 304 | Line 303 | public class Phaser {
303       */
304      private int doArrive(long adj) {
305          for (;;) {
306 <            long s;
307 <            int phase, unarrived;
308 <            if ((phase = (int)((s = state) >>> PHASE_SHIFT)) < 0)
306 >            long s = state;
307 >            int phase = (int)(s >>> PHASE_SHIFT);
308 >            if (phase < 0)
309                  return phase;
310 <            else if ((unarrived = (int)s & UNARRIVED_MASK) == 0)
310 >            int unarrived = (int)s & UNARRIVED_MASK;
311 >            if (unarrived == 0)
312                  checkBadArrive(s);
313              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
314                  if (unarrived == 1) {
315 <                    Phaser par;
316 <                    long p = s & LPARTIES_MASK; // unshifted parties field
315 >                    long p = s & PARTIES_MASK; // unshifted parties field
316                      long lu = p >>> PARTIES_SHIFT;
317                      int u = (int)lu;
318                      int nextPhase = (phase + 1) & MAX_PHASE;
319                      long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
320 <                    if ((par = parent) == null) {
320 >                    final Phaser parent = this.parent;
321 >                    if (parent == null) {
322                          if (onAdvance(phase, u))
323                              next |= TERMINATION_PHASE; // obliterate phase
324                          UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
325                          releaseWaiters(phase);
326                      }
327                      else {
328 <                        par.doArrive(u == 0?
329 <                                     ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
330 <                        if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
328 >                        parent.doArrive((u == 0) ?
329 >                                        ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
330 >                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase ||
331                              ((int)(state >>> PHASE_SHIFT) != nextPhase &&
332                               !UNSAFE.compareAndSwapLong(this, stateOffset,
333                                                          s, next)))
# Line 356 | Line 356 | public class Phaser {
356       * @param registrations number to add to both parties and unarrived fields
357       */
358      private int doRegister(int registrations) {
359 <        long adj = (long)registrations; // adjustment to state
360 <        adj |= adj << PARTIES_SHIFT;
361 <        Phaser par = parent;
359 >        // assert registrations > 0;
360 >        // adjustment to state
361 >        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
362 >        final Phaser parent = this.parent;
363          for (;;) {
364 <            int phase, parties;
365 <            long s = par == null? state : reconcileState();
366 <            if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
364 >            long s = (parent == null) ? state : reconcileState();
365 >            int phase = (int)(s >>> PHASE_SHIFT);
366 >            if (phase < 0)
367                  return phase;
368 <            if ((parties = (int)s >>> PARTIES_SHIFT) != 0 &&
369 <                ((int)s & UNARRIVED_MASK) == 0)
368 >            int parties = (int)s >>> PARTIES_SHIFT;
369 >            if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0)
370                  internalAwaitAdvance(phase, null); // wait for onAdvance
371 <            else if (parties + registrations > MAX_PARTIES)
371 >            else if (registrations > MAX_PARTIES - parties)
372                  throw new IllegalStateException(badRegister(s));
373              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
374                  return phase;
# Line 387 | Line 388 | public class Phaser {
388       */
389      private long reconcileState() {
390          Phaser par = parent;
391 <        if (par == null)
392 <            return state;
393 <        Phaser rt = root;
394 <        for (;;) {
395 <            long s, u;
396 <            int phase, rPhase, pPhase;
397 <            if ((phase = (int)((s = state)>>> PHASE_SHIFT)) < 0 ||
398 <                (rPhase = (int)(rt.state >>> PHASE_SHIFT)) == phase)
399 <                return s;
400 <            long pState = par.parent == null? par.state : par.reconcileState();
401 <            if (state == s) {
402 <                if ((rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) &&
403 <                    ((pPhase = (int)(pState >>> PHASE_SHIFT)) < 0 ||
404 <                     pPhase == ((phase + 1) & MAX_PHASE)))
405 <                    UNSAFE.compareAndSwapLong
406 <                        (this, stateOffset, s,
407 <                         (((long) pPhase) << PHASE_SHIFT) |
408 <                         (u = s & LPARTIES_MASK) |
408 <                         (u >>> PARTIES_SHIFT)); // reset unarrived to parties
409 <                else
410 <                    releaseWaiters(phase); // help release others
391 >        long s = state;
392 >        if (par != null) {
393 >            Phaser rt = root;
394 >            int phase, rPhase;
395 >            while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
396 >                   (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
397 >                if ((int)(par.state >>> PHASE_SHIFT) != rPhase)
398 >                    par.reconcileState();
399 >                else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
400 >                    long u = s & PARTIES_MASK; // reset unarrived to parties
401 >                    long next = ((((long) rPhase) << PHASE_SHIFT) | u |
402 >                                 (u >>> PARTIES_SHIFT));
403 >                    if (state == s &&
404 >                        UNSAFE.compareAndSwapLong(this, stateOffset,
405 >                                                  s, s = next))
406 >                        break;
407 >                }
408 >                s = state;
409              }
410          }
411 +        return s;
412      }
413  
414      /**
# Line 505 | Line 504 | public class Phaser {
504      public int bulkRegister(int parties) {
505          if (parties < 0)
506              throw new IllegalArgumentException();
508        if (parties > MAX_PARTIES)
509            throw new IllegalStateException(badRegister(state));
507          if (parties == 0)
508              return getPhase();
509          return doRegister(parties);
# Line 573 | Line 570 | public class Phaser {
570       * if terminated or argument is negative
571       */
572      public int awaitAdvance(int phase) {
576        int p;
573          if (phase < 0)
574              return phase;
575 <        else if ((p = (int)((parent == null? state : reconcileState())
576 <                            >>> PHASE_SHIFT)) == phase)
577 <            return internalAwaitAdvance(phase, null);
582 <        else
583 <            return p;
575 >        long s = (parent == null) ? state : reconcileState();
576 >        int p = (int)(s >>> PHASE_SHIFT);
577 >        return (p != phase) ? p : internalAwaitAdvance(phase, null);
578      }
579  
580      /**
# Line 599 | Line 593 | public class Phaser {
593       */
594      public int awaitAdvanceInterruptibly(int phase)
595          throws InterruptedException {
602        int p;
596          if (phase < 0)
597              return phase;
598 <        if ((p = (int)((parent == null? state : reconcileState())
599 <                       >>> PHASE_SHIFT)) == phase) {
598 >        long s = (parent == null) ? state : reconcileState();
599 >        int p = (int)(s >>> PHASE_SHIFT);
600 >        if (p == phase) {
601              QNode node = new QNode(this, phase, true, false, 0L);
602              p = internalAwaitAdvance(phase, node);
603              if (node.wasInterrupted)
# Line 635 | Line 629 | public class Phaser {
629      public int awaitAdvanceInterruptibly(int phase,
630                                           long timeout, TimeUnit unit)
631          throws InterruptedException, TimeoutException {
638        long nanos = unit.toNanos(timeout);
639        int p;
632          if (phase < 0)
633              return phase;
634 <        if ((p = (int)((parent == null? state : reconcileState())
635 <                       >>> PHASE_SHIFT)) == phase) {
634 >        long s = (parent == null) ? state : reconcileState();
635 >        int p = (int)(s >>> PHASE_SHIFT);
636 >        if (p == phase) {
637 >            long nanos = unit.toNanos(timeout);
638              QNode node = new QNode(this, phase, true, true, nanos);
639              p = internalAwaitAdvance(phase, node);
640              if (node.wasInterrupted)
# Line 682 | Line 676 | public class Phaser {
676       * @return the phase number, or a negative value if terminated
677       */
678      public final int getPhase() {
679 <        return (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
679 >        return (int)(root.state >>> PHASE_SHIFT);
680      }
681  
682      /**
# Line 691 | Line 685 | public class Phaser {
685       * @return the number of parties
686       */
687      public int getRegisteredParties() {
688 <        return partiesOf(parent==null? state : reconcileState());
688 >        return partiesOf(state);
689      }
690  
691      /**
# Line 739 | Line 733 | public class Phaser {
733       * @return {@code true} if this barrier has been terminated
734       */
735      public boolean isTerminated() {
736 <        return (parent == null? state : reconcileState()) < 0;
736 >        return root.state < 0L;
737      }
738  
739      /**
# Line 803 | Line 797 | public class Phaser {
797      // Waiting mechanics
798  
799      /**
800 <     * Removes and signals threads from queue for phase
800 >     * Removes and signals threads from queue for phase.
801       */
802      private void releaseWaiters(int phase) {
803          AtomicReference<QNode> head = queueFor(phase);
# Line 817 | Line 811 | public class Phaser {
811          }
812      }
813  
820    /**
821     * Tries to enqueue given node in the appropriate wait queue.
822     *
823     * @return true if successful
824     */
825    private boolean tryEnqueue(int phase, QNode node) {
826        releaseWaiters(phase-1); // ensure old queue clean
827        AtomicReference<QNode> head = queueFor(phase);
828        QNode q = head.get();
829        return ((q == null || q.phase == phase) &&
830                (int)(root.state >>> PHASE_SHIFT) == phase &&
831                head.compareAndSet(node.next = q, node));
832    }
833
814      /** The number of CPUs, for spin control */
815      private static final int NCPU = Runtime.getRuntime().availableProcessors();
816  
# Line 862 | Line 842 | public class Phaser {
842          boolean queued = false;      // true when node is enqueued
843          int lastUnarrived = -1;      // to increase spins upon change
844          int spins = SPINS_PER_ARRIVAL;
845 <        for (;;) {
846 <            int p, unarrived;
845 >        long s;
846 >        int p;
847 >        while ((p = (int)((s = current.state) >>> PHASE_SHIFT)) == phase) {
848              Phaser par;
849 <            long s = current.state;
850 <            if ((p = (int)(s >>> PHASE_SHIFT)) != phase) {
851 <                if (node != null)
852 <                    node.onRelease();
853 <                releaseWaiters(phase);
854 <                return p;
849 >            int unarrived = (int)s & UNARRIVED_MASK;
850 >            if (unarrived != lastUnarrived) {
851 >                if (lastUnarrived == -1) // ensure old queue clean
852 >                    releaseWaiters(phase-1);
853 >                if ((lastUnarrived = unarrived) < NCPU)
854 >                    spins += SPINS_PER_ARRIVAL;
855              }
856 <            else if ((unarrived = (int)s & UNARRIVED_MASK) == 0 &&
876 <                     (par = current.parent) != null) {
856 >            else if (unarrived == 0 && (par = current.parent) != null) {
857                  current = par;       // if all arrived, use parent
858                  par = par.parent;
859                  lastUnarrived = -1;
860              }
881            else if (unarrived != lastUnarrived) {
882                if ((lastUnarrived = unarrived) < NCPU)
883                    spins += SPINS_PER_ARRIVAL;
884            }
861              else if (spins > 0) {
862                  if (--spins == (SPINS_PER_ARRIVAL >>> 1))
863                      Thread.yield();  // yield midway through spin
# Line 889 | Line 865 | public class Phaser {
865              else if (node == null)   // must be noninterruptible
866                  node = new QNode(this, phase, false, false, 0L);
867              else if (node.isReleasable()) {
868 <                if ((int)(reconcileState() >>> PHASE_SHIFT) == phase)
868 >                if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
869 >                    break;
870 >                else
871                      return phase;    // aborted
872              }
873 <            else if (!queued)
874 <                queued = tryEnqueue(phase, node);
873 >            else if (!queued) {      // push onto queue
874 >                AtomicReference<QNode> head = queueFor(phase);
875 >                QNode q = head.get();
876 >                if (q == null || q.phase == phase) {
877 >                    node.next = q;
878 >                    if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
879 >                        break;       // recheck to avoid stale enqueue
880 >                    else
881 >                        queued = head.compareAndSet(q, node);
882 >                }
883 >            }
884              else {
885                  try {
886                      ForkJoinPool.managedBlock(node);
# Line 902 | Line 889 | public class Phaser {
889                  }
890              }
891          }
892 +        releaseWaiters(phase);
893 +        if (node != null)
894 +            node.onRelease();
895 +        return p;
896      }
897  
898      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines