ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Phaser.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Phaser.java (file contents):
Revision 1.13 by dl, Sun Oct 24 21:45:16 2010 UTC vs.
Revision 1.14 by dl, Fri Nov 5 23:01:29 2010 UTC

# Line 293 | Line 293 | public class Phaser {
293  
294      /**
295       * Heads of Treiber stacks for waiting threads. To eliminate
296 <     * contention while releasing some threads while adding others, we
296 >     * contention when releasing some threads while adding others, we
297       * use two of them, alternating across even and odd phases.
298 +     * Subphasers share queues with root to speed up releases.
299       */
300      private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
301      private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();
302  
303      private AtomicReference<QNode> queueFor(int phase) {
304 <        return ((phase & 1) == 0) ? evenQ : oddQ;
304 >        Phaser r = root;
305 >        return ((phase & 1) == 0) ? r.evenQ : r.oddQ;
306      }
307  
308      /**
# Line 315 | Line 317 | public class Phaser {
317       * Recursively resolves state.
318       */
319      private long reconcileState() {
320 <        Phaser p = parent;
320 >        Phaser par = parent;
321          long s = state;
322 <        if (p != null) {
323 <            while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
324 <                long parentState = p.getReconciledState();
322 >        if (par != null) {
323 >            int phase, rootPhase;
324 >            while ((phase = phaseOf(s)) >= 0 &&
325 >                   (rootPhase = phaseOf(root.state)) != phase &&
326 >                   (rootPhase < 0 || unarrivedOf(s) == 0)) {
327 >                long parentState = par.getReconciledState();
328                  int parentPhase = phaseOf(parentState);
329 <                int phase = phaseOf(s = state);
330 <                if (phase != parentPhase) {
331 <                    long next = trippedStateFor(parentPhase, partiesOf(s));
332 <                    if (casState(s, next)) {
333 <                        releaseWaiters(phase);
334 <                        s = next;
335 <                    }
329 >                int parties = partiesOf(s);
330 >                long next = trippedStateFor(parentPhase, parties);
331 >                if (phaseOf(root.state) == rootPhase &&
332 >                    parentPhase != phase &&
333 >                    state == s && casState(s, next)) {
334 >                    releaseWaiters(phase);
335 >                    if (parties == 0) // exit if the final deregistration
336 >                        break;
337                  }
338 +                s = state;
339              }
340          }
341          return s;
# Line 402 | Line 409 | public class Phaser {
409  
410      /**
411       * Adds a new unarrived party to this phaser.
412 +     * If an ongoing invocation of {@link #onAdvance} is in progress,
413 +     * this method waits until its completion before registering.
414       *
415       * @return the arrival phase number to which this registration applied
416       * @throws IllegalStateException if attempting to register more
# Line 413 | Line 422 | public class Phaser {
422  
423      /**
424       * Adds the given number of new unarrived parties to this phaser.
425 +     * If an ongoing invocation of {@link #onAdvance} is in progress,
426 +     * this method waits until its completion before registering.
427       *
428       * @param parties the number of additional parties required to trip barrier
429       * @return the arrival phase number to which this registration applied
# Line 432 | Line 443 | public class Phaser {
443       * Shared code for register, bulkRegister
444       */
445      private int doRegister(int registrations) {
446 +        Phaser par = parent;
447 +        long s;
448          int phase;
449 <        for (;;) {
450 <            long s = getReconciledState();
451 <            phase = phaseOf(s);
452 <            int unarrived = unarrivedOf(s) + registrations;
453 <            int parties = partiesOf(s) + registrations;
454 <            if (phase < 0)
455 <                break;
456 <            if (parties > ushortMask || unarrived > ushortMask)
457 <                throw new IllegalStateException(badBounds(parties, unarrived));
458 <            if (phase == phaseOf(root.state) &&
459 <                casState(s, stateFor(phase, parties, unarrived)))
460 <                break;
449 >        while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
450 >            int p = partiesOf(s);
451 >            int u = unarrivedOf(s);
452 >            int unarrived = u + registrations;
453 >            int parties = p + registrations;
454 >            if (par == null || phase == phaseOf(root.state)) {
455 >                if (parties > ushortMask || unarrived > ushortMask)
456 >                    throw new IllegalStateException(badBounds(parties,
457 >                                                              unarrived));
458 >                else if (p != 0 && u == 0)       // back off if advancing
459 >                    Thread.yield();              // not worth actually blocking
460 >                else if (casState(s, stateFor(phase, parties, unarrived)))
461 >                    break;
462 >            }
463          }
464          return phase;
465      }
# Line 460 | Line 475 | public class Phaser {
475       * of unarrived parties would become negative
476       */
477      public int arrive() {
478 +        Phaser par = parent;
479 +        long s;
480          int phase;
481 <        for (;;) {
465 <            long s = state;
466 <            phase = phaseOf(s);
467 <            if (phase < 0)
468 <                break;
481 >        while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
482              int parties = partiesOf(s);
483              int unarrived = unarrivedOf(s) - 1;
484 <            if (unarrived > 0) {        // Not the last arrival
485 <                if (casState(s, s - 1)) // s-1 adds one arrival
484 >            if (parties == 0 || unarrived < 0)
485 >                throw new IllegalStateException(badBounds(parties,
486 >                                                          unarrived));
487 >            else if (unarrived > 0) {           // Not the last arrival
488 >                if (casState(s, s - 1))         // s-1 adds one arrival
489                      break;
490              }
491 <            else if (unarrived == 0) {  // the last arrival
492 <                Phaser par = parent;
493 <                if (par == null) {      // directly trip
494 <                    if (casState
495 <                        (s,
496 <                         trippedStateFor(onAdvance(phase, parties) ? -1 :
481 <                                         ((phase + 1) & phaseMask), parties))) {
482 <                        releaseWaiters(phase);
483 <                        break;
484 <                    }
485 <                }
486 <                else {                  // cascade to parent
487 <                    if (casState(s, s - 1)) { // zeroes unarrived
488 <                        par.arrive();
489 <                        reconcileState();
490 <                        break;
491 <                    }
491 >            else if (par == null) {             // directly trip
492 >                if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 :
493 >                                                ((phase + 1) & phaseMask),
494 >                                                parties))) {
495 >                    releaseWaiters(phase);
496 >                    break;
497                  }
498              }
499 <            else if (phase != phaseOf(root.state)) // or if unreconciled
499 >            else if (phaseOf(root.state) == phase && casState(s, s - 1)) {
500 >                par.arrive();                   // cascade to parent
501                  reconcileState();
502 <            else
503 <                throw new IllegalStateException(badBounds(parties, unarrived));
502 >                break;
503 >            }
504          }
505          return phase;
506      }
# Line 513 | Line 519 | public class Phaser {
519       * of registered or unarrived parties would become negative
520       */
521      public int arriveAndDeregister() {
522 <        // similar code to arrive, but too different to merge
522 >        // similar to arrive, but too different to merge
523          Phaser par = parent;
524 +        long s;
525          int phase;
526 <        for (;;) {
520 <            long s = state;
521 <            phase = phaseOf(s);
522 <            if (phase < 0)
523 <                break;
526 >        while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) {
527              int parties = partiesOf(s) - 1;
528              int unarrived = unarrivedOf(s) - 1;
529 <            if (parties >= 0) {
530 <                if (unarrived > 0 || (unarrived == 0 && par != null)) {
531 <                    if (casState
532 <                        (s,
533 <                         stateFor(phase, parties, unarrived))) {
534 <                        if (unarrived == 0) {
535 <                            par.arriveAndDeregister();
536 <                            reconcileState();
537 <                        }
538 <                        break;
539 <                    }
540 <                    continue;
541 <                }
539 <                if (unarrived == 0) {
540 <                    if (casState
541 <                        (s,
542 <                         trippedStateFor(onAdvance(phase, parties) ? -1 :
543 <                                         ((phase + 1) & phaseMask), parties))) {
544 <                        releaseWaiters(phase);
545 <                        break;
546 <                    }
547 <                    continue;
548 <                }
549 <                if (par != null && phase != phaseOf(root.state)) {
550 <                    reconcileState();
551 <                    continue;
529 >            if (parties < 0 || unarrived < 0)
530 >                throw new IllegalStateException(badBounds(parties,
531 >                                                          unarrived));
532 >            else if (unarrived > 0) {
533 >                if (casState(s, stateFor(phase, parties, unarrived)))
534 >                    break;
535 >            }
536 >            else if (par == null) {
537 >                if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1:
538 >                                                (phase + 1) & phaseMask,
539 >                                                parties))) {
540 >                    releaseWaiters(phase);
541 >                    break;
542                  }
543              }
544 <            throw new IllegalStateException(badBounds(parties, unarrived));
544 >            else if (phaseOf(root.state) == phase &&
545 >                     casState(s, stateFor(phase, parties, 0))) {
546 >                if (parties == 0)
547 >                    par.arriveAndDeregister();
548 >                else
549 >                    par.arrive();
550 >                reconcileState();
551 >                break;
552 >            }
553          }
554          return phase;
555      }
# Line 589 | Line 587 | public class Phaser {
587      public int awaitAdvance(int phase) {
588          if (phase < 0)
589              return phase;
590 <        long s = getReconciledState();
593 <        int p = phaseOf(s);
590 >        int p = getPhase();
591          if (p != phase)
592              return p;
596        if (unarrivedOf(s) == 0 && parent != null)
597            parent.awaitAdvance(phase);
598        // Fall here even if parent waited, to reconcile and help release
593          return untimedWait(phase);
594      }
595  
# Line 618 | Line 612 | public class Phaser {
612          throws InterruptedException {
613          if (phase < 0)
614              return phase;
615 <        long s = getReconciledState();
622 <        int p = phaseOf(s);
615 >        int p = getPhase();
616          if (p != phase)
617              return p;
625        if (unarrivedOf(s) == 0 && parent != null)
626            parent.awaitAdvanceInterruptibly(phase);
618          return interruptibleWait(phase);
619      }
620  
# Line 651 | Line 642 | public class Phaser {
642      public int awaitAdvanceInterruptibly(int phase,
643                                           long timeout, TimeUnit unit)
644          throws InterruptedException, TimeoutException {
645 +        long nanos = unit.toNanos(timeout);
646          if (phase < 0)
647              return phase;
648 <        long s = getReconciledState();
657 <        int p = phaseOf(s);
648 >        int p = getPhase();
649          if (p != phase)
650              return p;
651 <        if (unarrivedOf(s) == 0 && parent != null)
661 <            parent.awaitAdvanceInterruptibly(phase, timeout, unit);
662 <        return timedWait(phase, unit.toNanos(timeout));
651 >        return timedWait(phase, nanos);
652      }
653  
654      /**
# Line 670 | Line 659 | public class Phaser {
659       * unexpected exceptions.
660       */
661      public void forceTermination() {
662 <        for (;;) {
663 <            long s = getReconciledState();
664 <            int phase = phaseOf(s);
665 <            int parties = partiesOf(s);
666 <            int unarrived = unarrivedOf(s);
667 <            if (phase < 0 ||
668 <                casState(s, stateFor(-1, parties, unarrived))) {
680 <                releaseWaiters(0);
681 <                releaseWaiters(1);
682 <                if (parent != null)
683 <                    parent.forceTermination();
684 <                return;
685 <            }
686 <        }
662 >        Phaser r = root;    // force at root then reconcile
663 >        long s;
664 >        while (phaseOf(s = r.state) >= 0)
665 >            r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s)));
666 >        reconcileState();
667 >        releaseWaiters(0);  // ensure wakeups on both queues
668 >        releaseWaiters(1);
669      }
670  
671      /**
# Line 703 | Line 685 | public class Phaser {
685       * @return the number of parties
686       */
687      public int getRegisteredParties() {
688 <        return partiesOf(state);
688 >        return partiesOf(getReconciledState());
689      }
690  
691      /**
# Line 713 | Line 695 | public class Phaser {
695       * @return the number of arrived parties
696       */
697      public int getArrivedParties() {
698 <        return arrivedOf(state);
698 >        return arrivedOf(getReconciledState());
699      }
700  
701      /**
# Line 723 | Line 705 | public class Phaser {
705       * @return the number of unarrived parties
706       */
707      public int getUnarrivedParties() {
708 <        return unarrivedOf(state);
708 >        return unarrivedOf(getReconciledState());
709      }
710  
711      /**
# Line 777 | Line 759 | public class Phaser {
759       * termination for other reasons should also preserve this
760       * property.
761       *
780     * <p>You may override this method to perform an action with side
781     * effects visible to participating tasks, but it is only sensible
782     * to do so in designs where all parties register before any
783     * arrive, and all {@link #awaitAdvance} at each phase.
784     * Otherwise, you cannot ensure lack of interference from other
785     * parties during the invocation of this method. Additionally,
786     * method {@code onAdvance} may be invoked more than once per
787     * transition if registrations are intermixed with arrivals.
788     *
762       * @param phase the phase number on entering the barrier
763       * @param registeredParties the current number of registered parties
764       * @return {@code true} if this barrier should terminate

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines