ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Phaser.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Phaser.java (file contents):
Revision 1.14 by dl, Fri Nov 5 23:01:29 2010 UTC vs.
Revision 1.15 by dl, Sat Nov 6 16:11:50 2010 UTC

# Line 297 | Line 297 | public class Phaser {
297       * use two of them, alternating across even and odd phases.
298       * Subphasers share queues with root to speed up releases.
299       */
300 <    private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
301 <    private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();
300 >    private final AtomicReference<QNode> evenQ;
301 >    private final AtomicReference<QNode> oddQ;
302  
303      private AtomicReference<QNode> queueFor(int phase) {
304 <        Phaser r = root;
305 <        return ((phase & 1) == 0) ? r.evenQ : r.oddQ;
304 >        return ((phase & 1) == 0) ? evenQ : oddQ;
305      }
306  
307      /**
# Line 324 | Line 323 | public class Phaser {
323              while ((phase = phaseOf(s)) >= 0 &&
324                     (rootPhase = phaseOf(root.state)) != phase &&
325                     (rootPhase < 0 || unarrivedOf(s) == 0)) {
326 <                long parentState = par.getReconciledState();
327 <                int parentPhase = phaseOf(parentState);
328 <                int parties = partiesOf(s);
329 <                long next = trippedStateFor(parentPhase, parties);
330 <                if (phaseOf(root.state) == rootPhase &&
332 <                    parentPhase != phase &&
333 <                    state == s && casState(s, next)) {
334 <                    releaseWaiters(phase);
335 <                    if (parties == 0) // exit if the final deregistration
336 <                        break;
326 >                int parentPhase = phaseOf(par.getReconciledState());
327 >                if (parentPhase != phase) {
328 >                    long next = trippedStateFor(parentPhase, partiesOf(s));
329 >                    if (state == s)
330 >                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
331                  }
332                  s = state;
333              }
# Line 347 | Line 341 | public class Phaser {
341       * phaser will need to first register for it.
342       */
343      public Phaser() {
344 <        this(null);
344 >        this(null, 0);
345      }
346  
347      /**
# Line 371 | Line 365 | public class Phaser {
365       * @param parent the parent phaser
366       */
367      public Phaser(Phaser parent) {
368 <        int phase = 0;
375 <        this.parent = parent;
376 <        if (parent != null) {
377 <            this.root = parent.root;
378 <            phase = parent.register();
379 <        }
380 <        else
381 <            this.root = this;
382 <        this.state = trippedStateFor(phase, 0);
368 >        this(parent, 0);
369      }
370  
371      /**
# Line 396 | Line 382 | public class Phaser {
382      public Phaser(Phaser parent, int parties) {
383          if (parties < 0 || parties > ushortMask)
384              throw new IllegalArgumentException("Illegal number of parties");
385 <        int phase = 0;
385 >        int phase;
386          this.parent = parent;
387          if (parent != null) {
388 <            this.root = parent.root;
388 >            Phaser r = parent.root;
389 >            this.root = r;
390 >            this.evenQ = r.evenQ;
391 >            this.oddQ = r.oddQ;
392              phase = parent.register();
393          }
394 <        else
394 >        else {
395              this.root = this;
396 +            this.evenQ = new AtomicReference<QNode>();
397 +            this.oddQ = new AtomicReference<QNode>();
398 +            phase = 0;
399 +        }
400          this.state = trippedStateFor(phase, parties);
401      }
402  
403      /**
404       * Adds a new unarrived party to this phaser.
405       * If an ongoing invocation of {@link #onAdvance} is in progress,
406 <     * this method waits until its completion before registering.
406 >     * this method may wait until its completion before registering.
407       *
408       * @return the arrival phase number to which this registration applied
409       * @throws IllegalStateException if attempting to register more
# Line 423 | Line 416 | public class Phaser {
416      /**
417       * Adds the given number of new unarrived parties to this phaser.
418       * If an ongoing invocation of {@link #onAdvance} is in progress,
419 <     * this method waits until its completion before registering.
419 >     * this method may wait until its completion before registering.
420       *
421       * @param parties the number of additional parties required to trip barrier
422       * @return the arrival phase number to which this registration applied
# Line 451 | Line 444 | public class Phaser {
444              int u = unarrivedOf(s);
445              int unarrived = u + registrations;
446              int parties = p + registrations;
447 <            if (par == null || phase == phaseOf(root.state)) {
448 <                if (parties > ushortMask || unarrived > ushortMask)
449 <                    throw new IllegalStateException(badBounds(parties,
450 <                                                              unarrived));
451 <                else if (p != 0 && u == 0)       // back off if advancing
452 <                    Thread.yield();              // not worth actually blocking
453 <                else if (casState(s, stateFor(phase, parties, unarrived)))
447 >            if (u == 0 && p != 0)  // if tripped, wait for advance
448 >                untimedWait(phase);
449 >            else if (parties > ushortMask)
450 >                throw new IllegalStateException(badBounds(parties, unarrived));
451 >            else if (par == null || phaseOf(root.state) == phase) {
452 >                long next = stateFor(phase, parties, unarrived);
453 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
454                      break;
455              }
456          }
# Line 481 | Line 474 | public class Phaser {
474          while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
475              int parties = partiesOf(s);
476              int unarrived = unarrivedOf(s) - 1;
477 <            if (parties == 0 || unarrived < 0)
478 <                throw new IllegalStateException(badBounds(parties,
479 <                                                          unarrived));
487 <            else if (unarrived > 0) {           // Not the last arrival
488 <                if (casState(s, s - 1))         // s-1 adds one arrival
489 <                    break;
477 >            if (unarrived > 0) {                // Not the last arrival
478 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1))
479 >                    break;                      // s-1 adds one arrival
480              }
481 +            else if (unarrived < 0)
482 +                throw new IllegalStateException(badBounds(parties, unarrived));
483              else if (par == null) {             // directly trip
484 <                if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 :
485 <                                                ((phase + 1) & phaseMask),
486 <                                                parties))) {
484 >                long next = trippedStateFor(onAdvance(phase, parties) ? -1 :
485 >                                            ((phase + 1) & phaseMask),
486 >                                            parties);
487 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
488                      releaseWaiters(phase);
489                      break;
490                  }
491              }
492 <            else if (phaseOf(root.state) == phase && casState(s, s - 1)) {
492 >            else if (phaseOf(root.state) == phase &&
493 >                     UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) {
494                  par.arrive();                   // cascade to parent
495                  reconcileState();
496                  break;
# Line 526 | Line 520 | public class Phaser {
520          while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
521              int parties = partiesOf(s) - 1;
522              int unarrived = unarrivedOf(s) - 1;
523 <            if (parties < 0 || unarrived < 0)
524 <                throw new IllegalStateException(badBounds(parties,
525 <                                                          unarrived));
532 <            else if (unarrived > 0) {
533 <                if (casState(s, stateFor(phase, parties, unarrived)))
523 >            if (unarrived > 0) {
524 >                long next = stateFor(phase, parties, unarrived);
525 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
526                      break;
527              }
528 +            else if (unarrived < 0)
529 +                throw new IllegalStateException(badBounds(parties, unarrived));
530              else if (par == null) {
531 <                if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1:
532 <                                                (phase + 1) & phaseMask,
533 <                                                parties))) {
531 >                long next = trippedStateFor(onAdvance(phase, parties)? -1:
532 >                                            (phase + 1) & phaseMask,
533 >                                            parties);
534 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
535                      releaseWaiters(phase);
536                      break;
537                  }
538              }
539 <            else if (phaseOf(root.state) == phase &&
540 <                     casState(s, stateFor(phase, parties, 0))) {
541 <                if (parties == 0)
542 <                    par.arriveAndDeregister();
543 <                else
544 <                    par.arrive();
545 <                reconcileState();
546 <                break;
539 >            else if (phaseOf(root.state) == phase) {
540 >                long next = stateFor(phase, parties, 0);
541 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
542 >                    if (parties == 0)
543 >                        par.arriveAndDeregister();
544 >                    else
545 >                        par.arrive();
546 >                    reconcileState();
547 >                    break;
548 >                }
549              }
550          }
551          return phase;
# Line 575 | Line 572 | public class Phaser {
572       * Awaits the phase of the barrier to advance from the given phase
573       * value, returning immediately if the current phase of the
574       * barrier is not equal to the given phase value or this barrier
575 <     * is terminated.  It is an unenforced usage error for an
579 <     * unregistered party to invoke this method.
575 >     * is terminated.
576       *
577       * @param phase an arrival phase number, or negative value if
578       * terminated; this argument is normally the value returned by a
# Line 598 | Line 594 | public class Phaser {
594       * value, throwing {@code InterruptedException} if interrupted
595       * while waiting, or returning immediately if the current phase of
596       * the barrier is not equal to the given phase value or this
597 <     * barrier is terminated. It is an unenforced usage error for an
602 <     * unregistered party to invoke this method.
597 >     * barrier is terminated.
598       *
599       * @param phase an arrival phase number, or negative value if
600       * terminated; this argument is normally the value returned by a
# Line 624 | Line 619 | public class Phaser {
619       * InterruptedException} if interrupted while waiting, or
620       * returning immediately if the current phase of the barrier is
621       * not equal to the given phase value or this barrier is
622 <     * terminated.  It is an unenforced usage error for an
628 <     * unregistered party to invoke this method.
622 >     * terminated.
623       *
624       * @param phase an arrival phase number, or negative value if
625       * terminated; this argument is normally the value returned by a
# Line 662 | Line 656 | public class Phaser {
656          Phaser r = root;    // force at root then reconcile
657          long s;
658          while (phaseOf(s = r.state) >= 0)
659 <            r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s)));
659 >            UNSAFE.compareAndSwapLong(r, stateOffset, s,
660 >                                      stateFor(-1, partiesOf(s),
661 >                                               unarrivedOf(s)));
662          reconcileState();
663          releaseWaiters(0);  // ensure wakeups on both queues
664          releaseWaiters(1);
# Line 749 | Line 745 | public class Phaser {
745       * which case no advance occurs.
746       *
747       * <p>The arguments to this method provide the state of the phaser
748 <     * prevailing for the current transition. (When called from within
749 <     * an implementation of {@code onAdvance} the values returned by
750 <     * methods such as {@code getPhase} may or may not reliably
751 <     * indicate the state to which this transition applies.)
748 >     * prevailing for the current transition.  The results and effects
749 >     * of invoking phase-related methods (including {@code getPhase}
750 >     * as well as arrival, registration, and waiting methods) from
751 >     * within {@code onAdvance} are unspecified and should not be
752 >     * relied on. Similarly, while it is possible to override this
753 >     * method to produce side-effects visible to participating tasks,
754 >     * it is in general safe to do so only in designs in which all
755 >     * parties register before any arrive, and all {@link
756 >     * #awaitAdvance} at each phase.
757       *
758       * <p>The default version returns {@code true} when the number of
759       * registered parties is zero. Normally, overrides that arrange
# Line 878 | Line 879 | public class Phaser {
879      }
880  
881      /**
882 +     * The number of times to spin before blocking waiting for advance.
883 +     */
884 +    static final int MAX_SPINS =
885 +        Runtime.getRuntime().availableProcessors() == 1 ? 0 : 1 << 8;
886 +
887 +    /**
888       * Enqueues node and waits unless aborted or signalled.
889       *
890       * @return current phase
# Line 886 | Line 893 | public class Phaser {
893          QNode node = null;
894          boolean queued = false;
895          boolean interrupted = false;
896 +        int spins = MAX_SPINS;
897          int p;
898          while ((p = getPhase()) == phase) {
899              if (Thread.interrupted())
900                  interrupted = true;
901 +            else if (spins > 0) {
902 +                if (--spins == 0)
903 +                    Thread.yield();
904 +            }
905              else if (node == null)
906                  node = new QNode(this, phase, false, false, 0, 0);
907              else if (!queued)
# Line 913 | Line 925 | public class Phaser {
925          QNode node = null;
926          boolean queued = false;
927          boolean interrupted = false;
928 +        int spins = MAX_SPINS;
929          int p;
930          while ((p = getPhase()) == phase && !interrupted) {
931              if (Thread.interrupted())
932                  interrupted = true;
933 +            else if (spins > 0) {
934 +                if (--spins == 0)
935 +                    Thread.yield();
936 +            }
937              else if (node == null)
938                  node = new QNode(this, phase, true, false, 0, 0);
939              else if (!queued)
# Line 943 | Line 960 | public class Phaser {
960          QNode node = null;
961          boolean queued = false;
962          boolean interrupted = false;
963 +        int spins = MAX_SPINS;
964          int p;
965          while ((p = getPhase()) == phase && !interrupted) {
966              if (Thread.interrupted())
967                  interrupted = true;
968              else if (nanos - (System.nanoTime() - startTime) <= 0)
969                  break;
970 +            else if (spins > 0) {
971 +                if (--spins == 0)
972 +                    Thread.yield();
973 +            }
974              else if (node == null)
975                  node = new QNode(this, phase, true, true, startTime, nanos);
976              else if (!queued)
# Line 973 | Line 995 | public class Phaser {
995      private static final long stateOffset =
996          objectFieldOffset("state", Phaser.class);
997  
976    private final boolean casState(long cmp, long val) {
977        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
978    }
979
998      private static long objectFieldOffset(String field, Class<?> klazz) {
999          try {
1000              return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines