ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/Phaser.java
(Generate patch)

Comparing jsr166/src/jsr166y/Phaser.java (file contents):
Revision 1.52 by dl, Sat Nov 13 01:27:13 2010 UTC vs.
Revision 1.57 by dl, Fri Nov 19 16:03:24 2010 UTC

# Line 240 | Line 240 | public class Phaser {
240       */
241      private volatile long state;
242  
243 <    private static final int  MAX_COUNT      = 0xffff;
243 >    private static final int  MAX_PARTIES    = 0xffff;
244      private static final int  MAX_PHASE      = 0x7fffffff;
245      private static final int  PARTIES_SHIFT  = 16;
246      private static final int  PHASE_SHIFT    = 32;
247 <    private static final long UNARRIVED_MASK = 0xffffL;
248 <    private static final long PARTIES_MASK   = 0xffff0000L;
247 >    private static final int  UNARRIVED_MASK = 0xffff;
248 >    private static final long PARTIES_MASK   = 0xffff0000L; // for masking long
249      private static final long ONE_ARRIVAL    = 1L;
250      private static final long ONE_PARTY      = 1L << PARTIES_SHIFT;
251      private static final long TERMINATION_PHASE  = -1L << PHASE_SHIFT;
# Line 253 | Line 253 | public class Phaser {
253      // The following unpacking methods are usually manually inlined
254  
255      private static int unarrivedOf(long s) {
256 <        return (int) (s & UNARRIVED_MASK);
256 >        return (int)s & UNARRIVED_MASK;
257      }
258  
259      private static int partiesOf(long s) {
260 <        return ((int) (s & PARTIES_MASK)) >>> PARTIES_SHIFT;
260 >        return (int)s >>> PARTIES_SHIFT;
261      }
262  
263      private static int phaseOf(long s) {
# Line 302 | Line 302 | public class Phaser {
302       * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
303       */
304      private int doArrive(long adj) {
305 <        long s;
306 <        int phase, unarrived;
307 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0) {
308 <            if ((unarrived = (int)(s & UNARRIVED_MASK)) != 0) {
309 <                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)) {
310 <                    if (unarrived == 1) {
311 <                        Phaser par;
312 <                        long p = s & PARTIES_MASK; // unshifted parties field
313 <                        long lu = p >>> PARTIES_SHIFT;
314 <                        int u = (int)lu;
315 <                        int nextPhase = (phase + 1) & MAX_PHASE;
316 <                        long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
317 <                        if ((par = parent) == null) {
318 <                            UNSAFE.compareAndSwapLong
319 <                                (this, stateOffset, s, onAdvance(phase, u)?
320 <                                 next | TERMINATION_PHASE : next);
321 <                            releaseWaiters(phase);
322 <                        }
323 <                        else {
324 <                            par.doArrive(u == 0?
325 <                                         ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
326 <                            if ((int)(par.state >>> PHASE_SHIFT) != nextPhase ||
327 <                                ((int)(state >>> PHASE_SHIFT) != nextPhase &&
328 <                                 !UNSAFE.compareAndSwapLong(this, stateOffset,
329 <                                                            s, next)))
330 <                                reconcileState();
331 <                        }
305 >        for (;;) {
306 >            long s = state;
307 >            int phase = (int)(s >>> PHASE_SHIFT);
308 >            if (phase < 0)
309 >                return phase;
310 >            int unarrived = (int)s & UNARRIVED_MASK;
311 >            if (unarrived == 0)
312 >                checkBadArrive(s);
313 >            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
314 >                if (unarrived == 1) {
315 >                    long p = s & PARTIES_MASK; // unshifted parties field
316 >                    long lu = p >>> PARTIES_SHIFT;
317 >                    int u = (int)lu;
318 >                    int nextPhase = (phase + 1) & MAX_PHASE;
319 >                    long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
320 >                    final Phaser parent = this.parent;
321 >                    if (parent == null) {
322 >                        if (onAdvance(phase, u))
323 >                            next |= TERMINATION_PHASE; // obliterate phase
324 >                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
325 >                        releaseWaiters(phase);
326 >                    }
327 >                    else {
328 >                        parent.doArrive((u == 0) ?
329 >                                        ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
330 >                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase ||
331 >                            ((int)(state >>> PHASE_SHIFT) != nextPhase &&
332 >                             !UNSAFE.compareAndSwapLong(this, stateOffset,
333 >                                                        s, next)))
334 >                            reconcileState();
335                      }
333                    break;
336                  }
337 +                return phase;
338              }
336            else if (state == s && reconcileState() == s) // recheck
337                throw new IllegalStateException(badArrive());
339          }
339        return phase;
340      }
341  
342      /**
343 <     * Returns message string for bounds exceptions on arrival.
344 <     * Declared out of-line from doArrive to reduce string op bulk.
343 >     * Rechecks state and throws bounds exceptions on arrival -- called
344 >     * only if unarrived is apparently zero.
345       */
346 <    private String badArrive() {
347 <        return ("Attempted arrival of unregistered party for " +
348 <                this.toString());
346 >    private void checkBadArrive(long s) {
347 >        if (reconcileState() == s)
348 >            throw new IllegalStateException
349 >                ("Attempted arrival of unregistered party for " +
350 >                 stateToString(s));
351      }
352  
353      /**
# Line 354 | Line 356 | public class Phaser {
356       * @param registrations number to add to both parties and unarrived fields
357       */
358      private int doRegister(int registrations) {
359 <        long adj = (long)registrations; // adjustment to state
360 <        adj |= adj << PARTIES_SHIFT;
361 <        Phaser par = parent;
362 <        long s;
363 <        int phase;
364 <        while ((phase = (int)((s = (par == null? state : reconcileState()))
365 <                              >>> PHASE_SHIFT)) >= 0) {
366 <            int parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT;
367 <            if (parties != 0 && (s & UNARRIVED_MASK) == 0)
359 >        // assert registrations > 0;
360 >        // adjustment to state
361 >        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
362 >        final Phaser parent = this.parent;
363 >        for (;;) {
364 >            long s = (parent == null) ? state : reconcileState();
365 >            int phase = (int)(s >>> PHASE_SHIFT);
366 >            if (phase < 0)
367 >                return phase;
368 >            int parties = (int)s >>> PARTIES_SHIFT;
369 >            if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0)
370                  internalAwaitAdvance(phase, null); // wait for onAdvance
371 <            else if (parties + registrations > MAX_COUNT)
372 <                throw new IllegalStateException(badRegister());
371 >            else if (registrations > MAX_PARTIES - parties)
372 >                throw new IllegalStateException(badRegister(s));
373              else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
374 <                break;
374 >                return phase;
375          }
372        return phase;
376      }
377  
378      /**
379 <     * Returns message string for bounds exceptions on registration
379 >     * Returns message string for out of bounds exceptions on registration.
380       */
381 <    private String badRegister() {
382 <        return ("Attempt to register more than " + MAX_COUNT + " parties for "+
383 <                this.toString());
381 >    private String badRegister(long s) {
382 >        return "Attempt to register more than " +
383 >            MAX_PARTIES + " parties for " + stateToString(s);
384      }
385  
386      /**
387 <     * Recursively resolves lagged phase propagation from root if
385 <     * necessary.
387 >     * Recursively resolves lagged phase propagation from root if necessary.
388       */
389      private long reconcileState() {
390          Phaser par = parent;
391 <        if (par == null)
392 <            return state;
393 <        Phaser rt = root;
394 <        long s;
395 <        int phase, rPhase;
396 <        while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0 &&
397 <               (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
398 <            if (rPhase < 0 || (s & UNARRIVED_MASK) == 0) {
399 <                long ps = par.parent == null? par.state : par.reconcileState();
400 <                int pPhase = (int)(ps >>> PHASE_SHIFT);
401 <                if (pPhase < 0 || pPhase == ((phase + 1) & MAX_PHASE)) {
402 <                    if (state != s)
403 <                        continue;
404 <                    long p = s & PARTIES_MASK;
405 <                    long next = ((((long) pPhase) << PHASE_SHIFT) |
406 <                                 (p >>> PARTIES_SHIFT) | p);
405 <                    if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
406 <                        return next;
391 >        long s = state;
392 >        if (par != null) {
393 >            Phaser rt = root;
394 >            int phase, rPhase;
395 >            while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
396 >                   (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
397 >                if ((int)(par.state >>> PHASE_SHIFT) != rPhase)
398 >                    par.reconcileState();
399 >                else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
400 >                    long u = s & PARTIES_MASK; // reset unarrived to parties
401 >                    long next = ((((long) rPhase) << PHASE_SHIFT) | u |
402 >                                 (u >>> PARTIES_SHIFT));
403 >                    if (state == s &&
404 >                        UNSAFE.compareAndSwapLong(this, stateOffset,
405 >                                                  s, s = next))
406 >                        break;
407                  }
408 +                s = state;
409              }
409            if (state == s)
410                releaseWaiters(phase); // help release others
410          }
411          return s;
412      }
# Line 457 | Line 456 | public class Phaser {
456       * or greater than the maximum number of parties supported
457       */
458      public Phaser(Phaser parent, int parties) {
459 <        if (parties < 0 || parties > MAX_COUNT)
459 >        if (parties >>> PARTIES_SHIFT != 0)
460              throw new IllegalArgumentException("Illegal number of parties");
461          int phase;
462          this.parent = parent;
# Line 475 | Line 474 | public class Phaser {
474              phase = 0;
475          }
476          long p = (long)parties;
477 <        this.state = (((long) phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT);
477 >        this.state = (((long)phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT);
478      }
479  
480      /**
# Line 505 | Line 504 | public class Phaser {
504      public int bulkRegister(int parties) {
505          if (parties < 0)
506              throw new IllegalArgumentException();
508        if (parties > MAX_COUNT)
509            throw new IllegalStateException(badRegister());
507          if (parties == 0)
508              return getPhase();
509          return doRegister(parties);
# Line 575 | Line 572 | public class Phaser {
572      public int awaitAdvance(int phase) {
573          if (phase < 0)
574              return phase;
575 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
576 <        if (p != phase)
577 <            return p;
581 <        return internalAwaitAdvance(phase, null);
575 >        long s = (parent == null) ? state : reconcileState();
576 >        int p = (int)(s >>> PHASE_SHIFT);
577 >        return (p != phase) ? p : internalAwaitAdvance(phase, null);
578      }
579  
580      /**
# Line 599 | Line 595 | public class Phaser {
595          throws InterruptedException {
596          if (phase < 0)
597              return phase;
598 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
599 <        if (p != phase)
600 <            return p;
601 <        QNode node = new QNode(this, phase, true, false, 0L);
602 <        p = internalAwaitAdvance(phase, node);
603 <        if (node.wasInterrupted)
604 <            throw new InterruptedException();
605 <        else
606 <            return p;
598 >        long s = (parent == null) ? state : reconcileState();
599 >        int p = (int)(s >>> PHASE_SHIFT);
600 >        if (p == phase) {
601 >            QNode node = new QNode(this, phase, true, false, 0L);
602 >            p = internalAwaitAdvance(phase, node);
603 >            if (node.wasInterrupted)
604 >                throw new InterruptedException();
605 >        }
606 >        return p;
607      }
608  
609      /**
# Line 633 | Line 629 | public class Phaser {
629      public int awaitAdvanceInterruptibly(int phase,
630                                           long timeout, TimeUnit unit)
631          throws InterruptedException, TimeoutException {
636        long nanos = unit.toNanos(timeout);
632          if (phase < 0)
633              return phase;
634 <        int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT);
635 <        if (p != phase)
636 <            return p;
637 <        QNode node = new QNode(this, phase, true, true, nanos);
638 <        p = internalAwaitAdvance(phase, node);
639 <        if (node.wasInterrupted)
640 <            throw new InterruptedException();
641 <        else if (p == phase)
642 <            throw new TimeoutException();
643 <        else
644 <            return p;
634 >        long s = (parent == null) ? state : reconcileState();
635 >        int p = (int)(s >>> PHASE_SHIFT);
636 >        if (p == phase) {
637 >            long nanos = unit.toNanos(timeout);
638 >            QNode node = new QNode(this, phase, true, true, nanos);
639 >            p = internalAwaitAdvance(phase, node);
640 >            if (node.wasInterrupted)
641 >                throw new InterruptedException();
642 >            else if (p == phase)
643 >                throw new TimeoutException();
644 >        }
645 >        return p;
646      }
647  
648      /**
649 <     * Forces this barrier to enter termination state. Counts of
650 <     * arrived and registered parties are unaffected. If this phaser
651 <     * has a parent, it too is terminated. This method may be useful
652 <     * for coordinating recovery after one or more tasks encounter
653 <     * unexpected exceptions.
649 >     * Forces this barrier to enter termination state.  Counts of
650 >     * arrived and registered parties are unaffected.  If this phaser
651 >     * is a member of a tiered set of phasers, then all of the phasers
652 >     * in the set are terminated.  If this phaser is already
653 >     * terminated, this method has no effect.  This method may be
654 >     * useful for coordinating recovery after one or more tasks
655 >     * encounter unexpected exceptions.
656       */
657      public void forceTermination() {
658 <        Phaser r = root;    // force at root then reconcile
658 >        // Only need to change root state
659 >        final Phaser root = this.root;
660          long s;
661 <        while ((s = r.state) >= 0)
662 <            UNSAFE.compareAndSwapLong(r, stateOffset, s, s | TERMINATION_PHASE);
663 <        reconcileState();
664 <        releaseWaiters(0); // signal all threads
665 <        releaseWaiters(1);
661 >        while ((s = root.state) >= 0) {
662 >            if (UNSAFE.compareAndSwapLong(root, stateOffset,
663 >                                          s, s | TERMINATION_PHASE)) {
664 >                releaseWaiters(0); // signal all threads
665 >                releaseWaiters(1);
666 >                return;
667 >            }
668 >        }
669      }
670  
671      /**
# Line 674 | Line 676 | public class Phaser {
676       * @return the phase number, or a negative value if terminated
677       */
678      public final int getPhase() {
679 <        return (int)((parent == null? state : reconcileState()) >>> PHASE_SHIFT);
679 >        return (int)(root.state >>> PHASE_SHIFT);
680      }
681  
682      /**
# Line 683 | Line 685 | public class Phaser {
685       * @return the number of parties
686       */
687      public int getRegisteredParties() {
688 <        return partiesOf(parent == null? state : reconcileState());
688 >        return partiesOf(state);
689      }
690  
691      /**
# Line 693 | Line 695 | public class Phaser {
695       * @return the number of arrived parties
696       */
697      public int getArrivedParties() {
698 <        return arrivedOf(parent == null? state : reconcileState());
698 >        return arrivedOf(parent==null? state : reconcileState());
699      }
700  
701      /**
# Line 703 | Line 705 | public class Phaser {
705       * @return the number of unarrived parties
706       */
707      public int getUnarrivedParties() {
708 <        return unarrivedOf(parent == null? state : reconcileState());
708 >        return unarrivedOf(parent==null? state : reconcileState());
709      }
710  
711      /**
# Line 731 | Line 733 | public class Phaser {
733       * @return {@code true} if this barrier has been terminated
734       */
735      public boolean isTerminated() {
736 <        return (parent == null? state : reconcileState()) < 0;
736 >        return root.state < 0L;
737      }
738  
739      /**
# Line 779 | Line 781 | public class Phaser {
781       * @return a string identifying this barrier, as well as its state
782       */
783      public String toString() {
784 <        long s = reconcileState();
784 >        return stateToString(reconcileState());
785 >    }
786 >
787 >    /**
788 >     * Implementation of toString and string-based error messages
789 >     */
790 >    private String stateToString(long s) {
791          return super.toString() +
792              "[phase = " + phaseOf(s) +
793              " parties = " + partiesOf(s) +
794              " arrived = " + arrivedOf(s) + "]";
795      }
796  
797 +    // Waiting mechanics
798 +
799      /**
800 <     * Removes and signals threads from queue for phase
800 >     * Removes and signals threads from queue for phase.
801       */
802      private void releaseWaiters(int phase) {
803          AtomicReference<QNode> head = queueFor(phase);
# Line 801 | Line 811 | public class Phaser {
811          }
812      }
813  
804    /**
805     * Tries to enqueue given node in the appropriate wait queue.
806     *
807     * @return true if successful
808     */
809    private boolean tryEnqueue(int phase, QNode node) {
810        releaseWaiters(phase-1); // ensure old queue clean
811        AtomicReference<QNode> head = queueFor(phase);
812        QNode q = head.get();
813        return ((q == null || q.phase == phase) &&
814                (int)(root.state >>> PHASE_SHIFT) == phase &&
815                head.compareAndSet(node.next = q, node));
816    }
817
814      /** The number of CPUs, for spin control */
815      private static final int NCPU = Runtime.getRuntime().availableProcessors();
816  
# Line 826 | Line 822 | public class Phaser {
822       * avoid it when threads regularly arrive: When a thread in
823       * internalAwaitAdvance notices another arrival before blocking,
824       * and there appear to be enough CPUs available, it spins
825 <     * SPINS_PER_ARRIVAL more times before continuing to try to
826 <     * block. The value trades off good-citizenship vs big unnecessary
827 <     * slowdowns.
825 >     * SPINS_PER_ARRIVAL more times before blocking. Plus, even on
826 >     * uniprocessors, there is at least one intervening Thread.yield
827 >     * before blocking. The value trades off good-citizenship vs big
828 >     * unnecessary slowdowns.
829       */
830 <    static final int SPINS_PER_ARRIVAL = NCPU < 2? 1 : 1 << 8;
830 >    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
831  
832      /**
833       * Possibly blocks and waits for phase to advance unless aborted.
834       *
835       * @param phase current phase
836 <     * @param node if nonnull, the wait node to track interrupt and timeout;
836 >     * @param node if non-null, the wait node to track interrupt and timeout;
837       * if null, denotes noninterruptible wait
838       * @return current phase
839       */
840      private int internalAwaitAdvance(int phase, QNode node) {
841          Phaser current = this;       // to eventually wait at root if tiered
842 <        Phaser par = parent;
846 <        boolean queued = false;
847 <        int spins = SPINS_PER_ARRIVAL;
842 >        boolean queued = false;      // true when node is enqueued
843          int lastUnarrived = -1;      // to increase spins upon change
844 +        int spins = SPINS_PER_ARRIVAL;
845          long s;
846          int p;
847          while ((p = (int)((s = current.state) >>> PHASE_SHIFT)) == phase) {
848 <            int unarrived = (int)(s & UNARRIVED_MASK);
848 >            Phaser par;
849 >            int unarrived = (int)s & UNARRIVED_MASK;
850              if (unarrived != lastUnarrived) {
851 +                if (lastUnarrived == -1) // ensure old queue clean
852 +                    releaseWaiters(phase-1);
853                  if ((lastUnarrived = unarrived) < NCPU)
854                      spins += SPINS_PER_ARRIVAL;
855              }
856 <            else if (unarrived == 0 && par != null) {
856 >            else if (unarrived == 0 && (par = current.parent) != null) {
857                  current = par;       // if all arrived, use parent
858                  par = par.parent;
859 +                lastUnarrived = -1;
860              }
861 <            else if (spins > 0)
862 <                --spins;
863 <            else if (node == null)
861 >            else if (spins > 0) {
862 >                if (--spins == (SPINS_PER_ARRIVAL >>> 1))
863 >                    Thread.yield();  // yield midway through spin
864 >            }
865 >            else if (node == null)   // must be noninterruptible
866                  node = new QNode(this, phase, false, false, 0L);
867 <            else if (node.isReleasable())
868 <                break;
869 <            else if (!queued)
870 <                queued = tryEnqueue(phase, node);
867 >            else if (node.isReleasable()) {
868 >                if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
869 >                    break;
870 >                else
871 >                    return phase;    // aborted
872 >            }
873 >            else if (!queued) {      // push onto queue
874 >                AtomicReference<QNode> head = queueFor(phase);
875 >                QNode q = head.get();
876 >                if (q == null || q.phase == phase) {
877 >                    node.next = q;
878 >                    if ((p = (int)(root.state >>> PHASE_SHIFT)) != phase)
879 >                        break;       // recheck to avoid stale enqueue
880 >                    else
881 >                        queued = head.compareAndSet(q, node);
882 >                }
883 >            }
884              else {
885                  try {
886                      ForkJoinPool.managedBlock(node);
# Line 874 | Line 889 | public class Phaser {
889                  }
890              }
891          }
892 <        if (node != null) {
893 <            if (node.thread != null)
894 <                node.thread = null;
880 <            if (!node.interruptible && node.wasInterrupted)
881 <                Thread.currentThread().interrupt();
882 <        }
883 <        if (p == phase)
884 <            p = (int)(reconcileState() >>> PHASE_SHIFT);
885 <        if (p != phase)
886 <            releaseWaiters(phase);
892 >        releaseWaiters(phase);
893 >        if (node != null)
894 >            node.onRelease();
895          return p;
896      }
897  
# Line 956 | Line 964 | public class Phaser {
964                  LockSupport.unpark(t);
965              }
966          }
967 +
968 +        void onRelease() { // actions upon return from internalAwaitAdvance
969 +            if (!interruptible && wasInterrupted)
970 +                Thread.currentThread().interrupt();
971 +            if (thread != null)
972 +                thread = null;
973 +        }
974 +
975      }
976  
977      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines