ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.51 by dl, Sat Oct 24 14:33:29 2009 UTC vs.
Revision 1.66 by jsr166, Mon Nov 2 18:38:37 2009 UTC

# Line 373 | Line 373 | public class LinkedTransferQueue<E> exte
373          }
374  
375          final boolean casItem(Object cmp, Object val) {
376 +            assert cmp == null || cmp.getClass() != Node.class;
377              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
378          }
379  
# Line 409 | Line 410 | public class LinkedTransferQueue<E> exte
410           */
411          final boolean isMatched() {
412              Object x = item;
413 <            return x == this || (x != null) != isData;
413 >            return (x == this) || ((x == null) == isData);
414 >        }
415 >
416 >        /**
417 >         * Returns true if this is an unmatched request node.
418 >         */
419 >        final boolean isUnmatchedRequest() {
420 >            return !isData && item == null;
421          }
422  
423          /**
# Line 427 | Line 435 | public class LinkedTransferQueue<E> exte
435           * Tries to artificially match a data node -- used by remove.
436           */
437          final boolean tryMatchData() {
438 +            assert isData;
439              Object x = item;
440              if (x != null && x != this && casItem(x, null)) {
441                  LockSupport.unpark(waiter);
# Line 448 | Line 457 | public class LinkedTransferQueue<E> exte
457      }
458  
459      /** head of the queue; null until first enqueue */
460 <    private transient volatile Node head;
460 >    transient volatile Node head;
461  
462      /** predecessor of dangling unspliceable node */
463 <    private transient volatile Node cleanMe; // decl here to reduce contention
463 >    private transient volatile Node cleanMe; // decl here reduces contention
464  
465      /** tail of the queue; null until first append */
466      private transient volatile Node tail;
# Line 470 | Line 479 | public class LinkedTransferQueue<E> exte
479      }
480  
481      /*
482 <     * Possible values for "how" argument in xfer method. Beware that
474 <     * the order of assigned numerical values matters.
482 >     * Possible values for "how" argument in xfer method.
483       */
484 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
485 <    private static final int ASYNC   = 1; // for offer, put, add
486 <    private static final int SYNC    = 2; // for transfer, take
487 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
484 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
485 >    private static final int ASYNC = 1; // for offer, put, add
486 >    private static final int SYNC  = 2; // for transfer, take
487 >    private static final int TIMED = 3; // for timed poll, tryTransfer
488 >
489 >    @SuppressWarnings("unchecked")
490 >    static <E> E cast(Object item) {
491 >        assert item == null || item.getClass() != Node.class;
492 >        return (E) item;
493 >    }
494  
495      /**
496       * Implements all queuing methods. See above for explanation.
497       *
498       * @param e the item or null for take
499       * @param haveData true if this is a put, else a take
500 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
501 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
500 >     * @param how NOW, ASYNC, SYNC, or TIMED
501 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
502       * @return an item if matched, else e
503       * @throws NullPointerException if haveData mode but e is null
504       */
505 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
505 >    private E xfer(E e, boolean haveData, int how, long nanos) {
506          if (haveData && (e == null))
507              throw new NullPointerException();
508          Node s = null;                        // the node to append, if needed
# Line 502 | Line 516 | public class LinkedTransferQueue<E> exte
516                      if (isData == haveData)   // can't match
517                          break;
518                      if (p.casItem(item, e)) { // match
519 <                        LockSupport.unpark(p.waiter);
520 <                        while (p != h) {      // update head
521 <                            Node n = p.next;  // by 2 unless singleton
522 <                            if (n != null)
523 <                                p = n;
510 <                            if (head == h && casHead(h, p)) {
519 >                        for (Node q = p; q != h;) {
520 >                            Node n = q.next;  // update head by 2
521 >                            if (n != null)    // unless singleton
522 >                                q = n;
523 >                            if (head == h && casHead(h, q)) {
524                                  h.forgetNext();
525                                  break;
526                              }                 // advance and retry
527                              if ((h = head)   == null ||
528 <                                (p = h.next) == null || !p.isMatched())
528 >                                (q = h.next) == null || !q.isMatched())
529                                  break;        // unless slack < 2
530                          }
531 <                        return item;
531 >                        LockSupport.unpark(p.waiter);
532 >                        return this.<E>cast(item);
533                      }
534                  }
535                  Node n = p.next;
536                  p = (p != n) ? n : (h = head); // Use head if p offlist
537              }
538  
539 <            if (how >= ASYNC) {               // No matches available
539 >            if (how != NOW) {                 // No matches available
540                  if (s == null)
541                      s = new Node(e, haveData);
542                  Node pred = tryAppend(s, haveData);
543                  if (pred == null)
544                      continue retry;           // lost race vs opposite mode
545 <                if (how >= SYNC)
546 <                    return awaitMatch(s, pred, e, how, nanos);
545 >                if (how != ASYNC)
546 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
547              }
548              return e; // not waiting
549          }
# Line 545 | Line 559 | public class LinkedTransferQueue<E> exte
559       * predecessor
560       */
561      private Node tryAppend(Node s, boolean haveData) {
562 <        for (Node t = tail, p = t;;) { // move p to last node and append
562 >        for (Node t = tail, p = t;;) {        // move p to last node and append
563              Node n, u;                        // temps for reads of next & tail
564              if (p == null && (p = head) == null) {
565                  if (casHead(null, s))
# Line 578 | Line 592 | public class LinkedTransferQueue<E> exte
592       * predecessor, or null if unknown (the null case does not occur
593       * in any current calls but may in possible future extensions)
594       * @param e the comparison value for checking match
595 <     * @param how either SYNC or TIMEOUT
596 <     * @param nanos timeout value
595 >     * @param timed if true, wait only until timeout elapses
596 >     * @param nanos timeout in nanosecs, used only if timed is true
597       * @return matched item, or e if unmatched on interrupt or timeout
598       */
599 <    private Object awaitMatch(Node s, Node pred, Object e,
600 <                              int how, long nanos) {
587 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
599 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
600 >        long lastTime = timed ? System.nanoTime() : 0L;
601          Thread w = Thread.currentThread();
602          int spins = -1; // initialized after first item and cancel checks
603          ThreadLocalRandom randomYields = null; // bound if needed
# Line 592 | Line 605 | public class LinkedTransferQueue<E> exte
605          for (;;) {
606              Object item = s.item;
607              if (item != e) {                  // matched
608 +                assert item != s;
609                  s.forgetContents();           // avoid garbage
610 <                return item;
610 >                return this.<E>cast(item);
611              }
612 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
613 <                     s.casItem(e, s)) {       // cancel
612 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
613 >                    s.casItem(e, s)) {       // cancel
614                  unsplice(pred, s);
615                  return e;
616              }
# Line 614 | Line 628 | public class LinkedTransferQueue<E> exte
628              else if (s.waiter == null) {
629                  s.waiter = w;                 // request unpark then recheck
630              }
631 <            else if (how == TIMEOUT) {
631 >            else if (timed) {
632                  long now = System.nanoTime();
633                  if ((nanos -= now - lastTime) > 0)
634                      LockSupport.parkNanos(this, nanos);
# Line 668 | Line 682 | public class LinkedTransferQueue<E> exte
682      /* -------------- Traversal methods -------------- */
683  
684      /**
685 +     * Returns the successor of p, or the head node if p.next has been
686 +     * linked to self, which will only be true if traversing with a
687 +     * stale pointer that is now off the list.
688 +     */
689 +    final Node succ(Node p) {
690 +        Node next = p.next;
691 +        return (p == next) ? head : next;
692 +    }
693 +
694 +    /**
695       * Returns the first unmatched node of the given mode, or null if
696       * none.  Used by methods isEmpty, hasWaitingConsumer.
697       */
698 <    private Node firstOfMode(boolean data) {
699 <        for (Node p = head; p != null; ) {
698 >    private Node firstOfMode(boolean isData) {
699 >        for (Node p = head; p != null; p = succ(p)) {
700              if (!p.isMatched())
701 <                return (p.isData == data) ? p : null;
678 <            Node n = p.next;
679 <            p = (n != p) ? n : head;
701 >                return (p.isData == isData) ? p : null;
702          }
703          return null;
704      }
705  
706      /**
707       * Returns the item in the first unmatched node with isData; or
708 <     * null if none. Used by peek.
708 >     * null if none.  Used by peek.
709       */
710 <    private Object firstDataItem() {
711 <        for (Node p = head; p != null; ) {
690 <            boolean isData = p.isData;
710 >    private E firstDataItem() {
711 >        for (Node p = head; p != null; p = succ(p)) {
712              Object item = p.item;
713 <            if (item != p && (item != null) == isData)
714 <                return isData ? item : null;
715 <            Node n = p.next;
716 <            p = (n != p) ? n : head;
713 >            if (p.isData) {
714 >                if (item != null && item != p)
715 >                    return this.<E>cast(item);
716 >            }
717 >            else if (item == null)
718 >                return null;
719          }
720          return null;
721      }
# Line 723 | Line 746 | public class LinkedTransferQueue<E> exte
746  
747      final class Itr implements Iterator<E> {
748          private Node nextNode;   // next node to return item for
749 <        private Object nextItem; // the corresponding item
749 >        private E nextItem;      // the corresponding item
750          private Node lastRet;    // last returned node, to support remove
751 +        private Node lastPred;   // predecessor to unlink lastRet
752  
753          /**
754           * Moves to next node after prev, or first node if prev null.
755           */
756          private void advance(Node prev) {
757 +            lastPred = lastRet;
758              lastRet = prev;
759 <            Node p;
760 <            if (prev == null || (p = prev.next) == prev)
736 <                p = head;
737 <            while (p != null) {
759 >            for (Node p = (prev == null) ? head : succ(prev);
760 >                 p != null; p = succ(p)) {
761                  Object item = p.item;
762                  if (p.isData) {
763                      if (item != null && item != p) {
764 <                        nextItem = item;
764 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
765                          nextNode = p;
766                          return;
767                      }
768                  }
769                  else if (item == null)
770                      break;
748                Node n = p.next;
749                p = (n != p) ? n : head;
771              }
772              nextNode = null;
773          }
# Line 762 | Line 783 | public class LinkedTransferQueue<E> exte
783          public final E next() {
784              Node p = nextNode;
785              if (p == null) throw new NoSuchElementException();
786 <            Object e = nextItem;
786 >            E e = nextItem;
787              advance(p);
788 <            return (E) e;
788 >            return e;
789          }
790  
791          public final void remove() {
792              Node p = lastRet;
793              if (p == null) throw new IllegalStateException();
794 <            lastRet = null;
774 <            findAndRemoveNode(p);
794 >            findAndRemoveDataNode(lastPred, p);
795          }
796      }
797  
# Line 805 | Line 825 | public class LinkedTransferQueue<E> exte
825                      break;
826                  }
827                  if (oldpred == pred ||      // Already saved
828 <                    (oldpred == null && casCleanMe(null, pred)))
829 <                    break;                  // Postpone cleaning
828 >                    ((oldpred == null || oldpred.next == s) &&
829 >                     casCleanMe(oldpred, pred))) {
830 >                    break;
831 >                }
832              }
833          }
834      }
# Line 846 | Line 868 | public class LinkedTransferQueue<E> exte
868      }
869  
870      /**
871 <     * Main implementation of Iterator.remove(). Find
872 <     * and unsplice the given node.
871 >     * Main implementation of Iterator.remove(). Finds
872 >     * and unsplices the given data node.
873 >     *
874 >     * @param possiblePred possible predecessor of s
875 >     * @param s the node to remove
876       */
877 <    final void findAndRemoveNode(Node s) {
877 >    final void findAndRemoveDataNode(Node possiblePred, Node s) {
878 >        assert s.isData;
879          if (s.tryMatchData()) {
880 <            Node pred = null;
881 <            Node p = head;
882 <            while (p != null) {
883 <                if (p == s) {
884 <                    unsplice(pred, p);
885 <                    break;
886 <                }
887 <                if (!p.isData && !p.isMatched())
888 <                    break;
889 <                pred = p;
890 <                if ((p = p.next) == pred) { // stale
891 <                    pred = null;
892 <                    p = head;
880 >            if (possiblePred != null && possiblePred.next == s)
881 >                unsplice(possiblePred, s); // was actual predecessor
882 >            else {
883 >                for (Node pred = null, p = head; p != null; ) {
884 >                    if (p == s) {
885 >                        unsplice(pred, p);
886 >                        break;
887 >                    }
888 >                    if (p.isUnmatchedRequest())
889 >                        break;
890 >                    pred = p;
891 >                    if ((p = p.next) == pred) { // stale
892 >                        pred = null;
893 >                        p = head;
894 >                    }
895                  }
896              }
897          }
# Line 874 | Line 902 | public class LinkedTransferQueue<E> exte
902       */
903      private boolean findAndRemove(Object e) {
904          if (e != null) {
905 <            Node pred = null;
878 <            Node p = head;
879 <            while (p != null) {
905 >            for (Node pred = null, p = head; p != null; ) {
906                  Object item = p.item;
907                  if (p.isData) {
908                      if (item != null && item != p && e.equals(item) &&
# Line 888 | Line 914 | public class LinkedTransferQueue<E> exte
914                  else if (item == null)
915                      break;
916                  pred = p;
917 <                if ((p = p.next) == pred) {
917 >                if ((p = p.next) == pred) { // stale
918                      pred = null;
919                      p = head;
920                  }
# Line 1016 | Line 1042 | public class LinkedTransferQueue<E> exte
1042       */
1043      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1044          throws InterruptedException {
1045 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1045 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1046              return true;
1047          if (!Thread.interrupted())
1048              return false;
# Line 1024 | Line 1050 | public class LinkedTransferQueue<E> exte
1050      }
1051  
1052      public E take() throws InterruptedException {
1053 <        Object e = xfer(null, false, SYNC, 0);
1053 >        E e = xfer(null, false, SYNC, 0);
1054          if (e != null)
1055 <            return (E)e;
1055 >            return e;
1056          Thread.interrupted();
1057          throw new InterruptedException();
1058      }
1059  
1060      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1061 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1061 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1062          if (e != null || !Thread.interrupted())
1063 <            return (E)e;
1063 >            return e;
1064          throw new InterruptedException();
1065      }
1066  
1067      public E poll() {
1068 <        return (E)xfer(null, false, NOW, 0);
1068 >        return xfer(null, false, NOW, 0);
1069      }
1070  
1071      /**
# Line 1096 | Line 1122 | public class LinkedTransferQueue<E> exte
1122      }
1123  
1124      public E peek() {
1125 <        return (E) firstDataItem();
1125 >        return firstDataItem();
1126      }
1127  
1128      /**
# Line 1192 | Line 1218 | public class LinkedTransferQueue<E> exte
1218          }
1219      }
1220  
1195
1221      // Unsafe mechanics
1222  
1223      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
# Line 1215 | Line 1240 | public class LinkedTransferQueue<E> exte
1240          }
1241      }
1242  
1243 <    private static sun.misc.Unsafe getUnsafe() {
1243 >    /**
1244 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1245 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1246 >     * into a jdk.
1247 >     *
1248 >     * @return a sun.misc.Unsafe
1249 >     */
1250 >    static sun.misc.Unsafe getUnsafe() {
1251          try {
1252              return sun.misc.Unsafe.getUnsafe();
1253          } catch (SecurityException se) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines