ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.54 by dl, Sat Nov 13 13:10:04 2010 UTC vs.
Revision 1.58 by dl, Wed Nov 24 15:48:01 2010 UTC

# Line 86 | Line 86 | import java.util.concurrent.locks.LockSu
86   * #forceTermination} is also available to abruptly release waiting
87   * threads and allow them to terminate.
88   *
89 < * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
90 < * in tree structures) to reduce contention. Phasers with large
91 < * numbers of parties that would otherwise experience heavy
89 > * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
90 > * constructed in tree structures) to reduce contention. Phasers with
91 > * large numbers of parties that would otherwise experience heavy
92   * synchronization contention costs may instead be set up so that
93   * groups of sub-phasers share a common parent.  This may greatly
94   * increase throughput even though it incurs greater per-operation
# Line 240 | Line 240 | public class Phaser {
240       */
241      private volatile long state;
242  
243 <    private static final int  MAX_COUNT      = 0xffff;
243 >    private static final int  MAX_PARTIES    = 0xffff;
244      private static final int  MAX_PHASE      = 0x7fffffff;
245      private static final int  PARTIES_SHIFT  = 16;
246      private static final int  PHASE_SHIFT    = 32;
247 <    private static final long UNARRIVED_MASK = 0xffffL;
248 <    private static final long PARTIES_MASK   = 0xffff0000L;
247 >    private static final int  UNARRIVED_MASK = 0xffff;
248 >    private static final long PARTIES_MASK   = 0xffff0000L; // for masking long
249      private static final long ONE_ARRIVAL    = 1L;
250      private static final long ONE_PARTY      = 1L << PARTIES_SHIFT;
251      private static final long TERMINATION_PHASE  = -1L << PHASE_SHIFT;
# Line 253 | Line 253 | public class Phaser {
253      // The following unpacking methods are usually manually inlined
254  
255      private static int unarrivedOf(long s) {
256 <        return (int) (s & UNARRIVED_MASK);
256 >        return (int)s & UNARRIVED_MASK;
257      }
258  
259      private static int partiesOf(long s) {
260 <        return ((int) (s & PARTIES_MASK)) >>> PARTIES_SHIFT;
260 >        return (int)s >>> PARTIES_SHIFT;
261      }
262  
263      private static int phaseOf(long s) {
# Line 303 | Line 303 | public class Phaser {
303       */
304      private int doArrive(long adj) {
305          for (;;) {
306 <            long s;
307 <            int phase, unarrived;
308 <            if ((phase = (int)((s = state) >>> PHASE_SHIFT)) < 0)
306 >            long s = state;
307 >            int phase = (int)(s >>> PHASE_SHIFT);
308 >            if (phase < 0)
309                  return phase;
310 <            else if ((unarrived = (int)(s & UNARRIVED_MASK)) == 0)
310 >            int unarrived = (int)s & UNARRIVED_MASK;
311 >            if (unarrived == 0)
312                  checkBadArrive(s);
313 <            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)){
313 >            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
314                  if (unarrived == 1) {
314                    Phaser par;
315                      long p = s & PARTIES_MASK; // unshifted parties field
316                      long lu = p >>> PARTIES_SHIFT;
317                      int u = (int)lu;
318                      int nextPhase = (phase + 1) & MAX_PHASE;
319                      long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
320 <                    if ((par = parent) == null) {
321 <                        UNSAFE.compareAndSwapLong
322 <                            (this, stateOffset, s, onAdvance(phase, u)?
323 <                             next | TERMINATION_PHASE : next);
320 >                    final Phaser parent = this.parent;
321 >                    if (parent == null) {
322 >                        if (onAdvance(phase, u))
323 >                            next |= TERMINATION_PHASE; // obliterate phase
324 >                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
325                          releaseWaiters(phase);
326                      }
327                      else {
328 <                        par.doArrive(u == 0?
329 <                                     ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
330 <                        if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
328 >                        parent.doArrive((u == 0) ?
329 >                                        ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
330 >                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase ||
331                              ((int)(state >>> PHASE_SHIFT) != nextPhase &&
332                               !UNSAFE.compareAndSwapLong(this, stateOffset,
333                                                          s, next)))
# Line 352 | Line 353 | public class Phaser {
353      /**
354       * Implementation of register, bulkRegister
355       *
356 <     * @param registrations number to add to both parties and unarrived fields
356 >     * @param registrations number to add to both parties and
357 >     * unarrived fields. Must be greater than zero.
358       */
359      private int doRegister(int registrations) {
360 <        long adj = (long)registrations; // adjustment to state
361 <        adj |= adj << PARTIES_SHIFT;
362 <        Phaser par = parent;
360 >        // adjustment to state
361 >        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
362 >        final Phaser parent = this.parent;
363          for (;;) {
364 <            int phase, parties;
365 <            long s = par == null? state : reconcileState();
366 <            if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
364 >            long s = (parent == null) ? state : reconcileState();
365 >            int parties = (int)s >>> PARTIES_SHIFT;
366 >            int phase = (int)(s >>> PHASE_SHIFT);
367 >            if (phase < 0)
368                  return phase;
369 <            if ((parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT) != 0 &&
367 <                (s & UNARRIVED_MASK) == 0)
369 >            else if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0)
370                  internalAwaitAdvance(phase, null); // wait for onAdvance
371 <            else if (parties + registrations > MAX_COUNT)
371 >            else if (registrations > MAX_PARTIES - parties)
372                  throw new IllegalStateException(badRegister(s));
373              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
374                  return phase;
# Line 374 | Line 376 | public class Phaser {
376      }
377  
378      /**
379 <     * Returns message string for bounds exceptions on registration
379 >     * Returns message string for out of bounds exceptions on registration.
380       */
381      private String badRegister(long s) {
382          return "Attempt to register more than " +
383 <            MAX_COUNT + " parties for " + stateToString(s);
383 >            MAX_PARTIES + " parties for " + stateToString(s);
384      }
385  
386      /**
387 <     * Recursively resolves lagged phase propagation from root if
386 <     * necessary.
387 >     * Recursively resolves lagged phase propagation from root if necessary.
388       */
389      private long reconcileState() {
390          Phaser par = parent;
391 <        if (par == null)
392 <            return state;
393 <        Phaser rt = root;
394 <        for (;;) {
395 <            long s, u;
396 <            int phase, rPhase, pPhase;
397 <            if ((phase = (int)((s = state)>>> PHASE_SHIFT)) < 0 ||
398 <                (rPhase = (int)(rt.state >>> PHASE_SHIFT)) == phase)
399 <                return s;
400 <            long pState = par.parent == null? par.state : par.reconcileState();
401 <            if (state == s) {
402 <                if ((rPhase < 0 || (s & UNARRIVED_MASK) == 0) &&
403 <                    ((pPhase = (int)(pState >>> PHASE_SHIFT)) < 0 ||
404 <                     pPhase == ((phase + 1) & MAX_PHASE)))
405 <                    UNSAFE.compareAndSwapLong
406 <                        (this, stateOffset, s,
407 <                         (((long) pPhase) << PHASE_SHIFT) |
408 <                         (u = s & PARTIES_MASK) |
408 <                         (u >>> PARTIES_SHIFT)); // reset unarrived to parties
409 <                else
410 <                    releaseWaiters(phase); // help release others
391 >        long s = state;
392 >        if (par != null) {
393 >            Phaser rt = root;
394 >            int phase, rPhase;
395 >            while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
396 >                   (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
397 >                if ((int)(par.state >>> PHASE_SHIFT) != rPhase)
398 >                    par.reconcileState();
399 >                else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
400 >                    long u = s & PARTIES_MASK; // reset unarrived to parties
401 >                    long next = ((((long) rPhase) << PHASE_SHIFT) | u |
402 >                                 (u >>> PARTIES_SHIFT));
403 >                    if (state == s &&
404 >                        UNSAFE.compareAndSwapLong(this, stateOffset,
405 >                                                  s, s = next))
406 >                        break;
407 >                }
408 >                s = state;
409              }
410          }
411 +        return s;
412      }
413  
414      /**
# Line 434 | Line 433 | public class Phaser {
433      }
434  
435      /**
436 <     * Creates a new phaser with the given parent, without any
438 <     * initially registered parties. If parent is non-null this phaser
439 <     * is registered with the parent and its initial phase number is
440 <     * the same as that of parent phaser.
436 >     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
437       *
438       * @param parent the parent phaser
439       */
# Line 447 | Line 443 | public class Phaser {
443  
444      /**
445       * Creates a new phaser with the given parent and number of
446 <     * registered unarrived parties. If parent is non-null, this phaser
447 <     * is registered with the parent and its initial phase number is
448 <     * the same as that of parent phaser.
446 >     * registered unarrived parties. If parent is non-null, this
447 >     * phaser is registered with the parent and its initial phase
448 >     * number is the same as that of parent phaser.  If the number of
449 >     * parties is zero, the parent phaser will not proceed until this
450 >     * child phaser registers parties and advances, or this child
451 >     * phaser deregisters with its parent, or the parent is otherwise
452 >     * terminated.  This child Phaser will be deregistered from its
453 >     * parent automatically upon any invocation of the child's {@link
454 >     * #arriveAndDeregister} method that results in the child's number
455 >     * of registered parties becoming zero. (Although rarely
456 >     * appropriate, this child may also explicity deregister from its
457 >     * parent using {@code getParent().arriveAndDeregister()}.)  After
458 >     * deregistration, the child cannot re-register. (Instead, you can
459 >     * create a new child Phaser.)
460       *
461       * @param parent the parent phaser
462       * @param parties the number of parties required to trip barrier
# Line 457 | Line 464 | public class Phaser {
464       * or greater than the maximum number of parties supported
465       */
466      public Phaser(Phaser parent, int parties) {
467 <        if (parties < 0 || parties > MAX_COUNT)
467 >        if (parties >>> PARTIES_SHIFT != 0)
468              throw new IllegalArgumentException("Illegal number of parties");
469          int phase;
470          this.parent = parent;
# Line 466 | Line 473 | public class Phaser {
473              this.root = r;
474              this.evenQ = r.evenQ;
475              this.oddQ = r.oddQ;
476 <            phase = parent.register();
476 >            phase = parent.doRegister(1);
477          }
478          else {
479              this.root = this;
# Line 475 | Line 482 | public class Phaser {
482              phase = 0;
483          }
484          long p = (long)parties;
485 <        this.state = (((long) phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT);
485 >        this.state = (((long)phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT);
486      }
487  
488      /**
# Line 505 | Line 512 | public class Phaser {
512      public int bulkRegister(int parties) {
513          if (parties < 0)
514              throw new IllegalArgumentException();
508        if (parties > MAX_COUNT)
509            throw new IllegalStateException(badRegister(state));
515          if (parties == 0)
516              return getPhase();
517          return doRegister(parties);
# Line 575 | Line 580 | public class Phaser {
580      public int awaitAdvance(int phase) {
581          if (phase < 0)
582              return phase;
583 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
584 <        if (p != phase)
585 <            return p;
581 <        return internalAwaitAdvance(phase, null);
583 >        long s = (parent == null) ? state : reconcileState();
584 >        int p = (int)(s >>> PHASE_SHIFT);
585 >        return (p != phase) ? p : internalAwaitAdvance(phase, null);
586      }
587  
588      /**
# Line 599 | Line 603 | public class Phaser {
603          throws InterruptedException {
604          if (phase < 0)
605              return phase;
606 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
607 <        if (p != phase)
608 <            return p;
609 <        QNode node = new QNode(this, phase, true, false, 0L);
610 <        p = internalAwaitAdvance(phase, node);
611 <        if (node.wasInterrupted)
612 <            throw new InterruptedException();
613 <        else
614 <            return p;
606 >        long s = (parent == null) ? state : reconcileState();
607 >        int p = (int)(s >>> PHASE_SHIFT);
608 >        if (p == phase) {
609 >            QNode node = new QNode(this, phase, true, false, 0L);
610 >            p = internalAwaitAdvance(phase, node);
611 >            if (node.wasInterrupted)
612 >                throw new InterruptedException();
613 >        }
614 >        return p;
615      }
616  
617      /**
# Line 633 | Line 637 | public class Phaser {
637      public int awaitAdvanceInterruptibly(int phase,
638                                           long timeout, TimeUnit unit)
639          throws InterruptedException, TimeoutException {
636        long nanos = unit.toNanos(timeout);
640          if (phase < 0)
641              return phase;
642 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
643 <        if (p != phase)
644 <            return p;
645 <        QNode node = new QNode(this, phase, true, true, nanos);
646 <        p = internalAwaitAdvance(phase, node);
647 <        if (node.wasInterrupted)
648 <            throw new InterruptedException();
649 <        else if (p == phase)
650 <            throw new TimeoutException();
651 <        else
652 <            return p;
642 >        long s = (parent == null) ? state : reconcileState();
643 >        int p = (int)(s >>> PHASE_SHIFT);
644 >        if (p == phase) {
645 >            long nanos = unit.toNanos(timeout);
646 >            QNode node = new QNode(this, phase, true, true, nanos);
647 >            p = internalAwaitAdvance(phase, node);
648 >            if (node.wasInterrupted)
649 >                throw new InterruptedException();
650 >            else if (p == phase)
651 >                throw new TimeoutException();
652 >        }
653 >        return p;
654      }
655  
656      /**
657 <     * Forces this barrier to enter termination state. Counts of
658 <     * arrived and registered parties are unaffected. If this phaser
659 <     * has a parent, it too is terminated. This method may be useful
660 <     * for coordinating recovery after one or more tasks encounter
661 <     * unexpected exceptions.
657 >     * Forces this barrier to enter termination state.  Counts of
658 >     * arrived and registered parties are unaffected.  If this phaser
659 >     * is a member of a tiered set of phasers, then all of the phasers
660 >     * in the set are terminated.  If this phaser is already
661 >     * terminated, this method has no effect.  This method may be
662 >     * useful for coordinating recovery after one or more tasks
663 >     * encounter unexpected exceptions.
664       */
665      public void forceTermination() {
666 <        Phaser r = root;    // force at root then reconcile
666 >        // Only need to change root state
667 >        final Phaser root = this.root;
668          long s;
669 <        while ((s = r.state) >= 0)
670 <            UNSAFE.compareAndSwapLong(r, stateOffset, s, s | TERMINATION_PHASE);
671 <        reconcileState();
672 <        releaseWaiters(0); // signal all threads
673 <        releaseWaiters(1);
669 >        while ((s = root.state) >= 0) {
670 >            if (UNSAFE.compareAndSwapLong(root, stateOffset,
671 >                                          s, s | TERMINATION_PHASE)) {
672 >                releaseWaiters(0); // signal all threads
673 >                releaseWaiters(1);
674 >                return;
675 >            }
676 >        }
677      }
678  
679      /**
# Line 674 | Line 684 | public class Phaser {
684       * @return the phase number, or a negative value if terminated
685       */
686      public final int getPhase() {
687 <        return (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
687 >        return (int)(root.state >>> PHASE_SHIFT);
688      }
689  
690      /**
# Line 683 | Line 693 | public class Phaser {
693       * @return the number of parties
694       */
695      public int getRegisteredParties() {
696 <        return partiesOf(parent==null? state : reconcileState());
696 >        return partiesOf(state);
697      }
698  
699      /**
# Line 731 | Line 741 | public class Phaser {
741       * @return {@code true} if this barrier has been terminated
742       */
743      public boolean isTerminated() {
744 <        return (parent == null? state : reconcileState()) < 0;
744 >        return root.state < 0L;
745      }
746  
747      /**
# Line 795 | Line 805 | public class Phaser {
805      // Waiting mechanics
806  
807      /**
808 <     * Removes and signals threads from queue for phase
808 >     * Removes and signals threads from queue for phase.
809       */
810      private void releaseWaiters(int phase) {
811          AtomicReference<QNode> head = queueFor(phase);
# Line 809 | Line 819 | public class Phaser {
819          }
820      }
821  
812    /**
813     * Tries to enqueue given node in the appropriate wait queue.
814     *
815     * @return true if successful
816     */
817    private boolean tryEnqueue(int phase, QNode node) {
818        releaseWaiters(phase-1); // ensure old queue clean
819        AtomicReference<QNode> head = queueFor(phase);
820        QNode q = head.get();
821        return ((q == null || q.phase == phase) &&
822                (int)(root.state >>> PHASE_SHIFT) == phase &&
823                head.compareAndSet(node.next = q, node));
824    }
825
822      /** The number of CPUs, for spin control */
823      private static final int NCPU = Runtime.getRuntime().availableProcessors();
824  
# Line 834 | Line 830 | public class Phaser {
830       * avoid it when threads regularly arrive: When a thread in
831       * internalAwaitAdvance notices another arrival before blocking,
832       * and there appear to be enough CPUs available, it spins
833 <     * SPINS_PER_ARRIVAL more times before continuing to try to
834 <     * block. The value trades off good-citizenship vs big unnecessary
835 <     * slowdowns.
833 >     * SPINS_PER_ARRIVAL more times before blocking. Plus, even on
834 >     * uniprocessors, there is at least one intervening Thread.yield
835 >     * before blocking. The value trades off good-citizenship vs big
836 >     * unnecessary slowdowns.
837       */
838 <    static final int SPINS_PER_ARRIVAL = NCPU < 2? 1 : 1 << 8;
838 >    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
839  
840      /**
841       * Possibly blocks and waits for phase to advance unless aborted.
# Line 853 | Line 850 | public class Phaser {
850          boolean queued = false;      // true when node is enqueued
851          int lastUnarrived = -1;      // to increase spins upon change
852          int spins = SPINS_PER_ARRIVAL;
853 <        for (;;) {
854 <            int p, unarrived;
853 >        long s;
854 >        int p;
855 >        while ((p = (int)((s = current.state) >>> PHASE_SHIFT)) == phase) {
856              Phaser par;
857 <            long s = current.state;
858 <            if ((p = (int)(s >>> PHASE_SHIFT)) != phase) {
859 <                if (node != null)
860 <                    node.onRelease();
863 <                releaseWaiters(phase);
864 <                return p;
865 <            }
866 <            else if ((unarrived = (int)(s & UNARRIVED_MASK)) != lastUnarrived) {
857 >            int unarrived = (int)s & UNARRIVED_MASK;
858 >            if (unarrived != lastUnarrived) {
859 >                if (lastUnarrived == -1) // ensure old queue clean
860 >                    releaseWaiters(phase-1);
861                  if ((lastUnarrived = unarrived) < NCPU)
862                      spins += SPINS_PER_ARRIVAL;
863              }
# Line 872 | Line 866 | public class Phaser {
866                  par = par.parent;
867                  lastUnarrived = -1;
868              }
869 <            else if (spins > 0)
870 <                --spins;
869 >            else if (spins > 0) {
870 >                if (--spins == (SPINS_PER_ARRIVAL >>> 1))
871 >                    Thread.yield();  // yield midway through spin
872 >            }
873              else if (node == null)   // must be noninterruptible
874                  node = new QNode(this, phase, false, false, 0L);
875              else if (node.isReleasable()) {
876 <                if ((int)(reconcileState() >>> PHASE_SHIFT) == phase)
876 >                if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
877 >                    break;
878 >                else
879                      return phase;    // aborted
880              }
881 <            else if (!queued)
882 <                queued = tryEnqueue(phase, node);
881 >            else if (!queued) {      // push onto queue
882 >                AtomicReference<QNode> head = queueFor(phase);
883 >                QNode q = head.get();
884 >                if (q == null || q.phase == phase) {
885 >                    node.next = q;
886 >                    if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
887 >                        break;       // recheck to avoid stale enqueue
888 >                    else
889 >                        queued = head.compareAndSet(q, node);
890 >                }
891 >            }
892              else {
893                  try {
894                      ForkJoinPool.managedBlock(node);
# Line 890 | Line 897 | public class Phaser {
897                  }
898              }
899          }
900 +        releaseWaiters(phase);
901 +        if (node != null)
902 +            node.onRelease();
903 +        return p;
904      }
905  
906      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines