ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.53 by jsr166, Sat Nov 13 05:59:25 2010 UTC vs.
Revision 1.54 by dl, Sat Nov 13 13:10:04 2010 UTC

# Line 302 | Line 302 | public class Phaser {
302       * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
303       */
304      private int doArrive(long adj) {
305 <        long s;
306 <        int phase, unarrived;
307 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0) {
308 <            if ((unarrived = (int)(s & UNARRIVED_MASK)) != 0) {
309 <                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)) {
310 <                    if (unarrived == 1) {
311 <                        Phaser par;
312 <                        long p = s & PARTIES_MASK; // unshifted parties field
313 <                        long lu = p >>> PARTIES_SHIFT;
314 <                        int u = (int)lu;
315 <                        int nextPhase = (phase + 1) & MAX_PHASE;
316 <                        long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
317 <                        if ((par = parent) == null) {
318 <                            UNSAFE.compareAndSwapLong
319 <                                (this, stateOffset, s, onAdvance(phase, u)?
320 <                                 next | TERMINATION_PHASE : next);
321 <                            releaseWaiters(phase);
322 <                        }
323 <                        else {
324 <                            par.doArrive(u == 0?
325 <                                         ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
326 <                            if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
327 <                                ((int)(state >>> PHASE_SHIFT) != nextPhase &&
328 <                                 !UNSAFE.compareAndSwapLong(this, stateOffset,
329 <                                                            s, next)))
330 <                                reconcileState();
331 <                        }
305 >        for (;;) {
306 >            long s;
307 >            int phase, unarrived;
308 >            if ((phase = (int)((s = state) >>> PHASE_SHIFT)) < 0)
309 >                return phase;
310 >            else if ((unarrived = (int)(s & UNARRIVED_MASK)) == 0)
311 >                checkBadArrive(s);
312 >            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)){
313 >                if (unarrived == 1) {
314 >                    Phaser par;
315 >                    long p = s & PARTIES_MASK; // unshifted parties field
316 >                    long lu = p >>> PARTIES_SHIFT;
317 >                    int u = (int)lu;
318 >                    int nextPhase = (phase + 1) & MAX_PHASE;
319 >                    long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
320 >                    if ((par = parent) == null) {
321 >                        UNSAFE.compareAndSwapLong
322 >                            (this, stateOffset, s, onAdvance(phase, u)?
323 >                             next | TERMINATION_PHASE : next);
324 >                        releaseWaiters(phase);
325 >                    }
326 >                    else {
327 >                        par.doArrive(u == 0?
328 >                                     ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
329 >                        if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
330 >                            ((int)(state >>> PHASE_SHIFT) != nextPhase &&
331 >                             !UNSAFE.compareAndSwapLong(this, stateOffset,
332 >                                                        s, next)))
333 >                            reconcileState();
334                      }
333                    break;
335                  }
336 +                return phase;
337              }
336            else if (state == s && reconcileState() == s) // recheck
337                throw new IllegalStateException(badArrive());
338          }
339        return phase;
339      }
340  
341      /**
342 <     * Returns message string for bounds exceptions on arrival.
343 <     * Declared out of-line from doArrive to reduce string op bulk.
342 >     * Rechecks state and throws bounds exceptions on arrival -- called
343 >     * only if unarrived is apparently zero.
344       */
345 <    private String badArrive() {
346 <        return ("Attempted arrival of unregistered party for " +
347 <                this.toString());
345 >    private void checkBadArrive(long s) {
346 >        if (reconcileState() == s)
347 >            throw new IllegalStateException
348 >                ("Attempted arrival of unregistered party for " +
349 >                 stateToString(s));
350      }
351  
352      /**
# Line 357 | Line 358 | public class Phaser {
358          long adj = (long)registrations; // adjustment to state
359          adj |= adj << PARTIES_SHIFT;
360          Phaser par = parent;
361 <        long s;
362 <        int phase;
363 <        while ((phase = (int)((s = (par == null? state : reconcileState()))
364 <                              >>> PHASE_SHIFT)) >= 0) {
365 <            int parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT;
366 <            if (parties != 0 && (s & UNARRIVED_MASK) == 0)
361 >        for (;;) {
362 >            int phase, parties;
363 >            long s = par == null? state : reconcileState();
364 >            if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
365 >                return phase;
366 >            if ((parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT) != 0 &&
367 >                (s & UNARRIVED_MASK) == 0)
368                  internalAwaitAdvance(phase, null); // wait for onAdvance
369              else if (parties + registrations > MAX_COUNT)
370 <                throw new IllegalStateException(badRegister());
370 >                throw new IllegalStateException(badRegister(s));
371              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
372 <                break;
372 >                return phase;
373          }
372        return phase;
374      }
375  
376      /**
377       * Returns message string for bounds exceptions on registration
378       */
379 <    private String badRegister() {
380 <        return ("Attempt to register more than " + MAX_COUNT + " parties for "+
381 <                this.toString());
379 >    private String badRegister(long s) {
380 >        return "Attempt to register more than " +
381 >            MAX_COUNT + " parties for " + stateToString(s);
382      }
383  
384      /**
# Line 389 | Line 390 | public class Phaser {
390          if (par == null)
391              return state;
392          Phaser rt = root;
393 <        long s;
394 <        int phase, rPhase;
395 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0 &&
396 <               (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
397 <            if (rPhase < 0 || (s & UNARRIVED_MASK) == 0) {
398 <                long ps = par.parent == null? par.state : par.reconcileState();
399 <                int pPhase = (int)(ps >>> PHASE_SHIFT);
400 <                if (pPhase < 0 || pPhase == ((phase + 1) & MAX_PHASE)) {
401 <                    if (state != s)
402 <                        continue;
403 <                    long p = s & PARTIES_MASK;
404 <                    long next = ((((long) pPhase) << PHASE_SHIFT) |
405 <                                 (p >>> PARTIES_SHIFT) | p);
406 <                    if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
407 <                        return next;
408 <                }
393 >        for (;;) {
394 >            long s, u;
395 >            int phase, rPhase, pPhase;
396 >            if ((phase = (int)((s = state)>>> PHASE_SHIFT)) < 0 ||
397 >                (rPhase = (int)(rt.state >>> PHASE_SHIFT)) == phase)
398 >                return s;
399 >            long pState = par.parent == null? par.state : par.reconcileState();
400 >            if (state == s) {
401 >                if ((rPhase < 0 || (s & UNARRIVED_MASK) == 0) &&
402 >                    ((pPhase = (int)(pState >>> PHASE_SHIFT)) < 0 ||
403 >                     pPhase == ((phase + 1) & MAX_PHASE)))
404 >                    UNSAFE.compareAndSwapLong
405 >                        (this, stateOffset, s,
406 >                         (((long) pPhase) << PHASE_SHIFT) |
407 >                         (u = s & PARTIES_MASK) |
408 >                         (u >>> PARTIES_SHIFT)); // reset unarrived to parties
409 >                else
410 >                    releaseWaiters(phase); // help release others
411              }
409            if (state == s)
410                releaseWaiters(phase); // help release others
412          }
412        return s;
413      }
414  
415      /**
# Line 506 | Line 506 | public class Phaser {
506          if (parties < 0)
507              throw new IllegalArgumentException();
508          if (parties > MAX_COUNT)
509 <            throw new IllegalStateException(badRegister());
509 >            throw new IllegalStateException(badRegister(state));
510          if (parties == 0)
511              return getPhase();
512          return doRegister(parties);
# Line 674 | Line 674 | public class Phaser {
674       * @return the phase number, or a negative value if terminated
675       */
676      public final int getPhase() {
677 <        return (int)((parent == null? state : reconcileState()) >>> PHASE_SHIFT);
677 >        return (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
678      }
679  
680      /**
# Line 683 | Line 683 | public class Phaser {
683       * @return the number of parties
684       */
685      public int getRegisteredParties() {
686 <        return partiesOf(parent == null? state : reconcileState());
686 >        return partiesOf(parent==null? state : reconcileState());
687      }
688  
689      /**
# Line 693 | Line 693 | public class Phaser {
693       * @return the number of arrived parties
694       */
695      public int getArrivedParties() {
696 <        return arrivedOf(parent == null? state : reconcileState());
696 >        return arrivedOf(parent==null? state : reconcileState());
697      }
698  
699      /**
# Line 703 | Line 703 | public class Phaser {
703       * @return the number of unarrived parties
704       */
705      public int getUnarrivedParties() {
706 <        return unarrivedOf(parent == null? state : reconcileState());
706 >        return unarrivedOf(parent==null? state : reconcileState());
707      }
708  
709      /**
# Line 779 | Line 779 | public class Phaser {
779       * @return a string identifying this barrier, as well as its state
780       */
781      public String toString() {
782 <        long s = reconcileState();
782 >        return stateToString(reconcileState());
783 >    }
784 >
785 >    /**
786 >     * Implementation of toString and string-based error messages
787 >     */
788 >    private String stateToString(long s) {
789          return super.toString() +
790              "[phase = " + phaseOf(s) +
791              " parties = " + partiesOf(s) +
792              " arrived = " + arrivedOf(s) + "]";
793      }
794  
795 +    // Waiting mechanics
796 +
797      /**
798       * Removes and signals threads from queue for phase
799       */
# Line 842 | Line 850 | public class Phaser {
850       */
851      private int internalAwaitAdvance(int phase, QNode node) {
852          Phaser current = this;       // to eventually wait at root if tiered
853 <        Phaser par = parent;
846 <        boolean queued = false;
847 <        int spins = SPINS_PER_ARRIVAL;
853 >        boolean queued = false;      // true when node is enqueued
854          int lastUnarrived = -1;      // to increase spins upon change
855 <        long s;
856 <        int p;
857 <        while ((p = (int)((s = current.state) >>> PHASE_SHIFT)) == phase) {
858 <            int unarrived = (int)(s & UNARRIVED_MASK);
859 <            if (unarrived != lastUnarrived) {
855 >        int spins = SPINS_PER_ARRIVAL;
856 >        for (;;) {
857 >            int p, unarrived;
858 >            Phaser par;
859 >            long s = current.state;
860 >            if ((p = (int)(s >>> PHASE_SHIFT)) != phase) {
861 >                if (node != null)
862 >                    node.onRelease();
863 >                releaseWaiters(phase);
864 >                return p;
865 >            }
866 >            else if ((unarrived = (int)(s & UNARRIVED_MASK)) != lastUnarrived) {
867                  if ((lastUnarrived = unarrived) < NCPU)
868                      spins += SPINS_PER_ARRIVAL;
869              }
870 <            else if (unarrived == 0 && par != null) {
870 >            else if (unarrived == 0 && (par = current.parent) != null) {
871                  current = par;       // if all arrived, use parent
872                  par = par.parent;
873 +                lastUnarrived = -1;
874              }
875              else if (spins > 0)
876                  --spins;
877 <            else if (node == null)
877 >            else if (node == null)   // must be noninterruptible
878                  node = new QNode(this, phase, false, false, 0L);
879 <            else if (node.isReleasable())
880 <                break;
879 >            else if (node.isReleasable()) {
880 >                if ((int)(reconcileState() >>> PHASE_SHIFT) == phase)
881 >                    return phase;    // aborted
882 >            }
883              else if (!queued)
884                  queued = tryEnqueue(phase, node);
885              else {
# Line 874 | Line 890 | public class Phaser {
890                  }
891              }
892          }
877        if (node != null) {
878            if (node.thread != null)
879                node.thread = null;
880            if (!node.interruptible && node.wasInterrupted)
881                Thread.currentThread().interrupt();
882        }
883        if (p == phase)
884            p = (int)(reconcileState() >>> PHASE_SHIFT);
885        if (p != phase)
886            releaseWaiters(phase);
887        return p;
893      }
894  
895      /**
# Line 956 | Line 961 | public class Phaser {
961                  LockSupport.unpark(t);
962              }
963          }
964 +
965 +        void onRelease() { // actions upon return from internalAwaitAdvance
966 +            if (!interruptible && wasInterrupted)
967 +                Thread.currentThread().interrupt();
968 +            if (thread != null)
969 +                thread = null;
970 +        }
971 +
972      }
973  
974      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines