ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.53 by jsr166, Tue Oct 27 19:59:43 2009 UTC vs.
Revision 1.54 by jsr166, Tue Oct 27 23:14:08 2009 UTC

# Line 361 | Line 361 | public class LinkedTransferQueue<E> exte
361       * precede or follow CASes use simple relaxed forms.  Other
362       * cleanups use releasing/lazy writes.
363       */
364 <    static final class Node {
364 >    static final class Node<E> {
365          final boolean isData;   // false if this is a request node
366          volatile Object item;   // initially non-null if isData; CASed to match
367 <        volatile Node next;
367 >        volatile Node<E> next;
368          volatile Thread waiter; // null until waiting
369  
370          // CAS methods for fields
371 <        final boolean casNext(Node cmp, Node val) {
371 >        final boolean casNext(Node<E> cmp, Node<E> val) {
372              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
373          }
374  
375          final boolean casItem(Object cmp, Object val) {
376 +            assert cmp.getClass() != Node.class;
377              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
378          }
379  
# Line 380 | Line 381 | public class LinkedTransferQueue<E> exte
381           * Creates a new node. Uses relaxed write because item can only
382           * be seen if followed by CAS.
383           */
384 <        Node(Object item, boolean isData) {
384 >        Node(E item, boolean isData) {
385              UNSAFE.putObject(this, itemOffset, item); // relaxed write
386              this.isData = isData;
387          }
# Line 448 | Line 449 | public class LinkedTransferQueue<E> exte
449      }
450  
451      /** head of the queue; null until first enqueue */
452 <    private transient volatile Node head;
452 >    transient volatile Node<E> head;
453  
454      /** predecessor of dangling unspliceable node */
455 <    private transient volatile Node cleanMe; // decl here to reduce contention
455 >    private transient volatile Node<E> cleanMe; // decl here reduces contention
456  
457      /** tail of the queue; null until first append */
458 <    private transient volatile Node tail;
458 >    private transient volatile Node<E> tail;
459  
460      // CAS methods for fields
461 <    private boolean casTail(Node cmp, Node val) {
461 >    private boolean casTail(Node<E> cmp, Node<E> val) {
462          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
463      }
464  
465 <    private boolean casHead(Node cmp, Node val) {
465 >    private boolean casHead(Node<E> cmp, Node<E> val) {
466          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
467      }
468  
469 <    private boolean casCleanMe(Node cmp, Node val) {
469 >    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
470          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
471      }
472  
# Line 488 | Line 489 | public class LinkedTransferQueue<E> exte
489       * @return an item if matched, else e
490       * @throws NullPointerException if haveData mode but e is null
491       */
492 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
492 >    private E xfer(E e, boolean haveData, int how, long nanos) {
493          if (haveData && (e == null))
494              throw new NullPointerException();
495 <        Node s = null;                        // the node to append, if needed
495 >        Node<E> s = null;                     // the node to append, if needed
496  
497          retry: for (;;) {                     // restart on append race
498  
499 <            for (Node h = head, p = h; p != null;) { // find & match first node
499 >            for (Node<E> h = head, p = h; p != null;) {
500 >                // find & match first node
501                  boolean isData = p.isData;
502                  Object item = p.item;
503                  if (item != p && (item != null) == isData) { // unmatched
504                      if (isData == haveData)   // can't match
505                          break;
506                      if (p.casItem(item, e)) { // match
507 <                        for (Node q = p; q != h;) {
508 <                            Node n = q.next;  // update head by 2
507 >                        for (Node<E> q = p; q != h;) {
508 >                            Node<E> n = q.next; // update head by 2
509                              if (n != null)    // unless singleton
510                                  q = n;
511                              if (head == h && casHead(h, q)) {
# Line 515 | Line 517 | public class LinkedTransferQueue<E> exte
517                                  break;        // unless slack < 2
518                          }
519                          LockSupport.unpark(p.waiter);
520 <                        return item;
520 >                        return this.<E>cast(item);
521                      }
522                  }
523 <                Node n = p.next;
523 >                Node<E> n = p.next;
524                  p = (p != n) ? n : (h = head); // Use head if p offlist
525              }
526  
527              if (how >= ASYNC) {               // No matches available
528                  if (s == null)
529 <                    s = new Node(e, haveData);
530 <                Node pred = tryAppend(s, haveData);
529 >                    s = new Node<E>(e, haveData);
530 >                Node<E> pred = tryAppend(s, haveData);
531                  if (pred == null)
532                      continue retry;           // lost race vs opposite mode
533                  if (how >= SYNC)
# Line 544 | Line 546 | public class LinkedTransferQueue<E> exte
546       * different mode, else s's predecessor, or s itself if no
547       * predecessor
548       */
549 <    private Node tryAppend(Node s, boolean haveData) {
550 <        for (Node t = tail, p = t;;) { // move p to last node and append
551 <            Node n, u;                        // temps for reads of next & tail
549 >    private Node<E> tryAppend(Node<E> s, boolean haveData) {
550 >        for (Node<E> t = tail, p = t;;) { // move p to last node and append
551 >            Node<E> n, u;                     // temps for reads of next & tail
552              if (p == null && (p = head) == null) {
553                  if (casHead(null, s))
554                      return s;                 // initialize
# Line 582 | Line 584 | public class LinkedTransferQueue<E> exte
584       * @param nanos timeout value
585       * @return matched item, or e if unmatched on interrupt or timeout
586       */
587 <    private Object awaitMatch(Node s, Node pred, Object e,
586 <                              int how, long nanos) {
587 >    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
588          long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
589          Thread w = Thread.currentThread();
590          int spins = -1; // initialized after first item and cancel checks
# Line 592 | Line 593 | public class LinkedTransferQueue<E> exte
593          for (;;) {
594              Object item = s.item;
595              if (item != e) {                  // matched
596 +                assert item != s;
597                  s.forgetContents();           // avoid garbage
598 <                return item;
598 >                return this.<E>cast(item);
599              }
600              if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
601 <                     s.casItem(e, s)) {       // cancel
601 >                    s.casItem(e, s)) {       // cancel
602                  unsplice(pred, s);
603                  return e;
604              }
# Line 632 | Line 634 | public class LinkedTransferQueue<E> exte
634       * Returns spin/yield value for a node with given predecessor and
635       * data mode. See above for explanation.
636       */
637 <    private static int spinsFor(Node pred, boolean haveData) {
637 >    private static int spinsFor(Node<?> pred, boolean haveData) {
638          if (MP && pred != null) {
639              if (pred.isData != haveData)      // phase change
640                  return FRONT_SPINS + CHAINED_SPINS;
# Line 649 | Line 651 | public class LinkedTransferQueue<E> exte
651       * or trailing node; failing on contention.
652       */
653      private void shortenHeadPath() {
654 <        Node h, hn, p, q;
654 >        Node<E> h, hn, p, q;
655          if ((p = h = head) != null && h.isMatched() &&
656              (q = hn = h.next) != null) {
657 <            Node n;
657 >            Node<E> n;
658              while ((n = q.next) != q) {
659                  if (n == null || !q.isMatched()) {
660                      if (hn != q && h.next == hn)
# Line 671 | Line 673 | public class LinkedTransferQueue<E> exte
673       * Returns the first unmatched node of the given mode, or null if
674       * none.  Used by methods isEmpty, hasWaitingConsumer.
675       */
676 <    private Node firstOfMode(boolean data) {
677 <        for (Node p = head; p != null; ) {
676 >    private Node<E> firstOfMode(boolean data) {
677 >        for (Node<E> p = head; p != null; ) {
678              if (!p.isMatched())
679                  return (p.isData == data) ? p : null;
680 <            Node n = p.next;
680 >            Node<E> n = p.next;
681              p = (n != p) ? n : head;
682          }
683          return null;
684      }
685  
686 +    @SuppressWarnings("unchecked")
687 +    static <E> E cast(Object item) {
688 +        assert item.getClass() != Node.class;
689 +        return (E) item;
690 +    }
691 +
692      /**
693       * Returns the item in the first unmatched node with isData; or
694 <     * null if none. Used by peek.
694 >     * null if none.  Used by peek.
695       */
696 <    private Object firstDataItem() {
697 <        for (Node p = head; p != null; ) {
696 >    private E firstDataItem() {
697 >        for (Node<E> p = head; p != null; ) {
698              boolean isData = p.isData;
699              Object item = p.item;
700              if (item != p && (item != null) == isData)
701 <                return isData ? item : null;
702 <            Node n = p.next;
701 >                return isData ? this.<E>cast(item) : null;
702 >            Node<E> n = p.next;
703              p = (n != p) ? n : head;
704          }
705          return null;
# Line 703 | Line 711 | public class LinkedTransferQueue<E> exte
711       */
712      private int countOfMode(boolean data) {
713          int count = 0;
714 <        for (Node p = head; p != null; ) {
714 >        for (Node<E> p = head; p != null; ) {
715              if (!p.isMatched()) {
716                  if (p.isData != data)
717                      return 0;
718                  if (++count == Integer.MAX_VALUE) // saturated
719                      break;
720              }
721 <            Node n = p.next;
721 >            Node<E> n = p.next;
722              if (n != p)
723                  p = n;
724              else {
# Line 722 | Line 730 | public class LinkedTransferQueue<E> exte
730      }
731  
732      final class Itr implements Iterator<E> {
733 <        private Node nextNode;   // next node to return item for
734 <        private Object nextItem; // the corresponding item
735 <        private Node lastRet;    // last returned node, to support remove
733 >        private Node<E> nextNode;   // next node to return item for
734 >        private E nextItem;         // the corresponding item
735 >        private Node<E> lastRet;    // last returned node, to support remove
736  
737          /**
738           * Moves to next node after prev, or first node if prev null.
739           */
740 <        private void advance(Node prev) {
740 >        private void advance(Node<E> prev) {
741              lastRet = prev;
742 <            Node p;
742 >            Node<E> p;
743              if (prev == null || (p = prev.next) == prev)
744                  p = head;
745              while (p != null) {
746                  Object item = p.item;
747                  if (p.isData) {
748                      if (item != null && item != p) {
749 <                        nextItem = item;
749 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
750                          nextNode = p;
751                          return;
752                      }
753                  }
754                  else if (item == null)
755                      break;
756 <                Node n = p.next;
756 >                Node<E> n = p.next;
757                  p = (n != p) ? n : head;
758              }
759              nextNode = null;
# Line 760 | Line 768 | public class LinkedTransferQueue<E> exte
768          }
769  
770          public final E next() {
771 <            Node p = nextNode;
771 >            Node<E> p = nextNode;
772              if (p == null) throw new NoSuchElementException();
773 <            Object e = nextItem;
773 >            E e = nextItem;
774              advance(p);
775 <            return (E) e;
775 >            return e;
776          }
777  
778          public final void remove() {
779 <            Node p = lastRet;
779 >            Node<E> p = lastRet;
780              if (p == null) throw new IllegalStateException();
781              lastRet = null;
782              findAndRemoveNode(p);
# Line 784 | Line 792 | public class LinkedTransferQueue<E> exte
792       * @param pred predecessor of node to be unspliced
793       * @param s the node to be unspliced
794       */
795 <    private void unsplice(Node pred, Node s) {
795 >    private void unsplice(Node<E> pred, Node<E> s) {
796          s.forgetContents(); // clear unneeded fields
797          /*
798           * At any given time, exactly one node on list cannot be
# Line 797 | Line 805 | public class LinkedTransferQueue<E> exte
805           */
806          if (pred != null && pred != s) {
807              while (pred.next == s) {
808 <                Node oldpred = (cleanMe == null) ? null : reclean();
809 <                Node n = s.next;
808 >                Node<E> oldpred = (cleanMe == null) ? null : reclean();
809 >                Node<E> n = s.next;
810                  if (n != null) {
811                      if (n != s)
812                          pred.casNext(s, n);
# Line 817 | Line 825 | public class LinkedTransferQueue<E> exte
825       *
826       * @return current cleanMe node (or null)
827       */
828 <    private Node reclean() {
828 >    private Node<E> reclean() {
829          /*
830           * cleanMe is, or at one time was, predecessor of a cancelled
831           * node s that was the tail so could not be unspliced.  If it
# Line 828 | Line 836 | public class LinkedTransferQueue<E> exte
836           * we can (must) clear cleanMe without unsplicing.  This can
837           * loop only due to contention.
838           */
839 <        Node pred;
839 >        Node<E> pred;
840          while ((pred = cleanMe) != null) {
841 <            Node s = pred.next;
842 <            Node n;
841 >            Node<E> s = pred.next;
842 >            Node<E> n;
843              if (s == null || s == pred || !s.isMatched())
844                  casCleanMe(pred, null); // already gone
845              else if ((n = s.next) != null) {
# Line 849 | Line 857 | public class LinkedTransferQueue<E> exte
857       * Main implementation of Iterator.remove(). Find
858       * and unsplice the given node.
859       */
860 <    final void findAndRemoveNode(Node s) {
860 >    final void findAndRemoveNode(Node<E> s) {
861          if (s.tryMatchData()) {
862 <            Node pred = null;
863 <            Node p = head;
862 >            Node<E> pred = null;
863 >            Node<E> p = head;
864              while (p != null) {
865                  if (p == s) {
866                      unsplice(pred, p);
# Line 874 | Line 882 | public class LinkedTransferQueue<E> exte
882       */
883      private boolean findAndRemove(Object e) {
884          if (e != null) {
885 <            Node pred = null;
886 <            Node p = head;
885 >            Node<E> pred = null;
886 >            Node<E> p = head;
887              while (p != null) {
888                  Object item = p.item;
889                  if (p.isData) {
# Line 1024 | Line 1032 | public class LinkedTransferQueue<E> exte
1032      }
1033  
1034      public E take() throws InterruptedException {
1035 <        Object e = xfer(null, false, SYNC, 0);
1035 >        E e = xfer(null, false, SYNC, 0);
1036          if (e != null)
1037 <            return (E)e;
1037 >            return e;
1038          Thread.interrupted();
1039          throw new InterruptedException();
1040      }
1041  
1042      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1043 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1043 >        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1044          if (e != null || !Thread.interrupted())
1045 <            return (E)e;
1045 >            return e;
1046          throw new InterruptedException();
1047      }
1048  
1049      public E poll() {
1050 <        return (E)xfer(null, false, NOW, 0);
1050 >        return xfer(null, false, NOW, 0);
1051      }
1052  
1053      /**
# Line 1096 | Line 1104 | public class LinkedTransferQueue<E> exte
1104      }
1105  
1106      public E peek() {
1107 <        return (E) firstDataItem();
1107 >        return firstDataItem();
1108      }
1109  
1110      /**
# Line 1221 | Line 1229 | public class LinkedTransferQueue<E> exte
1229       *
1230       * @return a sun.misc.Unsafe
1231       */
1232 <    private static sun.misc.Unsafe getUnsafe() {
1232 >    static sun.misc.Unsafe getUnsafe() {
1233          try {
1234              return sun.misc.Unsafe.getUnsafe();
1235          } catch (SecurityException se) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines