ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Phaser.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Phaser.java (file contents):
Revision 1.20 by jsr166, Sat Nov 13 08:30:15 2010 UTC vs.
Revision 1.21 by dl, Sat Nov 13 13:07:42 2010 UTC

# Line 302 | Line 302 | public class Phaser {
302       * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
303       */
304      private int doArrive(long adj) {
305 <        long s;
306 <        int phase, unarrived;
307 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0) {
308 <            if ((unarrived = (int)(s & UNARRIVED_MASK)) != 0) {
309 <                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)) {
310 <                    if (unarrived == 1) {
311 <                        Phaser par;
312 <                        long p = s & PARTIES_MASK; // unshifted parties field
313 <                        long lu = p >>> PARTIES_SHIFT;
314 <                        int u = (int)lu;
315 <                        int nextPhase = (phase + 1) & MAX_PHASE;
316 <                        long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
317 <                        if ((par = parent) == null) {
318 <                            UNSAFE.compareAndSwapLong
319 <                                (this, stateOffset, s, onAdvance(phase, u)?
320 <                                 next | TERMINATION_PHASE : next);
321 <                            releaseWaiters(phase);
322 <                        }
323 <                        else {
324 <                            par.doArrive(u == 0?
325 <                                         ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
326 <                            if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
327 <                                ((int)(state >>> PHASE_SHIFT) != nextPhase &&
328 <                                 !UNSAFE.compareAndSwapLong(this, stateOffset,
329 <                                                            s, next)))
330 <                                reconcileState();
331 <                        }
305 >        for (;;) {
306 >            long s;
307 >            int phase, unarrived;
308 >            if ((phase = (int)((s = state) >>> PHASE_SHIFT)) < 0)
309 >                return phase;
310 >            else if ((unarrived = (int)(s & UNARRIVED_MASK)) == 0)
311 >                checkBadArrive(s);
312 >            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)){
313 >                if (unarrived == 1) {
314 >                    Phaser par;
315 >                    long p = s & PARTIES_MASK; // unshifted parties field
316 >                    long lu = p >>> PARTIES_SHIFT;
317 >                    int u = (int)lu;
318 >                    int nextPhase = (phase + 1) & MAX_PHASE;
319 >                    long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
320 >                    if ((par = parent) == null) {
321 >                        UNSAFE.compareAndSwapLong
322 >                            (this, stateOffset, s, onAdvance(phase, u)?
323 >                             next | TERMINATION_PHASE : next);
324 >                        releaseWaiters(phase);
325 >                    }
326 >                    else {
327 >                        par.doArrive(u == 0?
328 >                                     ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
329 >                        if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
330 >                            ((int)(state >>> PHASE_SHIFT) != nextPhase &&
331 >                             !UNSAFE.compareAndSwapLong(this, stateOffset,
332 >                                                        s, next)))
333 >                            reconcileState();
334                      }
333                    break;
335                  }
336 +                return phase;
337              }
336            else if (state == s && reconcileState() == s) // recheck
337                throw new IllegalStateException(badArrive());
338          }
339        return phase;
339      }
340  
341      /**
342 <     * Returns message string for bounds exceptions on arrival.
343 <     * Declared out of line from doArrive to reduce string op bulk.
342 >     * Rechecks state and throws bounds exceptions on arrival -- called
343 >     * only if unarrived is apparently zero.
344       */
345 <    private String badArrive() {
346 <        return "Attempted arrival of unregistered party for " + toString();
345 >    private void checkBadArrive(long s) {
346 >        if (reconcileState() == s)
347 >            throw new IllegalStateException
348 >                ("Attempted arrival of unregistered party for " +
349 >                 stateToString(s));
350      }
351  
352      /**
# Line 356 | Line 358 | public class Phaser {
358          long adj = (long)registrations; // adjustment to state
359          adj |= adj << PARTIES_SHIFT;
360          Phaser par = parent;
361 <        long s;
362 <        int phase;
363 <        while ((phase = (int)((s = (par == null? state : reconcileState()))
364 <                              >>> PHASE_SHIFT)) >= 0) {
365 <            int parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT;
366 <            if (parties != 0 && (s & UNARRIVED_MASK) == 0)
361 >        for (;;) {
362 >            int phase, parties;
363 >            long s = par == null? state : reconcileState();
364 >            if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
365 >                return phase;
366 >            if ((parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT) != 0 &&
367 >                (s & UNARRIVED_MASK) == 0)
368                  internalAwaitAdvance(phase, null); // wait for onAdvance
369              else if (parties + registrations > MAX_COUNT)
370 <                throw new IllegalStateException(badRegister());
370 >                throw new IllegalStateException(badRegister(s));
371              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
372 <                break;
372 >                return phase;
373          }
371        return phase;
374      }
375  
376      /**
377 <     * Returns message string for out of bounds exceptions on registration.
377 >     * Returns message string for bounds exceptions on registration
378       */
379 <    private String badRegister() {
379 >    private String badRegister(long s) {
380          return "Attempt to register more than " +
381 <            MAX_COUNT + " parties for " + toString();
381 >            MAX_COUNT + " parties for " + stateToString(s);
382      }
383  
384      /**
# Line 388 | Line 390 | public class Phaser {
390          if (par == null)
391              return state;
392          Phaser rt = root;
393 <        long s;
394 <        int phase, rPhase;
395 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0 &&
396 <               (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
397 <            if (rPhase < 0 || (s & UNARRIVED_MASK) == 0) {
398 <                long ps = par.parent == null? par.state : par.reconcileState();
399 <                int pPhase = (int)(ps >>> PHASE_SHIFT);
400 <                if (pPhase < 0 || pPhase == ((phase + 1) & MAX_PHASE)) {
401 <                    if (state != s)
402 <                        continue;
403 <                    long p = s & PARTIES_MASK;
404 <                    long next = ((((long) pPhase) << PHASE_SHIFT) |
405 <                                 (p >>> PARTIES_SHIFT) | p);
406 <                    if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
407 <                        return next;
408 <                }
393 >        for (;;) {
394 >            long s, u;
395 >            int phase, rPhase, pPhase;
396 >            if ((phase = (int)((s = state)>>> PHASE_SHIFT)) < 0 ||
397 >                (rPhase = (int)(rt.state >>> PHASE_SHIFT)) == phase)
398 >                return s;
399 >            long pState = par.parent == null? par.state : par.reconcileState();
400 >            if (state == s) {
401 >                if ((rPhase < 0 || (s & UNARRIVED_MASK) == 0) &&
402 >                    ((pPhase = (int)(pState >>> PHASE_SHIFT)) < 0 ||
403 >                     pPhase == ((phase + 1) & MAX_PHASE)))
404 >                    UNSAFE.compareAndSwapLong
405 >                        (this, stateOffset, s,
406 >                         (((long) pPhase) << PHASE_SHIFT) |
407 >                         (u = s & PARTIES_MASK) |
408 >                         (u >>> PARTIES_SHIFT)); // reset unarrived to parties
409 >                else
410 >                    releaseWaiters(phase); // help release others
411              }
408            if (state == s)
409                releaseWaiters(phase); // help release others
412          }
411        return s;
413      }
414  
415      /**
# Line 505 | Line 506 | public class Phaser {
506          if (parties < 0)
507              throw new IllegalArgumentException();
508          if (parties > MAX_COUNT)
509 <            throw new IllegalStateException(badRegister());
509 >            throw new IllegalStateException(badRegister(state));
510          if (parties == 0)
511              return getPhase();
512          return doRegister(parties);
# Line 673 | Line 674 | public class Phaser {
674       * @return the phase number, or a negative value if terminated
675       */
676      public final int getPhase() {
677 <        return (int)((parent == null? state : reconcileState()) >>> PHASE_SHIFT);
677 >        return (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
678      }
679  
680      /**
# Line 682 | Line 683 | public class Phaser {
683       * @return the number of parties
684       */
685      public int getRegisteredParties() {
686 <        return partiesOf(parent == null? state : reconcileState());
686 >        return partiesOf(parent==null? state : reconcileState());
687      }
688  
689      /**
# Line 692 | Line 693 | public class Phaser {
693       * @return the number of arrived parties
694       */
695      public int getArrivedParties() {
696 <        return arrivedOf(parent == null? state : reconcileState());
696 >        return arrivedOf(parent==null? state : reconcileState());
697      }
698  
699      /**
# Line 702 | Line 703 | public class Phaser {
703       * @return the number of unarrived parties
704       */
705      public int getUnarrivedParties() {
706 <        return unarrivedOf(parent == null? state : reconcileState());
706 >        return unarrivedOf(parent==null? state : reconcileState());
707      }
708  
709      /**
# Line 769 | Line 770 | public class Phaser {
770      }
771  
772      /**
773 <     * Returns a string identifying this phaser, as well as its state.
774 <     * The state, in brackets, includes the String {@code "phase = "}
775 <     * followed by the phase number, {@code "parties = "} followed by
776 <     * the number of registered parties, and {@code "arrived = "}
777 <     * followed by the number of arrived parties.
773 >     * Returns a string identifying this phaser, as well as its
774 >     * state.  The state, in brackets, includes the String {@code
775 >     * "phase = "} followed by the phase number, {@code "parties = "}
776 >     * followed by the number of registered parties, and {@code
777 >     * "arrived = "} followed by the number of arrived parties.
778       *
779       * @return a string identifying this barrier, as well as its state
780       */
781      public String toString() {
782 <        long s = reconcileState();
782 >        return stateToString(reconcileState());
783 >    }
784 >
785 >    /**
786 >     * Implementation of toString and string-based error messages
787 >     */
788 >    private String stateToString(long s) {
789          return super.toString() +
790              "[phase = " + phaseOf(s) +
791              " parties = " + partiesOf(s) +
792              " arrived = " + arrivedOf(s) + "]";
793      }
794  
795 +    // Waiting mechanics
796 +
797      /**
798 <     * Removes and signals threads from queue for phase.
798 >     * Removes and signals threads from queue for phase
799       */
800      private void releaseWaiters(int phase) {
801          AtomicReference<QNode> head = queueFor(phase);
# Line 829 | Line 838 | public class Phaser {
838       * block. The value trades off good-citizenship vs big unnecessary
839       * slowdowns.
840       */
841 <    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
841 >    static final int SPINS_PER_ARRIVAL = NCPU < 2? 1 : 1 << 8;
842  
843      /**
844       * Possibly blocks and waits for phase to advance unless aborted.
845       *
846       * @param phase current phase
847 <     * @param node if non-null, the wait node to track interrupt and timeout;
847 >     * @param node if nonnull, the wait node to track interrupt and timeout;
848       * if null, denotes noninterruptible wait
849       * @return current phase
850       */
851      private int internalAwaitAdvance(int phase, QNode node) {
852          Phaser current = this;       // to eventually wait at root if tiered
853 <        Phaser par = parent;
845 <        boolean queued = false;
846 <        int spins = SPINS_PER_ARRIVAL;
853 >        boolean queued = false;      // true when node is enqueued
854          int lastUnarrived = -1;      // to increase spins upon change
855 <        long s;
856 <        int p;
857 <        while ((p = (int)((s = current.state) >>> PHASE_SHIFT)) == phase) {
858 <            int unarrived = (int)(s & UNARRIVED_MASK);
859 <            if (unarrived != lastUnarrived) {
855 >        int spins = SPINS_PER_ARRIVAL;
856 >        for (;;) {
857 >            int p, unarrived;
858 >            Phaser par;
859 >            long s = current.state;
860 >            if ((p = (int)(s >>> PHASE_SHIFT)) != phase) {
861 >                if (node != null)
862 >                    node.onRelease();
863 >                releaseWaiters(phase);
864 >                return p;
865 >            }
866 >            else if ((unarrived = (int)(s & UNARRIVED_MASK)) != lastUnarrived) {
867                  if ((lastUnarrived = unarrived) < NCPU)
868                      spins += SPINS_PER_ARRIVAL;
869              }
870 <            else if (unarrived == 0 && par != null) {
870 >            else if (unarrived == 0 && (par = current.parent) != null) {
871                  current = par;       // if all arrived, use parent
872                  par = par.parent;
873 +                lastUnarrived = -1;
874              }
875              else if (spins > 0)
876                  --spins;
877 <            else if (node == null)
877 >            else if (node == null)   // must be noninterruptible
878                  node = new QNode(this, phase, false, false, 0L);
879 <            else if (node.isReleasable())
880 <                break;
879 >            else if (node.isReleasable()) {
880 >                if ((int)(reconcileState() >>> PHASE_SHIFT) == phase)
881 >                    return phase;    // aborted
882 >            }
883              else if (!queued)
884                  queued = tryEnqueue(phase, node);
885              else {
# Line 873 | Line 890 | public class Phaser {
890                  }
891              }
892          }
876        if (node != null) {
877            if (node.thread != null)
878                node.thread = null;
879            if (!node.interruptible && node.wasInterrupted)
880                Thread.currentThread().interrupt();
881        }
882        if (p == phase)
883            p = (int)(reconcileState() >>> PHASE_SHIFT);
884        if (p != phase)
885            releaseWaiters(phase);
886        return p;
893      }
894  
895      /**
# Line 955 | Line 961 | public class Phaser {
961                  LockSupport.unpark(t);
962              }
963          }
964 +
965 +        void onRelease() { // actions upon return from internalAwaitAdvance
966 +            if (!interruptible && wasInterrupted)
967 +                Thread.currentThread().interrupt();
968 +            if (thread != null)
969 +                thread = null;
970 +        }
971 +
972      }
973  
974      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines