ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.49 by dl, Fri Nov 5 23:01:47 2010 UTC vs.
Revision 1.50 by dl, Sat Nov 6 16:12:10 2010 UTC

# Line 298 | Line 298 | public class Phaser {
298       * use two of them, alternating across even and odd phases.
299       * Subphasers share queues with root to speed up releases.
300       */
301 <    private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
302 <    private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();
301 >    private final AtomicReference<QNode> evenQ;
302 >    private final AtomicReference<QNode> oddQ;
303  
304      private AtomicReference<QNode> queueFor(int phase) {
305 <        Phaser r = root;
306 <        return ((phase & 1) == 0) ? r.evenQ : r.oddQ;
305 >        return ((phase & 1) == 0) ? evenQ : oddQ;
306      }
307  
308      /**
# Line 325 | Line 324 | public class Phaser {
324              while ((phase = phaseOf(s)) >= 0 &&
325                     (rootPhase = phaseOf(root.state)) != phase &&
326                     (rootPhase < 0 || unarrivedOf(s) == 0)) {
327 <                long parentState = par.getReconciledState();
328 <                int parentPhase = phaseOf(parentState);
329 <                int parties = partiesOf(s);
330 <                long next = trippedStateFor(parentPhase, parties);
331 <                if (phaseOf(root.state) == rootPhase &&
333 <                    parentPhase != phase &&
334 <                    state == s && casState(s, next)) {
335 <                    releaseWaiters(phase);
336 <                    if (parties == 0) // exit if the final deregistration
337 <                        break;
327 >                int parentPhase = phaseOf(par.getReconciledState());
328 >                if (parentPhase != phase) {
329 >                    long next = trippedStateFor(parentPhase, partiesOf(s));
330 >                    if (state == s)
331 >                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
332                  }
333                  s = state;
334              }
# Line 348 | Line 342 | public class Phaser {
342       * phaser will need to first register for it.
343       */
344      public Phaser() {
345 <        this(null);
345 >        this(null, 0);
346      }
347  
348      /**
# Line 372 | Line 366 | public class Phaser {
366       * @param parent the parent phaser
367       */
368      public Phaser(Phaser parent) {
369 <        int phase = 0;
376 <        this.parent = parent;
377 <        if (parent != null) {
378 <            this.root = parent.root;
379 <            phase = parent.register();
380 <        }
381 <        else
382 <            this.root = this;
383 <        this.state = trippedStateFor(phase, 0);
369 >        this(parent, 0);
370      }
371  
372      /**
# Line 397 | Line 383 | public class Phaser {
383      public Phaser(Phaser parent, int parties) {
384          if (parties < 0 || parties > ushortMask)
385              throw new IllegalArgumentException("Illegal number of parties");
386 <        int phase = 0;
386 >        int phase;
387          this.parent = parent;
388          if (parent != null) {
389 <            this.root = parent.root;
389 >            Phaser r = parent.root;
390 >            this.root = r;
391 >            this.evenQ = r.evenQ;
392 >            this.oddQ = r.oddQ;
393              phase = parent.register();
394          }
395 <        else
395 >        else {
396              this.root = this;
397 +            this.evenQ = new AtomicReference<QNode>();
398 +            this.oddQ = new AtomicReference<QNode>();
399 +            phase = 0;
400 +        }
401          this.state = trippedStateFor(phase, parties);
402      }
403  
404      /**
405       * Adds a new unarrived party to this phaser.
406       * If an ongoing invocation of {@link #onAdvance} is in progress,
407 <     * this method waits until its completion before registering.
407 >     * this method may wait until its completion before registering.
408       *
409       * @return the arrival phase number to which this registration applied
410       * @throws IllegalStateException if attempting to register more
# Line 424 | Line 417 | public class Phaser {
417      /**
418       * Adds the given number of new unarrived parties to this phaser.
419       * If an ongoing invocation of {@link #onAdvance} is in progress,
420 <     * this method waits until its completion before registering.
420 >     * this method may wait until its completion before registering.
421       *
422       * @param parties the number of additional parties required to trip barrier
423       * @return the arrival phase number to which this registration applied
# Line 452 | Line 445 | public class Phaser {
445              int u = unarrivedOf(s);
446              int unarrived = u + registrations;
447              int parties = p + registrations;
448 <            if (par == null || phase == phaseOf(root.state)) {
449 <                if (parties > ushortMask || unarrived > ushortMask)
450 <                    throw new IllegalStateException(badBounds(parties,
451 <                                                              unarrived));
452 <                else if (p != 0 && u == 0)       // back off if advancing
453 <                    Thread.yield();              // not worth actually blocking
454 <                else if (casState(s, stateFor(phase, parties, unarrived)))
448 >            if (u == 0 && p != 0)  // if tripped, wait for advance
449 >                untimedWait(phase);
450 >            else if (parties > ushortMask)
451 >                throw new IllegalStateException(badBounds(parties, unarrived));
452 >            else if (par == null || phaseOf(root.state) == phase) {
453 >                long next = stateFor(phase, parties, unarrived);
454 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
455                      break;
456              }
457          }
# Line 482 | Line 475 | public class Phaser {
475          while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
476              int parties = partiesOf(s);
477              int unarrived = unarrivedOf(s) - 1;
478 <            if (parties == 0 || unarrived < 0)
479 <                throw new IllegalStateException(badBounds(parties,
480 <                                                          unarrived));
488 <            else if (unarrived > 0) {           // Not the last arrival
489 <                if (casState(s, s - 1))         // s-1 adds one arrival
490 <                    break;
478 >            if (unarrived > 0) {                // Not the last arrival
479 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1))
480 >                    break;                      // s-1 adds one arrival
481              }
482 +            else if (unarrived < 0)
483 +                throw new IllegalStateException(badBounds(parties, unarrived));
484              else if (par == null) {             // directly trip
485 <                if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 :
486 <                                                ((phase + 1) & phaseMask),
487 <                                                parties))) {
485 >                long next = trippedStateFor(onAdvance(phase, parties) ? -1 :
486 >                                            ((phase + 1) & phaseMask),
487 >                                            parties);
488 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
489                      releaseWaiters(phase);
490                      break;
491                  }
492              }
493 <            else if (phaseOf(root.state) == phase && casState(s, s - 1)) {
493 >            else if (phaseOf(root.state) == phase &&
494 >                     UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) {
495                  par.arrive();                   // cascade to parent
496                  reconcileState();
497                  break;
# Line 527 | Line 521 | public class Phaser {
521          while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
522              int parties = partiesOf(s) - 1;
523              int unarrived = unarrivedOf(s) - 1;
524 <            if (parties < 0 || unarrived < 0)
525 <                throw new IllegalStateException(badBounds(parties,
526 <                                                          unarrived));
533 <            else if (unarrived > 0) {
534 <                if (casState(s, stateFor(phase, parties, unarrived)))
524 >            if (unarrived > 0) {
525 >                long next = stateFor(phase, parties, unarrived);
526 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
527                      break;
528              }
529 +            else if (unarrived < 0)
530 +                throw new IllegalStateException(badBounds(parties, unarrived));
531              else if (par == null) {
532 <                if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1:
533 <                                                (phase + 1) & phaseMask,
534 <                                                parties))) {
532 >                long next = trippedStateFor(onAdvance(phase, parties)? -1:
533 >                                            (phase + 1) & phaseMask,
534 >                                            parties);
535 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
536                      releaseWaiters(phase);
537                      break;
538                  }
539              }
540 <            else if (phaseOf(root.state) == phase &&
541 <                     casState(s, stateFor(phase, parties, 0))) {
542 <                if (parties == 0)
543 <                    par.arriveAndDeregister();
544 <                else
545 <                    par.arrive();
546 <                reconcileState();
547 <                break;
540 >            else if (phaseOf(root.state) == phase) {
541 >                long next = stateFor(phase, parties, 0);
542 >                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) {
543 >                    if (parties == 0)
544 >                        par.arriveAndDeregister();
545 >                    else
546 >                        par.arrive();
547 >                    reconcileState();
548 >                    break;
549 >                }
550              }
551          }
552          return phase;
# Line 576 | Line 573 | public class Phaser {
573       * Awaits the phase of the barrier to advance from the given phase
574       * value, returning immediately if the current phase of the
575       * barrier is not equal to the given phase value or this barrier
576 <     * is terminated.  It is an unenforced usage error for an
580 <     * unregistered party to invoke this method.
576 >     * is terminated.
577       *
578       * @param phase an arrival phase number, or negative value if
579       * terminated; this argument is normally the value returned by a
# Line 599 | Line 595 | public class Phaser {
595       * value, throwing {@code InterruptedException} if interrupted
596       * while waiting, or returning immediately if the current phase of
597       * the barrier is not equal to the given phase value or this
598 <     * barrier is terminated. It is an unenforced usage error for an
603 <     * unregistered party to invoke this method.
598 >     * barrier is terminated.
599       *
600       * @param phase an arrival phase number, or negative value if
601       * terminated; this argument is normally the value returned by a
# Line 625 | Line 620 | public class Phaser {
620       * InterruptedException} if interrupted while waiting, or
621       * returning immediately if the current phase of the barrier is
622       * not equal to the given phase value or this barrier is
623 <     * terminated.  It is an unenforced usage error for an
629 <     * unregistered party to invoke this method.
623 >     * terminated.
624       *
625       * @param phase an arrival phase number, or negative value if
626       * terminated; this argument is normally the value returned by a
# Line 663 | Line 657 | public class Phaser {
657          Phaser r = root;    // force at root then reconcile
658          long s;
659          while (phaseOf(s = r.state) >= 0)
660 <            r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s)));
660 >            UNSAFE.compareAndSwapLong(r, stateOffset, s,
661 >                                      stateFor(-1, partiesOf(s),
662 >                                               unarrivedOf(s)));
663          reconcileState();
664          releaseWaiters(0);  // ensure wakeups on both queues
665          releaseWaiters(1);
# Line 750 | Line 746 | public class Phaser {
746       * which case no advance occurs.
747       *
748       * <p>The arguments to this method provide the state of the phaser
749 <     * prevailing for the current transition. (When called from within
750 <     * an implementation of {@code onAdvance} the values returned by
751 <     * methods such as {@code getPhase} may or may not reliably
752 <     * indicate the state to which this transition applies.)
749 >     * prevailing for the current transition.  The results and effects
750 >     * of invoking phase-related methods (including {@code getPhase}
751 >     * as well as arrival, registration, and waiting methods) from
752 >     * within {@code onAdvance} are unspecified and should not be
753 >     * relied on. Similarly, while it is possible to override this
754 >     * method to produce side-effects visible to participating tasks,
755 >     * it is in general safe to do so only in designs in which all
756 >     * parties register before any arrive, and all {@link
757 >     * #awaitAdvance} at each phase.
758       *
759       * <p>The default version returns {@code true} when the number of
760       * registered parties is zero. Normally, overrides that arrange
# Line 879 | Line 880 | public class Phaser {
880      }
881  
882      /**
883 +     * The number of times to spin before blocking waiting for advance.
884 +     */
885 +    static final int MAX_SPINS =
886 +        Runtime.getRuntime().availableProcessors() == 1 ? 0 : 1 << 8;
887 +
888 +    /**
889       * Enqueues node and waits unless aborted or signalled.
890       *
891       * @return current phase
# Line 887 | Line 894 | public class Phaser {
894          QNode node = null;
895          boolean queued = false;
896          boolean interrupted = false;
897 +        int spins = MAX_SPINS;
898          int p;
899          while ((p = getPhase()) == phase) {
900              if (Thread.interrupted())
901                  interrupted = true;
902 +            else if (spins > 0) {
903 +                if (--spins == 0)
904 +                    Thread.yield();
905 +            }
906              else if (node == null)
907                  node = new QNode(this, phase, false, false, 0, 0);
908              else if (!queued)
# Line 914 | Line 926 | public class Phaser {
926          QNode node = null;
927          boolean queued = false;
928          boolean interrupted = false;
929 +        int spins = MAX_SPINS;
930          int p;
931          while ((p = getPhase()) == phase && !interrupted) {
932              if (Thread.interrupted())
933                  interrupted = true;
934 +            else if (spins > 0) {
935 +                if (--spins == 0)
936 +                    Thread.yield();
937 +            }
938              else if (node == null)
939                  node = new QNode(this, phase, true, false, 0, 0);
940              else if (!queued)
# Line 944 | Line 961 | public class Phaser {
961          QNode node = null;
962          boolean queued = false;
963          boolean interrupted = false;
964 +        int spins = MAX_SPINS;
965          int p;
966          while ((p = getPhase()) == phase && !interrupted) {
967              if (Thread.interrupted())
968                  interrupted = true;
969              else if (nanos - (System.nanoTime() - startTime) <= 0)
970                  break;
971 +            else if (spins > 0) {
972 +                if (--spins == 0)
973 +                    Thread.yield();
974 +            }
975              else if (node == null)
976                  node = new QNode(this, phase, true, true, startTime, nanos);
977              else if (!queued)
# Line 974 | Line 996 | public class Phaser {
996      private static final long stateOffset =
997          objectFieldOffset("state", Phaser.class);
998  
977    private final boolean casState(long cmp, long val) {
978        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
979    }
980
999      private static long objectFieldOffset(String field, Class<?> klazz) {
1000          try {
1001              return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines