ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.58 by jsr166, Wed Oct 28 10:23:38 2009 UTC vs.
Revision 1.66 by jsr166, Mon Nov 2 18:38:37 2009 UTC

# Line 361 | Line 361 | public class LinkedTransferQueue<E> exte
361       * precede or follow CASes use simple relaxed forms.  Other
362       * cleanups use releasing/lazy writes.
363       */
364 <    static final class Node<E> {
364 >    static final class Node {
365          final boolean isData;   // false if this is a request node
366          volatile Object item;   // initially non-null if isData; CASed to match
367 <        volatile Node<E> next;
367 >        volatile Node next;
368          volatile Thread waiter; // null until waiting
369  
370          // CAS methods for fields
371 <        final boolean casNext(Node<E> cmp, Node<E> val) {
371 >        final boolean casNext(Node cmp, Node val) {
372              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
373          }
374  
# Line 381 | Line 381 | public class LinkedTransferQueue<E> exte
381           * Creates a new node. Uses relaxed write because item can only
382           * be seen if followed by CAS.
383           */
384 <        Node(E item, boolean isData) {
384 >        Node(Object item, boolean isData) {
385              UNSAFE.putObject(this, itemOffset, item); // relaxed write
386              this.isData = isData;
387          }
# Line 457 | Line 457 | public class LinkedTransferQueue<E> exte
457      }
458  
459      /** head of the queue; null until first enqueue */
460 <    transient volatile Node<E> head;
460 >    transient volatile Node head;
461  
462      /** predecessor of dangling unspliceable node */
463 <    private transient volatile Node<E> cleanMe; // decl here reduces contention
463 >    private transient volatile Node cleanMe; // decl here reduces contention
464  
465      /** tail of the queue; null until first append */
466 <    private transient volatile Node<E> tail;
466 >    private transient volatile Node tail;
467  
468      // CAS methods for fields
469 <    private boolean casTail(Node<E> cmp, Node<E> val) {
469 >    private boolean casTail(Node cmp, Node val) {
470          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
471      }
472  
473 <    private boolean casHead(Node<E> cmp, Node<E> val) {
473 >    private boolean casHead(Node cmp, Node val) {
474          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
475      }
476  
477 <    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
477 >    private boolean casCleanMe(Node cmp, Node val) {
478          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
479      }
480  
481      /*
482 <     * Possible values for "how" argument in xfer method. Beware that
483 <     * the order of assigned numerical values matters.
482 >     * Possible values for "how" argument in xfer method.
483       */
484 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
485 <    private static final int ASYNC   = 1; // for offer, put, add
486 <    private static final int SYNC    = 2; // for transfer, take
487 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
484 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
485 >    private static final int ASYNC = 1; // for offer, put, add
486 >    private static final int SYNC  = 2; // for transfer, take
487 >    private static final int TIMED = 3; // for timed poll, tryTransfer
488  
489      @SuppressWarnings("unchecked")
490      static <E> E cast(Object item) {
# Line 498 | Line 497 | public class LinkedTransferQueue<E> exte
497       *
498       * @param e the item or null for take
499       * @param haveData true if this is a put, else a take
500 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
501 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
500 >     * @param how NOW, ASYNC, SYNC, or TIMED
501 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
502       * @return an item if matched, else e
503       * @throws NullPointerException if haveData mode but e is null
504       */
505      private E xfer(E e, boolean haveData, int how, long nanos) {
506          if (haveData && (e == null))
507              throw new NullPointerException();
508 <        Node<E> s = null;                     // the node to append, if needed
508 >        Node s = null;                        // the node to append, if needed
509  
510          retry: for (;;) {                     // restart on append race
511  
512 <            for (Node<E> h = head, p = h; p != null;) {
514 <                // find & match first node
512 >            for (Node h = head, p = h; p != null;) { // find & match first node
513                  boolean isData = p.isData;
514                  Object item = p.item;
515                  if (item != p && (item != null) == isData) { // unmatched
516                      if (isData == haveData)   // can't match
517                          break;
518                      if (p.casItem(item, e)) { // match
519 <                        for (Node<E> q = p; q != h;) {
520 <                            Node<E> n = q.next; // update head by 2
519 >                        for (Node q = p; q != h;) {
520 >                            Node n = q.next;  // update head by 2
521                              if (n != null)    // unless singleton
522                                  q = n;
523                              if (head == h && casHead(h, q)) {
# Line 534 | Line 532 | public class LinkedTransferQueue<E> exte
532                          return this.<E>cast(item);
533                      }
534                  }
535 <                Node<E> n = p.next;
535 >                Node n = p.next;
536                  p = (p != n) ? n : (h = head); // Use head if p offlist
537              }
538  
539 <            if (how >= ASYNC) {               // No matches available
539 >            if (how != NOW) {                 // No matches available
540                  if (s == null)
541 <                    s = new Node<E>(e, haveData);
542 <                Node<E> pred = tryAppend(s, haveData);
541 >                    s = new Node(e, haveData);
542 >                Node pred = tryAppend(s, haveData);
543                  if (pred == null)
544                      continue retry;           // lost race vs opposite mode
545 <                if (how >= SYNC)
546 <                    return awaitMatch(s, pred, e, how, nanos);
545 >                if (how != ASYNC)
546 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
547              }
548              return e; // not waiting
549          }
# Line 560 | Line 558 | public class LinkedTransferQueue<E> exte
558       * different mode, else s's predecessor, or s itself if no
559       * predecessor
560       */
561 <    private Node<E> tryAppend(Node<E> s, boolean haveData) {
562 <        for (Node<E> t = tail, p = t;;) { // move p to last node and append
563 <            Node<E> n, u;                     // temps for reads of next & tail
561 >    private Node tryAppend(Node s, boolean haveData) {
562 >        for (Node t = tail, p = t;;) {        // move p to last node and append
563 >            Node n, u;                        // temps for reads of next & tail
564              if (p == null && (p = head) == null) {
565                  if (casHead(null, s))
566                      return s;                 // initialize
# Line 594 | Line 592 | public class LinkedTransferQueue<E> exte
592       * predecessor, or null if unknown (the null case does not occur
593       * in any current calls but may in possible future extensions)
594       * @param e the comparison value for checking match
595 <     * @param how either SYNC or TIMEOUT
596 <     * @param nanos timeout value
595 >     * @param timed if true, wait only until timeout elapses
596 >     * @param nanos timeout in nanosecs, used only if timed is true
597       * @return matched item, or e if unmatched on interrupt or timeout
598       */
599 <    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
600 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
599 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
600 >        long lastTime = timed ? System.nanoTime() : 0L;
601          Thread w = Thread.currentThread();
602          int spins = -1; // initialized after first item and cancel checks
603          ThreadLocalRandom randomYields = null; // bound if needed
# Line 611 | Line 609 | public class LinkedTransferQueue<E> exte
609                  s.forgetContents();           // avoid garbage
610                  return this.<E>cast(item);
611              }
612 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
612 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
613                      s.casItem(e, s)) {       // cancel
614                  unsplice(pred, s);
615                  return e;
# Line 630 | Line 628 | public class LinkedTransferQueue<E> exte
628              else if (s.waiter == null) {
629                  s.waiter = w;                 // request unpark then recheck
630              }
631 <            else if (how == TIMEOUT) {
631 >            else if (timed) {
632                  long now = System.nanoTime();
633                  if ((nanos -= now - lastTime) > 0)
634                      LockSupport.parkNanos(this, nanos);
# Line 648 | Line 646 | public class LinkedTransferQueue<E> exte
646       * Returns spin/yield value for a node with given predecessor and
647       * data mode. See above for explanation.
648       */
649 <    private static int spinsFor(Node<?> pred, boolean haveData) {
649 >    private static int spinsFor(Node pred, boolean haveData) {
650          if (MP && pred != null) {
651              if (pred.isData != haveData)      // phase change
652                  return FRONT_SPINS + CHAINED_SPINS;
# Line 665 | Line 663 | public class LinkedTransferQueue<E> exte
663       * or trailing node; failing on contention.
664       */
665      private void shortenHeadPath() {
666 <        Node<E> h, hn, p, q;
666 >        Node h, hn, p, q;
667          if ((p = h = head) != null && h.isMatched() &&
668              (q = hn = h.next) != null) {
669 <            Node<E> n;
669 >            Node n;
670              while ((n = q.next) != q) {
671                  if (n == null || !q.isMatched()) {
672                      if (hn != q && h.next == hn)
# Line 684 | Line 682 | public class LinkedTransferQueue<E> exte
682      /* -------------- Traversal methods -------------- */
683  
684      /**
685 +     * Returns the successor of p, or the head node if p.next has been
686 +     * linked to self, which will only be true if traversing with a
687 +     * stale pointer that is now off the list.
688 +     */
689 +    final Node succ(Node p) {
690 +        Node next = p.next;
691 +        return (p == next) ? head : next;
692 +    }
693 +
694 +    /**
695       * Returns the first unmatched node of the given mode, or null if
696       * none.  Used by methods isEmpty, hasWaitingConsumer.
697       */
698 <    private Node<E> firstOfMode(boolean data) {
699 <        for (Node<E> p = head; p != null; ) {
698 >    private Node firstOfMode(boolean isData) {
699 >        for (Node p = head; p != null; p = succ(p)) {
700              if (!p.isMatched())
701 <                return (p.isData == data) ? p : null;
694 <            Node<E> n = p.next;
695 <            p = (n != p) ? n : head;
701 >                return (p.isData == isData) ? p : null;
702          }
703          return null;
704      }
# Line 702 | Line 708 | public class LinkedTransferQueue<E> exte
708       * null if none.  Used by peek.
709       */
710      private E firstDataItem() {
711 <        for (Node<E> p = head; p != null; ) {
706 <            boolean isData = p.isData;
711 >        for (Node p = head; p != null; p = succ(p)) {
712              Object item = p.item;
713 <            if (item != p && (item != null) == isData)
714 <                return isData ? this.<E>cast(item) : null;
715 <            Node<E> n = p.next;
716 <            p = (n != p) ? n : head;
713 >            if (p.isData) {
714 >                if (item != null && item != p)
715 >                    return this.<E>cast(item);
716 >            }
717 >            else if (item == null)
718 >                return null;
719          }
720          return null;
721      }
# Line 719 | Line 726 | public class LinkedTransferQueue<E> exte
726       */
727      private int countOfMode(boolean data) {
728          int count = 0;
729 <        for (Node<E> p = head; p != null; ) {
729 >        for (Node p = head; p != null; ) {
730              if (!p.isMatched()) {
731                  if (p.isData != data)
732                      return 0;
733                  if (++count == Integer.MAX_VALUE) // saturated
734                      break;
735              }
736 <            Node<E> n = p.next;
736 >            Node n = p.next;
737              if (n != p)
738                  p = n;
739              else {
# Line 738 | Line 745 | public class LinkedTransferQueue<E> exte
745      }
746  
747      final class Itr implements Iterator<E> {
748 <        private Node<E> nextNode;   // next node to return item for
749 <        private E nextItem;         // the corresponding item
750 <        private Node<E> lastRet;    // last returned node, to support remove
748 >        private Node nextNode;   // next node to return item for
749 >        private E nextItem;      // the corresponding item
750 >        private Node lastRet;    // last returned node, to support remove
751 >        private Node lastPred;   // predecessor to unlink lastRet
752  
753          /**
754           * Moves to next node after prev, or first node if prev null.
755           */
756 <        private void advance(Node<E> prev) {
756 >        private void advance(Node prev) {
757 >            lastPred = lastRet;
758              lastRet = prev;
759 <            Node<E> p;
760 <            if (prev == null || (p = prev.next) == prev)
752 <                p = head;
753 <            while (p != null) {
759 >            for (Node p = (prev == null) ? head : succ(prev);
760 >                 p != null; p = succ(p)) {
761                  Object item = p.item;
762                  if (p.isData) {
763                      if (item != null && item != p) {
# Line 761 | Line 768 | public class LinkedTransferQueue<E> exte
768                  }
769                  else if (item == null)
770                      break;
764                Node<E> n = p.next;
765                p = (n != p) ? n : head;
771              }
772              nextNode = null;
773          }
# Line 776 | Line 781 | public class LinkedTransferQueue<E> exte
781          }
782  
783          public final E next() {
784 <            Node<E> p = nextNode;
784 >            Node p = nextNode;
785              if (p == null) throw new NoSuchElementException();
786              E e = nextItem;
787              advance(p);
# Line 784 | Line 789 | public class LinkedTransferQueue<E> exte
789          }
790  
791          public final void remove() {
792 <            Node<E> p = lastRet;
792 >            Node p = lastRet;
793              if (p == null) throw new IllegalStateException();
794 <            lastRet = null;
790 <            findAndRemoveDataNode(p);
794 >            findAndRemoveDataNode(lastPred, p);
795          }
796      }
797  
# Line 800 | Line 804 | public class LinkedTransferQueue<E> exte
804       * @param pred predecessor of node to be unspliced
805       * @param s the node to be unspliced
806       */
807 <    private void unsplice(Node<E> pred, Node<E> s) {
807 >    private void unsplice(Node pred, Node s) {
808          s.forgetContents(); // clear unneeded fields
809          /*
810           * At any given time, exactly one node on list cannot be
# Line 813 | Line 817 | public class LinkedTransferQueue<E> exte
817           */
818          if (pred != null && pred != s) {
819              while (pred.next == s) {
820 <                Node<E> oldpred = (cleanMe == null) ? null : reclean();
821 <                Node<E> n = s.next;
820 >                Node oldpred = (cleanMe == null) ? null : reclean();
821 >                Node n = s.next;
822                  if (n != null) {
823                      if (n != s)
824                          pred.casNext(s, n);
825                      break;
826                  }
827                  if (oldpred == pred ||      // Already saved
828 <                    (oldpred == null && casCleanMe(null, pred)))
829 <                    break;                  // Postpone cleaning
828 >                    ((oldpred == null || oldpred.next == s) &&
829 >                     casCleanMe(oldpred, pred))) {
830 >                    break;
831 >                }
832              }
833          }
834      }
# Line 833 | Line 839 | public class LinkedTransferQueue<E> exte
839       *
840       * @return current cleanMe node (or null)
841       */
842 <    private Node<E> reclean() {
842 >    private Node reclean() {
843          /*
844           * cleanMe is, or at one time was, predecessor of a cancelled
845           * node s that was the tail so could not be unspliced.  If it
# Line 844 | Line 850 | public class LinkedTransferQueue<E> exte
850           * we can (must) clear cleanMe without unsplicing.  This can
851           * loop only due to contention.
852           */
853 <        Node<E> pred;
853 >        Node pred;
854          while ((pred = cleanMe) != null) {
855 <            Node<E> s = pred.next;
856 <            Node<E> n;
855 >            Node s = pred.next;
856 >            Node n;
857              if (s == null || s == pred || !s.isMatched())
858                  casCleanMe(pred, null); // already gone
859              else if ((n = s.next) != null) {
# Line 862 | Line 868 | public class LinkedTransferQueue<E> exte
868      }
869  
870      /**
871 <     * Main implementation of Iterator.remove(). Find
872 <     * and unsplice the given data node.
871 >     * Main implementation of Iterator.remove(). Finds
872 >     * and unsplices the given data node.
873 >     *
874 >     * @param possiblePred possible predecessor of s
875 >     * @param s the node to remove
876       */
877 <    final void findAndRemoveDataNode(Node<E> s) {
877 >    final void findAndRemoveDataNode(Node possiblePred, Node s) {
878          assert s.isData;
879          if (s.tryMatchData()) {
880 <            for (Node<E> pred = null, p = head; p != null; ) {
881 <                if (p == s) {
882 <                    unsplice(pred, p);
883 <                    break;
884 <                }
885 <                if (p.isUnmatchedRequest())
886 <                    break;
887 <                pred = p;
888 <                if ((p = p.next) == pred) { // stale
889 <                    pred = null;
890 <                    p = head;
880 >            if (possiblePred != null && possiblePred.next == s)
881 >                unsplice(possiblePred, s); // was actual predecessor
882 >            else {
883 >                for (Node pred = null, p = head; p != null; ) {
884 >                    if (p == s) {
885 >                        unsplice(pred, p);
886 >                        break;
887 >                    }
888 >                    if (p.isUnmatchedRequest())
889 >                        break;
890 >                    pred = p;
891 >                    if ((p = p.next) == pred) { // stale
892 >                        pred = null;
893 >                        p = head;
894 >                    }
895                  }
896              }
897          }
# Line 889 | Line 902 | public class LinkedTransferQueue<E> exte
902       */
903      private boolean findAndRemove(Object e) {
904          if (e != null) {
905 <            for (Node<E> pred = null, p = head; p != null; ) {
905 >            for (Node pred = null, p = head; p != null; ) {
906                  Object item = p.item;
907                  if (p.isData) {
908                      if (item != null && item != p && e.equals(item) &&
# Line 1029 | Line 1042 | public class LinkedTransferQueue<E> exte
1042       */
1043      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1044          throws InterruptedException {
1045 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1045 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1046              return true;
1047          if (!Thread.interrupted())
1048              return false;
# Line 1045 | Line 1058 | public class LinkedTransferQueue<E> exte
1058      }
1059  
1060      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1061 <        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1061 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1062          if (e != null || !Thread.interrupted())
1063              return e;
1064          throw new InterruptedException();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines