ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.13 by jsr166, Fri Oct 30 18:35:22 2009 UTC vs.
Revision 1.14 by jsr166, Mon Nov 2 15:33:59 2009 UTC

# Line 359 | Line 359 | public class LinkedTransferQueue<E> exte
359       * precede or follow CASes use simple relaxed forms.  Other
360       * cleanups use releasing/lazy writes.
361       */
362 <    static final class Node<E> {
362 >    static final class Node {
363          final boolean isData;   // false if this is a request node
364          volatile Object item;   // initially non-null if isData; CASed to match
365 <        volatile Node<E> next;
365 >        volatile Node next;
366          volatile Thread waiter; // null until waiting
367  
368          // CAS methods for fields
369 <        final boolean casNext(Node<E> cmp, Node<E> val) {
369 >        final boolean casNext(Node cmp, Node val) {
370              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
371          }
372  
# Line 379 | Line 379 | public class LinkedTransferQueue<E> exte
379           * Creates a new node. Uses relaxed write because item can only
380           * be seen if followed by CAS.
381           */
382 <        Node(E item, boolean isData) {
382 >        Node(Object item, boolean isData) {
383              UNSAFE.putObject(this, itemOffset, item); // relaxed write
384              this.isData = isData;
385          }
# Line 455 | Line 455 | public class LinkedTransferQueue<E> exte
455      }
456  
457      /** head of the queue; null until first enqueue */
458 <    transient volatile Node<E> head;
458 >    transient volatile Node head;
459  
460      /** predecessor of dangling unspliceable node */
461 <    private transient volatile Node<E> cleanMe; // decl here reduces contention
461 >    private transient volatile Node cleanMe; // decl here reduces contention
462  
463      /** tail of the queue; null until first append */
464 <    private transient volatile Node<E> tail;
464 >    private transient volatile Node tail;
465  
466      // CAS methods for fields
467 <    private boolean casTail(Node<E> cmp, Node<E> val) {
467 >    private boolean casTail(Node cmp, Node val) {
468          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
469      }
470  
471 <    private boolean casHead(Node<E> cmp, Node<E> val) {
471 >    private boolean casHead(Node cmp, Node val) {
472          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
473      }
474  
475 <    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
475 >    private boolean casCleanMe(Node cmp, Node val) {
476          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
477      }
478  
479      /*
480 <     * Possible values for "how" argument in xfer method. Beware that
481 <     * the order of assigned numerical values matters.
480 >     * Possible values for "how" argument in xfer method.
481       */
482 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
483 <    private static final int ASYNC   = 1; // for offer, put, add
484 <    private static final int SYNC    = 2; // for transfer, take
485 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
482 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
483 >    private static final int ASYNC = 1; // for offer, put, add
484 >    private static final int SYNC  = 2; // for transfer, take
485 >    private static final int TIMED = 3; // for timed poll, tryTransfer
486  
487      @SuppressWarnings("unchecked")
488      static <E> E cast(Object item) {
# Line 496 | Line 495 | public class LinkedTransferQueue<E> exte
495       *
496       * @param e the item or null for take
497       * @param haveData true if this is a put, else a take
498 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
499 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
498 >     * @param how NOW, ASYNC, SYNC, or TIMED
499 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
500       * @return an item if matched, else e
501       * @throws NullPointerException if haveData mode but e is null
502       */
503      private E xfer(E e, boolean haveData, int how, long nanos) {
504          if (haveData && (e == null))
505              throw new NullPointerException();
506 <        Node<E> s = null;                     // the node to append, if needed
506 >        Node s = null;                        // the node to append, if needed
507  
508          retry: for (;;) {                     // restart on append race
509  
510 <            for (Node<E> h = head, p = h; p != null;) {
512 <                // find & match first node
510 >            for (Node h = head, p = h; p != null;) { // find & match first node
511                  boolean isData = p.isData;
512                  Object item = p.item;
513                  if (item != p && (item != null) == isData) { // unmatched
514                      if (isData == haveData)   // can't match
515                          break;
516                      if (p.casItem(item, e)) { // match
517 <                        for (Node<E> q = p; q != h;) {
518 <                            Node<E> n = q.next; // update head by 2
517 >                        for (Node q = p; q != h;) {
518 >                            Node n = q.next;  // update head by 2
519                              if (n != null)    // unless singleton
520                                  q = n;
521                              if (head == h && casHead(h, q)) {
# Line 532 | Line 530 | public class LinkedTransferQueue<E> exte
530                          return this.<E>cast(item);
531                      }
532                  }
533 <                Node<E> n = p.next;
533 >                Node n = p.next;
534                  p = (p != n) ? n : (h = head); // Use head if p offlist
535              }
536  
537 <            if (how >= ASYNC) {               // No matches available
537 >            if (how != NOW) {                 // No matches available
538                  if (s == null)
539 <                    s = new Node<E>(e, haveData);
540 <                Node<E> pred = tryAppend(s, haveData);
539 >                    s = new Node(e, haveData);
540 >                Node pred = tryAppend(s, haveData);
541                  if (pred == null)
542                      continue retry;           // lost race vs opposite mode
543 <                if (how >= SYNC)
544 <                    return awaitMatch(s, pred, e, how, nanos);
543 >                if (how != ASYNC)
544 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
545              }
546              return e; // not waiting
547          }
# Line 558 | Line 556 | public class LinkedTransferQueue<E> exte
556       * different mode, else s's predecessor, or s itself if no
557       * predecessor
558       */
559 <    private Node<E> tryAppend(Node<E> s, boolean haveData) {
560 <        for (Node<E> t = tail, p = t;;) { // move p to last node and append
561 <            Node<E> n, u;                     // temps for reads of next & tail
559 >    private Node tryAppend(Node s, boolean haveData) {
560 >        for (Node t = tail, p = t;;) {        // move p to last node and append
561 >            Node n, u;                        // temps for reads of next & tail
562              if (p == null && (p = head) == null) {
563                  if (casHead(null, s))
564                      return s;                 // initialize
# Line 592 | Line 590 | public class LinkedTransferQueue<E> exte
590       * predecessor, or null if unknown (the null case does not occur
591       * in any current calls but may in possible future extensions)
592       * @param e the comparison value for checking match
593 <     * @param how either SYNC or TIMEOUT
594 <     * @param nanos timeout value
593 >     * @param timed if true, wait only until timeout elapses
594 >     * @param nanos timeout in nanosecs, used only if timed is true
595       * @return matched item, or e if unmatched on interrupt or timeout
596       */
597 <    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
598 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
597 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
598 >        long lastTime = timed ? System.nanoTime() : 0L;
599          Thread w = Thread.currentThread();
600          int spins = -1; // initialized after first item and cancel checks
601          ThreadLocalRandom randomYields = null; // bound if needed
# Line 609 | Line 607 | public class LinkedTransferQueue<E> exte
607                  s.forgetContents();           // avoid garbage
608                  return this.<E>cast(item);
609              }
610 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
610 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
611                      s.casItem(e, s)) {       // cancel
612                  unsplice(pred, s);
613                  return e;
# Line 628 | Line 626 | public class LinkedTransferQueue<E> exte
626              else if (s.waiter == null) {
627                  s.waiter = w;                 // request unpark then recheck
628              }
629 <            else if (how == TIMEOUT) {
629 >            else if (timed) {
630                  long now = System.nanoTime();
631                  if ((nanos -= now - lastTime) > 0)
632                      LockSupport.parkNanos(this, nanos);
# Line 646 | Line 644 | public class LinkedTransferQueue<E> exte
644       * Returns spin/yield value for a node with given predecessor and
645       * data mode. See above for explanation.
646       */
647 <    private static int spinsFor(Node<?> pred, boolean haveData) {
647 >    private static int spinsFor(Node pred, boolean haveData) {
648          if (MP && pred != null) {
649              if (pred.isData != haveData)      // phase change
650                  return FRONT_SPINS + CHAINED_SPINS;
# Line 663 | Line 661 | public class LinkedTransferQueue<E> exte
661       * or trailing node; failing on contention.
662       */
663      private void shortenHeadPath() {
664 <        Node<E> h, hn, p, q;
664 >        Node h, hn, p, q;
665          if ((p = h = head) != null && h.isMatched() &&
666              (q = hn = h.next) != null) {
667 <            Node<E> n;
667 >            Node n;
668              while ((n = q.next) != q) {
669                  if (n == null || !q.isMatched()) {
670                      if (hn != q && h.next == hn)
# Line 682 | Line 680 | public class LinkedTransferQueue<E> exte
680      /* -------------- Traversal methods -------------- */
681  
682      /**
683 +     * Returns the successor of p, or the head node if p.next has been
684 +     * linked to self, which will only be true if traversing with a
685 +     * stale pointer that is now off the list.
686 +     */
687 +    final Node succ(Node p) {
688 +        Node next = p.next;
689 +        return (p == next) ? head : next;
690 +    }
691 +
692 +    /**
693       * Returns the first unmatched node of the given mode, or null if
694       * none.  Used by methods isEmpty, hasWaitingConsumer.
695       */
696 <    private Node<E> firstOfMode(boolean data) {
697 <        for (Node<E> p = head; p != null; ) {
696 >    private Node firstOfMode(boolean isData) {
697 >        for (Node p = head; p != null; p = succ(p)) {
698              if (!p.isMatched())
699 <                return (p.isData == data) ? p : null;
692 <            Node<E> n = p.next;
693 <            p = (n != p) ? n : head;
699 >                return (p.isData == isData) ? p : null;
700          }
701          return null;
702      }
# Line 700 | Line 706 | public class LinkedTransferQueue<E> exte
706       * null if none.  Used by peek.
707       */
708      private E firstDataItem() {
709 <        for (Node<E> p = head; p != null; ) {
704 <            boolean isData = p.isData;
709 >        for (Node p = head; p != null; p = succ(p)) {
710              Object item = p.item;
711 <            if (item != p && (item != null) == isData)
712 <                return isData ? this.<E>cast(item) : null;
713 <            Node<E> n = p.next;
714 <            p = (n != p) ? n : head;
711 >            if (p.isData) {
712 >                if (item != null && item != p)
713 >                    return this.<E>cast(item);
714 >            }
715 >            else if (item == null)
716 >                return null;
717          }
718          return null;
719      }
# Line 717 | Line 724 | public class LinkedTransferQueue<E> exte
724       */
725      private int countOfMode(boolean data) {
726          int count = 0;
727 <        for (Node<E> p = head; p != null; ) {
727 >        for (Node p = head; p != null; ) {
728              if (!p.isMatched()) {
729                  if (p.isData != data)
730                      return 0;
731                  if (++count == Integer.MAX_VALUE) // saturated
732                      break;
733              }
734 <            Node<E> n = p.next;
734 >            Node n = p.next;
735              if (n != p)
736                  p = n;
737              else {
# Line 736 | Line 743 | public class LinkedTransferQueue<E> exte
743      }
744  
745      final class Itr implements Iterator<E> {
746 <        private Node<E> nextNode;   // next node to return item for
747 <        private E nextItem;         // the corresponding item
748 <        private Node<E> lastRet;    // last returned node, to support remove
749 <        private Node<E> lastPred;   // predecessor to unlink lastRet
746 >        private Node nextNode;   // next node to return item for
747 >        private E nextItem;      // the corresponding item
748 >        private Node lastRet;    // last returned node, to support remove
749 >        private Node lastPred;   // predecessor to unlink lastRet
750  
751          /**
752           * Moves to next node after prev, or first node if prev null.
753           */
754 <        private void advance(Node<E> prev) {
754 >        private void advance(Node prev) {
755              lastPred = lastRet;
756              lastRet = prev;
757 <            Node<E> p;
758 <            if (prev == null || (p = prev.next) == prev)
752 <                p = head;
753 <            while (p != null) {
757 >            for (Node p = (prev == null) ? head : succ(prev);
758 >                 p != null; p = succ(p)) {
759                  Object item = p.item;
760                  if (p.isData) {
761                      if (item != null && item != p) {
# Line 761 | Line 766 | public class LinkedTransferQueue<E> exte
766                  }
767                  else if (item == null)
768                      break;
764                Node<E> n = p.next;
765                p = (n != p) ? n : head;
769              }
770              nextNode = null;
771          }
# Line 776 | Line 779 | public class LinkedTransferQueue<E> exte
779          }
780  
781          public final E next() {
782 <            Node<E> p = nextNode;
782 >            Node p = nextNode;
783              if (p == null) throw new NoSuchElementException();
784              E e = nextItem;
785              advance(p);
# Line 784 | Line 787 | public class LinkedTransferQueue<E> exte
787          }
788  
789          public final void remove() {
790 <            Node<E> p = lastRet;
790 >            Node p = lastRet;
791              if (p == null) throw new IllegalStateException();
792              findAndRemoveDataNode(lastPred, p);
793          }
# Line 799 | Line 802 | public class LinkedTransferQueue<E> exte
802       * @param pred predecessor of node to be unspliced
803       * @param s the node to be unspliced
804       */
805 <    private void unsplice(Node<E> pred, Node<E> s) {
805 >    private void unsplice(Node pred, Node s) {
806          s.forgetContents(); // clear unneeded fields
807          /*
808           * At any given time, exactly one node on list cannot be
# Line 812 | Line 815 | public class LinkedTransferQueue<E> exte
815           */
816          if (pred != null && pred != s) {
817              while (pred.next == s) {
818 <                Node<E> oldpred = (cleanMe == null) ? null : reclean();
819 <                Node<E> n = s.next;
818 >                Node oldpred = (cleanMe == null) ? null : reclean();
819 >                Node n = s.next;
820                  if (n != null) {
821                      if (n != s)
822                          pred.casNext(s, n);
# Line 834 | Line 837 | public class LinkedTransferQueue<E> exte
837       *
838       * @return current cleanMe node (or null)
839       */
840 <    private Node<E> reclean() {
840 >    private Node reclean() {
841          /*
842           * cleanMe is, or at one time was, predecessor of a cancelled
843           * node s that was the tail so could not be unspliced.  If it
# Line 845 | Line 848 | public class LinkedTransferQueue<E> exte
848           * we can (must) clear cleanMe without unsplicing.  This can
849           * loop only due to contention.
850           */
851 <        Node<E> pred;
851 >        Node pred;
852          while ((pred = cleanMe) != null) {
853 <            Node<E> s = pred.next;
854 <            Node<E> n;
853 >            Node s = pred.next;
854 >            Node n;
855              if (s == null || s == pred || !s.isMatched())
856                  casCleanMe(pred, null); // already gone
857              else if ((n = s.next) != null) {
# Line 868 | Line 871 | public class LinkedTransferQueue<E> exte
871       * @param possiblePred possible predecessor of s
872       * @param s the node to remove
873       */
874 <    final void findAndRemoveDataNode(Node<E> possiblePred, Node<E> s) {
874 >    final void findAndRemoveDataNode(Node possiblePred, Node s) {
875          assert s.isData;
876          if (s.tryMatchData()) {
877              if (possiblePred != null && possiblePred.next == s)
878                  unsplice(possiblePred, s); // was actual predecessor
879              else {
880 <                for (Node<E> pred = null, p = head; p != null; ) {
880 >                for (Node pred = null, p = head; p != null; ) {
881                      if (p == s) {
882                          unsplice(pred, p);
883                          break;
# Line 896 | Line 899 | public class LinkedTransferQueue<E> exte
899       */
900      private boolean findAndRemove(Object e) {
901          if (e != null) {
902 <            for (Node<E> pred = null, p = head; p != null; ) {
902 >            for (Node pred = null, p = head; p != null; ) {
903                  Object item = p.item;
904                  if (p.isData) {
905                      if (item != null && item != p && e.equals(item) &&
# Line 1036 | Line 1039 | public class LinkedTransferQueue<E> exte
1039       */
1040      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1041          throws InterruptedException {
1042 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1042 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1043              return true;
1044          if (!Thread.interrupted())
1045              return false;
# Line 1052 | Line 1055 | public class LinkedTransferQueue<E> exte
1055      }
1056  
1057      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1058 <        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1058 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1059          if (e != null || !Thread.interrupted())
1060              return e;
1061          throw new InterruptedException();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines