ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.53 by jsr166, Tue Oct 27 19:59:43 2009 UTC vs.
Revision 1.60 by dl, Fri Oct 30 12:06:31 2009 UTC

# Line 361 | Line 361 | public class LinkedTransferQueue<E> exte
361       * precede or follow CASes use simple relaxed forms.  Other
362       * cleanups use releasing/lazy writes.
363       */
364 <    static final class Node {
364 >    static final class Node<E> {
365          final boolean isData;   // false if this is a request node
366          volatile Object item;   // initially non-null if isData; CASed to match
367 <        volatile Node next;
367 >        volatile Node<E> next;
368          volatile Thread waiter; // null until waiting
369  
370          // CAS methods for fields
371 <        final boolean casNext(Node cmp, Node val) {
371 >        final boolean casNext(Node<E> cmp, Node<E> val) {
372              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
373          }
374  
375          final boolean casItem(Object cmp, Object val) {
376 +            assert cmp == null || cmp.getClass() != Node.class;
377              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
378          }
379  
# Line 380 | Line 381 | public class LinkedTransferQueue<E> exte
381           * Creates a new node. Uses relaxed write because item can only
382           * be seen if followed by CAS.
383           */
384 <        Node(Object item, boolean isData) {
384 >        Node(E item, boolean isData) {
385              UNSAFE.putObject(this, itemOffset, item); // relaxed write
386              this.isData = isData;
387          }
# Line 409 | Line 410 | public class LinkedTransferQueue<E> exte
410           */
411          final boolean isMatched() {
412              Object x = item;
413 <            return x == this || (x != null) != isData;
413 >            return (x == this) || ((x == null) == isData);
414 >        }
415 >
416 >        /**
417 >         * Returns true if this is an unmatched request node.
418 >         */
419 >        final boolean isUnmatchedRequest() {
420 >            return !isData && item == null;
421          }
422  
423          /**
# Line 427 | Line 435 | public class LinkedTransferQueue<E> exte
435           * Tries to artificially match a data node -- used by remove.
436           */
437          final boolean tryMatchData() {
438 +            assert isData;
439              Object x = item;
440              if (x != null && x != this && casItem(x, null)) {
441                  LockSupport.unpark(waiter);
# Line 448 | Line 457 | public class LinkedTransferQueue<E> exte
457      }
458  
459      /** head of the queue; null until first enqueue */
460 <    private transient volatile Node head;
460 >    transient volatile Node<E> head;
461  
462      /** predecessor of dangling unspliceable node */
463 <    private transient volatile Node cleanMe; // decl here to reduce contention
463 >    private transient volatile Node<E> cleanMe; // decl here reduces contention
464  
465      /** tail of the queue; null until first append */
466 <    private transient volatile Node tail;
466 >    private transient volatile Node<E> tail;
467  
468      // CAS methods for fields
469 <    private boolean casTail(Node cmp, Node val) {
469 >    private boolean casTail(Node<E> cmp, Node<E> val) {
470          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
471      }
472  
473 <    private boolean casHead(Node cmp, Node val) {
473 >    private boolean casHead(Node<E> cmp, Node<E> val) {
474          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
475      }
476  
477 <    private boolean casCleanMe(Node cmp, Node val) {
477 >    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
478          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
479      }
480  
# Line 478 | Line 487 | public class LinkedTransferQueue<E> exte
487      private static final int SYNC    = 2; // for transfer, take
488      private static final int TIMEOUT = 3; // for timed poll, tryTransfer
489  
490 +    @SuppressWarnings("unchecked")
491 +    static <E> E cast(Object item) {
492 +        assert item == null || item.getClass() != Node.class;
493 +        return (E) item;
494 +    }
495 +
496      /**
497       * Implements all queuing methods. See above for explanation.
498       *
# Line 488 | Line 503 | public class LinkedTransferQueue<E> exte
503       * @return an item if matched, else e
504       * @throws NullPointerException if haveData mode but e is null
505       */
506 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
506 >    private E xfer(E e, boolean haveData, int how, long nanos) {
507          if (haveData && (e == null))
508              throw new NullPointerException();
509 <        Node s = null;                        // the node to append, if needed
509 >        Node<E> s = null;                     // the node to append, if needed
510  
511          retry: for (;;) {                     // restart on append race
512  
513 <            for (Node h = head, p = h; p != null;) { // find & match first node
513 >            for (Node<E> h = head, p = h; p != null;) {
514 >                // find & match first node
515                  boolean isData = p.isData;
516                  Object item = p.item;
517                  if (item != p && (item != null) == isData) { // unmatched
518                      if (isData == haveData)   // can't match
519                          break;
520                      if (p.casItem(item, e)) { // match
521 <                        for (Node q = p; q != h;) {
522 <                            Node n = q.next;  // update head by 2
521 >                        for (Node<E> q = p; q != h;) {
522 >                            Node<E> n = q.next; // update head by 2
523                              if (n != null)    // unless singleton
524                                  q = n;
525                              if (head == h && casHead(h, q)) {
# Line 515 | Line 531 | public class LinkedTransferQueue<E> exte
531                                  break;        // unless slack < 2
532                          }
533                          LockSupport.unpark(p.waiter);
534 <                        return item;
534 >                        return this.<E>cast(item);
535                      }
536                  }
537 <                Node n = p.next;
537 >                Node<E> n = p.next;
538                  p = (p != n) ? n : (h = head); // Use head if p offlist
539              }
540  
541              if (how >= ASYNC) {               // No matches available
542                  if (s == null)
543 <                    s = new Node(e, haveData);
544 <                Node pred = tryAppend(s, haveData);
543 >                    s = new Node<E>(e, haveData);
544 >                Node<E> pred = tryAppend(s, haveData);
545                  if (pred == null)
546                      continue retry;           // lost race vs opposite mode
547                  if (how >= SYNC)
# Line 544 | Line 560 | public class LinkedTransferQueue<E> exte
560       * different mode, else s's predecessor, or s itself if no
561       * predecessor
562       */
563 <    private Node tryAppend(Node s, boolean haveData) {
564 <        for (Node t = tail, p = t;;) { // move p to last node and append
565 <            Node n, u;                        // temps for reads of next & tail
563 >    private Node<E> tryAppend(Node<E> s, boolean haveData) {
564 >        for (Node<E> t = tail, p = t;;) { // move p to last node and append
565 >            Node<E> n, u;                     // temps for reads of next & tail
566              if (p == null && (p = head) == null) {
567                  if (casHead(null, s))
568                      return s;                 // initialize
# Line 582 | Line 598 | public class LinkedTransferQueue<E> exte
598       * @param nanos timeout value
599       * @return matched item, or e if unmatched on interrupt or timeout
600       */
601 <    private Object awaitMatch(Node s, Node pred, Object e,
586 <                              int how, long nanos) {
601 >    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
602          long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
603          Thread w = Thread.currentThread();
604          int spins = -1; // initialized after first item and cancel checks
# Line 592 | Line 607 | public class LinkedTransferQueue<E> exte
607          for (;;) {
608              Object item = s.item;
609              if (item != e) {                  // matched
610 +                assert item != s;
611                  s.forgetContents();           // avoid garbage
612 <                return item;
612 >                return this.<E>cast(item);
613              }
614              if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
615 <                     s.casItem(e, s)) {       // cancel
615 >                    s.casItem(e, s)) {       // cancel
616                  unsplice(pred, s);
617                  return e;
618              }
# Line 632 | Line 648 | public class LinkedTransferQueue<E> exte
648       * Returns spin/yield value for a node with given predecessor and
649       * data mode. See above for explanation.
650       */
651 <    private static int spinsFor(Node pred, boolean haveData) {
651 >    private static int spinsFor(Node<?> pred, boolean haveData) {
652          if (MP && pred != null) {
653              if (pred.isData != haveData)      // phase change
654                  return FRONT_SPINS + CHAINED_SPINS;
# Line 649 | Line 665 | public class LinkedTransferQueue<E> exte
665       * or trailing node; failing on contention.
666       */
667      private void shortenHeadPath() {
668 <        Node h, hn, p, q;
668 >        Node<E> h, hn, p, q;
669          if ((p = h = head) != null && h.isMatched() &&
670              (q = hn = h.next) != null) {
671 <            Node n;
671 >            Node<E> n;
672              while ((n = q.next) != q) {
673                  if (n == null || !q.isMatched()) {
674                      if (hn != q && h.next == hn)
# Line 671 | Line 687 | public class LinkedTransferQueue<E> exte
687       * Returns the first unmatched node of the given mode, or null if
688       * none.  Used by methods isEmpty, hasWaitingConsumer.
689       */
690 <    private Node firstOfMode(boolean data) {
691 <        for (Node p = head; p != null; ) {
690 >    private Node<E> firstOfMode(boolean data) {
691 >        for (Node<E> p = head; p != null; ) {
692              if (!p.isMatched())
693                  return (p.isData == data) ? p : null;
694 <            Node n = p.next;
694 >            Node<E> n = p.next;
695              p = (n != p) ? n : head;
696          }
697          return null;
# Line 683 | Line 699 | public class LinkedTransferQueue<E> exte
699  
700      /**
701       * Returns the item in the first unmatched node with isData; or
702 <     * null if none. Used by peek.
702 >     * null if none.  Used by peek.
703       */
704 <    private Object firstDataItem() {
705 <        for (Node p = head; p != null; ) {
704 >    private E firstDataItem() {
705 >        for (Node<E> p = head; p != null; ) {
706              boolean isData = p.isData;
707              Object item = p.item;
708              if (item != p && (item != null) == isData)
709 <                return isData ? item : null;
710 <            Node n = p.next;
709 >                return isData ? this.<E>cast(item) : null;
710 >            Node<E> n = p.next;
711              p = (n != p) ? n : head;
712          }
713          return null;
# Line 703 | Line 719 | public class LinkedTransferQueue<E> exte
719       */
720      private int countOfMode(boolean data) {
721          int count = 0;
722 <        for (Node p = head; p != null; ) {
722 >        for (Node<E> p = head; p != null; ) {
723              if (!p.isMatched()) {
724                  if (p.isData != data)
725                      return 0;
726                  if (++count == Integer.MAX_VALUE) // saturated
727                      break;
728              }
729 <            Node n = p.next;
729 >            Node<E> n = p.next;
730              if (n != p)
731                  p = n;
732              else {
# Line 722 | Line 738 | public class LinkedTransferQueue<E> exte
738      }
739  
740      final class Itr implements Iterator<E> {
741 <        private Node nextNode;   // next node to return item for
742 <        private Object nextItem; // the corresponding item
743 <        private Node lastRet;    // last returned node, to support remove
741 >        private Node<E> nextNode;   // next node to return item for
742 >        private E nextItem;         // the corresponding item
743 >        private Node<E> lastRet;    // last returned node, to support remove
744 >        private Node<E> lastPred;   // predecessor to unlink lastRet
745  
746          /**
747           * Moves to next node after prev, or first node if prev null.
748           */
749 <        private void advance(Node prev) {
749 >        private void advance(Node<E> prev) {
750 >            lastPred = lastRet;
751              lastRet = prev;
752 <            Node p;
752 >            Node<E> p;
753              if (prev == null || (p = prev.next) == prev)
754                  p = head;
755              while (p != null) {
756                  Object item = p.item;
757                  if (p.isData) {
758                      if (item != null && item != p) {
759 <                        nextItem = item;
759 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
760                          nextNode = p;
761                          return;
762                      }
763                  }
764                  else if (item == null)
765                      break;
766 <                Node n = p.next;
766 >                Node<E> n = p.next;
767                  p = (n != p) ? n : head;
768              }
769              nextNode = null;
# Line 760 | Line 778 | public class LinkedTransferQueue<E> exte
778          }
779  
780          public final E next() {
781 <            Node p = nextNode;
781 >            Node<E> p = nextNode;
782              if (p == null) throw new NoSuchElementException();
783 <            Object e = nextItem;
783 >            E e = nextItem;
784              advance(p);
785 <            return (E) e;
785 >            return e;
786          }
787  
788          public final void remove() {
789 <            Node p = lastRet;
789 >            Node<E> p = lastRet;
790              if (p == null) throw new IllegalStateException();
791 <            lastRet = null;
774 <            findAndRemoveNode(p);
791 >            findAndRemoveDataNode(lastPred, p);
792          }
793      }
794  
# Line 784 | Line 801 | public class LinkedTransferQueue<E> exte
801       * @param pred predecessor of node to be unspliced
802       * @param s the node to be unspliced
803       */
804 <    private void unsplice(Node pred, Node s) {
804 >    private void unsplice(Node<E> pred, Node<E> s) {
805          s.forgetContents(); // clear unneeded fields
806          /*
807           * At any given time, exactly one node on list cannot be
# Line 797 | Line 814 | public class LinkedTransferQueue<E> exte
814           */
815          if (pred != null && pred != s) {
816              while (pred.next == s) {
817 <                Node oldpred = (cleanMe == null) ? null : reclean();
818 <                Node n = s.next;
817 >                Node<E> oldpred = (cleanMe == null) ? null : reclean();
818 >                Node<E> n = s.next;
819                  if (n != null) {
820                      if (n != s)
821                          pred.casNext(s, n);
822                      break;
823                  }
824                  if (oldpred == pred ||      // Already saved
825 <                    (oldpred == null && casCleanMe(null, pred)))
826 <                    break;                  // Postpone cleaning
825 >                    ((oldpred == null || oldpred.next == s) &&
826 >                     casCleanMe(oldpred, pred))) {
827 >                    break;
828 >                }
829              }
830          }
831      }
# Line 817 | Line 836 | public class LinkedTransferQueue<E> exte
836       *
837       * @return current cleanMe node (or null)
838       */
839 <    private Node reclean() {
839 >    private Node<E> reclean() {
840          /*
841           * cleanMe is, or at one time was, predecessor of a cancelled
842           * node s that was the tail so could not be unspliced.  If it
# Line 828 | Line 847 | public class LinkedTransferQueue<E> exte
847           * we can (must) clear cleanMe without unsplicing.  This can
848           * loop only due to contention.
849           */
850 <        Node pred;
850 >        Node<E> pred;
851          while ((pred = cleanMe) != null) {
852 <            Node s = pred.next;
853 <            Node n;
852 >            Node<E> s = pred.next;
853 >            Node<E> n;
854              if (s == null || s == pred || !s.isMatched())
855                  casCleanMe(pred, null); // already gone
856              else if ((n = s.next) != null) {
# Line 847 | Line 866 | public class LinkedTransferQueue<E> exte
866  
867      /**
868       * Main implementation of Iterator.remove(). Find
869 <     * and unsplice the given node.
869 >     * and unsplice the given data node.
870 >     * @param possiblePred possible predecessor of s
871 >     * @param s the node to remove
872       */
873 <    final void findAndRemoveNode(Node s) {
873 >    final void findAndRemoveDataNode(Node<E> possiblePred, Node<E> s) {
874 >        assert s.isData;
875          if (s.tryMatchData()) {
876 <            Node pred = null;
877 <            Node p = head;
878 <            while (p != null) {
879 <                if (p == s) {
880 <                    unsplice(pred, p);
881 <                    break;
882 <                }
883 <                if (!p.isData && !p.isMatched())
884 <                    break;
885 <                pred = p;
886 <                if ((p = p.next) == pred) { // stale
887 <                    pred = null;
888 <                    p = head;
876 >            if (possiblePred != null && possiblePred.next == s)
877 >                unsplice(possiblePred, s); // was actual predecessor
878 >            else {
879 >                for (Node<E> pred = null, p = head; p != null; ) {
880 >                    if (p == s) {
881 >                        unsplice(pred, p);
882 >                        break;
883 >                    }
884 >                    if (p.isUnmatchedRequest())
885 >                        break;
886 >                    pred = p;
887 >                    if ((p = p.next) == pred) { // stale
888 >                        pred = null;
889 >                        p = head;
890 >                    }
891                  }
892              }
893          }
# Line 874 | Line 898 | public class LinkedTransferQueue<E> exte
898       */
899      private boolean findAndRemove(Object e) {
900          if (e != null) {
901 <            Node pred = null;
878 <            Node p = head;
879 <            while (p != null) {
901 >            for (Node<E> pred = null, p = head; p != null; ) {
902                  Object item = p.item;
903                  if (p.isData) {
904                      if (item != null && item != p && e.equals(item) &&
# Line 888 | Line 910 | public class LinkedTransferQueue<E> exte
910                  else if (item == null)
911                      break;
912                  pred = p;
913 <                if ((p = p.next) == pred) {
913 >                if ((p = p.next) == pred) { // stale
914                      pred = null;
915                      p = head;
916                  }
# Line 1024 | Line 1046 | public class LinkedTransferQueue<E> exte
1046      }
1047  
1048      public E take() throws InterruptedException {
1049 <        Object e = xfer(null, false, SYNC, 0);
1049 >        E e = xfer(null, false, SYNC, 0);
1050          if (e != null)
1051 <            return (E)e;
1051 >            return e;
1052          Thread.interrupted();
1053          throw new InterruptedException();
1054      }
1055  
1056      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1057 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1057 >        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1058          if (e != null || !Thread.interrupted())
1059 <            return (E)e;
1059 >            return e;
1060          throw new InterruptedException();
1061      }
1062  
1063      public E poll() {
1064 <        return (E)xfer(null, false, NOW, 0);
1064 >        return xfer(null, false, NOW, 0);
1065      }
1066  
1067      /**
# Line 1096 | Line 1118 | public class LinkedTransferQueue<E> exte
1118      }
1119  
1120      public E peek() {
1121 <        return (E) firstDataItem();
1121 >        return firstDataItem();
1122      }
1123  
1124      /**
# Line 1221 | Line 1243 | public class LinkedTransferQueue<E> exte
1243       *
1244       * @return a sun.misc.Unsafe
1245       */
1246 <    private static sun.misc.Unsafe getUnsafe() {
1246 >    static sun.misc.Unsafe getUnsafe() {
1247          try {
1248              return sun.misc.Unsafe.getUnsafe();
1249          } catch (SecurityException se) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines