[cvs] / jsr166 / src / main / java / util / concurrent / LinkedTransferQueue.java Repository:
ViewVC logotype

Diff of /jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.13, Fri Oct 30 18:35:22 2009 UTC revision 1.14, Mon Nov 2 15:33:59 2009 UTC
# Line 359  Line 359 
359       * precede or follow CASes use simple relaxed forms.  Other       * precede or follow CASes use simple relaxed forms.  Other
360       * cleanups use releasing/lazy writes.       * cleanups use releasing/lazy writes.
361       */       */
362      static final class Node<E> {      static final class Node {
363          final boolean isData;   // false if this is a request node          final boolean isData;   // false if this is a request node
364          volatile Object item;   // initially non-null if isData; CASed to match          volatile Object item;   // initially non-null if isData; CASed to match
365          volatile Node<E> next;          volatile Node next;
366          volatile Thread waiter; // null until waiting          volatile Thread waiter; // null until waiting
367    
368          // CAS methods for fields          // CAS methods for fields
369          final boolean casNext(Node<E> cmp, Node<E> val) {          final boolean casNext(Node cmp, Node val) {
370              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
371          }          }
372    
# Line 379  Line 379 
379           * Creates a new node. Uses relaxed write because item can only           * Creates a new node. Uses relaxed write because item can only
380           * be seen if followed by CAS.           * be seen if followed by CAS.
381           */           */
382          Node(E item, boolean isData) {          Node(Object item, boolean isData) {
383              UNSAFE.putObject(this, itemOffset, item); // relaxed write              UNSAFE.putObject(this, itemOffset, item); // relaxed write
384              this.isData = isData;              this.isData = isData;
385          }          }
# Line 455  Line 455 
455      }      }
456    
457      /** head of the queue; null until first enqueue */      /** head of the queue; null until first enqueue */
458      transient volatile Node<E> head;      transient volatile Node head;
459    
460      /** predecessor of dangling unspliceable node */      /** predecessor of dangling unspliceable node */
461      private transient volatile Node<E> cleanMe; // decl here reduces contention      private transient volatile Node cleanMe; // decl here reduces contention
462    
463      /** tail of the queue; null until first append */      /** tail of the queue; null until first append */
464      private transient volatile Node<E> tail;      private transient volatile Node tail;
465    
466      // CAS methods for fields      // CAS methods for fields
467      private boolean casTail(Node<E> cmp, Node<E> val) {      private boolean casTail(Node cmp, Node val) {
468          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
469      }      }
470    
471      private boolean casHead(Node<E> cmp, Node<E> val) {      private boolean casHead(Node cmp, Node val) {
472          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
473      }      }
474    
475      private boolean casCleanMe(Node<E> cmp, Node<E> val) {      private boolean casCleanMe(Node cmp, Node val) {
476          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
477      }      }
478    
479      /*      /*
480       * Possible values for "how" argument in xfer method. Beware that       * Possible values for "how" argument in xfer method.
      * the order of assigned numerical values matters.  
481       */       */
482      private static final int NOW     = 0; // for untimed poll, tryTransfer      private static final int NOW     = 0; // for untimed poll, tryTransfer
483      private static final int ASYNC   = 1; // for offer, put, add      private static final int ASYNC   = 1; // for offer, put, add
484      private static final int SYNC    = 2; // for transfer, take      private static final int SYNC    = 2; // for transfer, take
485      private static final int TIMEOUT = 3; // for timed poll, tryTransfer      private static final int TIMED = 3; // for timed poll, tryTransfer
486    
487      @SuppressWarnings("unchecked")      @SuppressWarnings("unchecked")
488      static <E> E cast(Object item) {      static <E> E cast(Object item) {
# Line 496  Line 495 
495       *       *
496       * @param e the item or null for take       * @param e the item or null for take
497       * @param haveData true if this is a put, else a take       * @param haveData true if this is a put, else a take
498       * @param how NOW, ASYNC, SYNC, or TIMEOUT       * @param how NOW, ASYNC, SYNC, or TIMED
499       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT       * @param nanos timeout in nanosecs, used only if mode is TIMED
500       * @return an item if matched, else e       * @return an item if matched, else e
501       * @throws NullPointerException if haveData mode but e is null       * @throws NullPointerException if haveData mode but e is null
502       */       */
503      private E xfer(E e, boolean haveData, int how, long nanos) {      private E xfer(E e, boolean haveData, int how, long nanos) {
504          if (haveData && (e == null))          if (haveData && (e == null))
505              throw new NullPointerException();              throw new NullPointerException();
506          Node<E> s = null;                     // the node to append, if needed          Node s = null;                        // the node to append, if needed
507    
508          retry: for (;;) {                     // restart on append race          retry: for (;;) {                     // restart on append race
509    
510              for (Node<E> h = head, p = h; p != null;) {              for (Node h = head, p = h; p != null;) { // find & match first node
                 // find & match first node  
511                  boolean isData = p.isData;                  boolean isData = p.isData;
512                  Object item = p.item;                  Object item = p.item;
513                  if (item != p && (item != null) == isData) { // unmatched                  if (item != p && (item != null) == isData) { // unmatched
514                      if (isData == haveData)   // can't match                      if (isData == haveData)   // can't match
515                          break;                          break;
516                      if (p.casItem(item, e)) { // match                      if (p.casItem(item, e)) { // match
517                          for (Node<E> q = p; q != h;) {                          for (Node q = p; q != h;) {
518                              Node<E> n = q.next; // update head by 2                              Node n = q.next;  // update head by 2
519                              if (n != null)    // unless singleton                              if (n != null)    // unless singleton
520                                  q = n;                                  q = n;
521                              if (head == h && casHead(h, q)) {                              if (head == h && casHead(h, q)) {
# Line 532  Line 530 
530                          return this.<E>cast(item);                          return this.<E>cast(item);
531                      }                      }
532                  }                  }
533                  Node<E> n = p.next;                  Node n = p.next;
534                  p = (p != n) ? n : (h = head); // Use head if p offlist                  p = (p != n) ? n : (h = head); // Use head if p offlist
535              }              }
536    
537              if (how >= ASYNC) {               // No matches available              if (how != NOW) {                 // No matches available
538                  if (s == null)                  if (s == null)
539                      s = new Node<E>(e, haveData);                      s = new Node(e, haveData);
540                  Node<E> pred = tryAppend(s, haveData);                  Node pred = tryAppend(s, haveData);
541                  if (pred == null)                  if (pred == null)
542                      continue retry;           // lost race vs opposite mode                      continue retry;           // lost race vs opposite mode
543                  if (how >= SYNC)                  if (how != ASYNC)
544                      return awaitMatch(s, pred, e, how, nanos);                      return awaitMatch(s, pred, e, (how == TIMED), nanos);
545              }              }
546              return e; // not waiting              return e; // not waiting
547          }          }
# Line 558  Line 556 
556       * different mode, else s's predecessor, or s itself if no       * different mode, else s's predecessor, or s itself if no
557       * predecessor       * predecessor
558       */       */
559      private Node<E> tryAppend(Node<E> s, boolean haveData) {      private Node tryAppend(Node s, boolean haveData) {
560          for (Node<E> t = tail, p = t;;) { // move p to last node and append          for (Node t = tail, p = t;;) {        // move p to last node and append
561              Node<E> n, u;                     // temps for reads of next & tail              Node n, u;                        // temps for reads of next & tail
562              if (p == null && (p = head) == null) {              if (p == null && (p = head) == null) {
563                  if (casHead(null, s))                  if (casHead(null, s))
564                      return s;                 // initialize                      return s;                 // initialize
# Line 592  Line 590 
590       * predecessor, or null if unknown (the null case does not occur       * predecessor, or null if unknown (the null case does not occur
591       * in any current calls but may in possible future extensions)       * in any current calls but may in possible future extensions)
592       * @param e the comparison value for checking match       * @param e the comparison value for checking match
593       * @param how either SYNC or TIMEOUT       * @param timed if true, wait only until timeout elapses
594       * @param nanos timeout value       * @param nanos timeout in nanosecs, used only if timed is true
595       * @return matched item, or e if unmatched on interrupt or timeout       * @return matched item, or e if unmatched on interrupt or timeout
596       */       */
597      private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {      private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
598          long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;          long lastTime = timed ? System.nanoTime() : 0L;
599          Thread w = Thread.currentThread();          Thread w = Thread.currentThread();
600          int spins = -1; // initialized after first item and cancel checks          int spins = -1; // initialized after first item and cancel checks
601          ThreadLocalRandom randomYields = null; // bound if needed          ThreadLocalRandom randomYields = null; // bound if needed
# Line 609  Line 607 
607                  s.forgetContents();           // avoid garbage                  s.forgetContents();           // avoid garbage
608                  return this.<E>cast(item);                  return this.<E>cast(item);
609              }              }
610              if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&              if ((w.isInterrupted() || (timed && nanos <= 0)) &&
611                      s.casItem(e, s)) {       // cancel                      s.casItem(e, s)) {       // cancel
612                  unsplice(pred, s);                  unsplice(pred, s);
613                  return e;                  return e;
# Line 628  Line 626 
626              else if (s.waiter == null) {              else if (s.waiter == null) {
627                  s.waiter = w;                 // request unpark then recheck                  s.waiter = w;                 // request unpark then recheck
628              }              }
629              else if (how == TIMEOUT) {              else if (timed) {
630                  long now = System.nanoTime();                  long now = System.nanoTime();
631                  if ((nanos -= now - lastTime) > 0)                  if ((nanos -= now - lastTime) > 0)
632                      LockSupport.parkNanos(this, nanos);                      LockSupport.parkNanos(this, nanos);
# Line 646  Line 644 
644       * Returns spin/yield value for a node with given predecessor and       * Returns spin/yield value for a node with given predecessor and
645       * data mode. See above for explanation.       * data mode. See above for explanation.
646       */       */
647      private static int spinsFor(Node<?> pred, boolean haveData) {      private static int spinsFor(Node pred, boolean haveData) {
648          if (MP && pred != null) {          if (MP && pred != null) {
649              if (pred.isData != haveData)      // phase change              if (pred.isData != haveData)      // phase change
650                  return FRONT_SPINS + CHAINED_SPINS;                  return FRONT_SPINS + CHAINED_SPINS;
# Line 663  Line 661 
661       * or trailing node; failing on contention.       * or trailing node; failing on contention.
662       */       */
663      private void shortenHeadPath() {      private void shortenHeadPath() {
664          Node<E> h, hn, p, q;          Node h, hn, p, q;
665          if ((p = h = head) != null && h.isMatched() &&          if ((p = h = head) != null && h.isMatched() &&
666              (q = hn = h.next) != null) {              (q = hn = h.next) != null) {
667              Node<E> n;              Node n;
668              while ((n = q.next) != q) {              while ((n = q.next) != q) {
669                  if (n == null || !q.isMatched()) {                  if (n == null || !q.isMatched()) {
670                      if (hn != q && h.next == hn)                      if (hn != q && h.next == hn)
# Line 682  Line 680 
680      /* -------------- Traversal methods -------------- */      /* -------------- Traversal methods -------------- */
681    
682      /**      /**
683         * Returns the successor of p, or the head node if p.next has been
684         * linked to self, which will only be true if traversing with a
685         * stale pointer that is now off the list.
686         */
687        final Node succ(Node p) {
688            Node next = p.next;
689            return (p == next) ? head : next;
690        }
691    
692        /**
693       * Returns the first unmatched node of the given mode, or null if       * Returns the first unmatched node of the given mode, or null if
694       * none.  Used by methods isEmpty, hasWaitingConsumer.       * none.  Used by methods isEmpty, hasWaitingConsumer.
695       */       */
696      private Node<E> firstOfMode(boolean data) {      private Node firstOfMode(boolean isData) {
697          for (Node<E> p = head; p != null; ) {          for (Node p = head; p != null; p = succ(p)) {
698              if (!p.isMatched())              if (!p.isMatched())
699                  return (p.isData == data) ? p : null;                  return (p.isData == isData) ? p : null;
             Node<E> n = p.next;  
             p = (n != p) ? n : head;  
700          }          }
701          return null;          return null;
702      }      }
# Line 700  Line 706 
706       * null if none.  Used by peek.       * null if none.  Used by peek.
707       */       */
708      private E firstDataItem() {      private E firstDataItem() {
709          for (Node<E> p = head; p != null; ) {          for (Node p = head; p != null; p = succ(p)) {
             boolean isData = p.isData;  
710              Object item = p.item;              Object item = p.item;
711              if (item != p && (item != null) == isData)              if (p.isData) {
712                  return isData ? this.<E>cast(item) : null;                  if (item != null && item != p)
713              Node<E> n = p.next;                      return this.<E>cast(item);
714              p = (n != p) ? n : head;              }
715                else if (item == null)
716                    return null;
717          }          }
718          return null;          return null;
719      }      }
# Line 717  Line 724 
724       */       */
725      private int countOfMode(boolean data) {      private int countOfMode(boolean data) {
726          int count = 0;          int count = 0;
727          for (Node<E> p = head; p != null; ) {          for (Node p = head; p != null; ) {
728              if (!p.isMatched()) {              if (!p.isMatched()) {
729                  if (p.isData != data)                  if (p.isData != data)
730                      return 0;                      return 0;
731                  if (++count == Integer.MAX_VALUE) // saturated                  if (++count == Integer.MAX_VALUE) // saturated
732                      break;                      break;
733              }              }
734              Node<E> n = p.next;              Node n = p.next;
735              if (n != p)              if (n != p)
736                  p = n;                  p = n;
737              else {              else {
# Line 736  Line 743 
743      }      }
744    
745      final class Itr implements Iterator<E> {      final class Itr implements Iterator<E> {
746          private Node<E> nextNode;   // next node to return item for          private Node nextNode;   // next node to return item for
747          private E nextItem;         // the corresponding item          private E nextItem;         // the corresponding item
748          private Node<E> lastRet;    // last returned node, to support remove          private Node lastRet;    // last returned node, to support remove
749          private Node<E> lastPred;   // predecessor to unlink lastRet          private Node lastPred;   // predecessor to unlink lastRet
750    
751          /**          /**
752           * Moves to next node after prev, or first node if prev null.           * Moves to next node after prev, or first node if prev null.
753           */           */
754          private void advance(Node<E> prev) {          private void advance(Node prev) {
755              lastPred = lastRet;              lastPred = lastRet;
756              lastRet = prev;              lastRet = prev;
757              Node<E> p;              for (Node p = (prev == null) ? head : succ(prev);
758              if (prev == null || (p = prev.next) == prev)                   p != null; p = succ(p)) {
                 p = head;  
             while (p != null) {  
759                  Object item = p.item;                  Object item = p.item;
760                  if (p.isData) {                  if (p.isData) {
761                      if (item != null && item != p) {                      if (item != null && item != p) {
# Line 761  Line 766 
766                  }                  }
767                  else if (item == null)                  else if (item == null)
768                      break;                      break;
                 Node<E> n = p.next;  
                 p = (n != p) ? n : head;  
769              }              }
770              nextNode = null;              nextNode = null;
771          }          }
# Line 776  Line 779 
779          }          }
780    
781          public final E next() {          public final E next() {
782              Node<E> p = nextNode;              Node p = nextNode;
783              if (p == null) throw new NoSuchElementException();              if (p == null) throw new NoSuchElementException();
784              E e = nextItem;              E e = nextItem;
785              advance(p);              advance(p);
# Line 784  Line 787 
787          }          }
788    
789          public final void remove() {          public final void remove() {
790              Node<E> p = lastRet;              Node p = lastRet;
791              if (p == null) throw new IllegalStateException();              if (p == null) throw new IllegalStateException();
792              findAndRemoveDataNode(lastPred, p);              findAndRemoveDataNode(lastPred, p);
793          }          }
# Line 799  Line 802 
802       * @param pred predecessor of node to be unspliced       * @param pred predecessor of node to be unspliced
803       * @param s the node to be unspliced       * @param s the node to be unspliced
804       */       */
805      private void unsplice(Node<E> pred, Node<E> s) {      private void unsplice(Node pred, Node s) {
806          s.forgetContents(); // clear unneeded fields          s.forgetContents(); // clear unneeded fields
807          /*          /*
808           * At any given time, exactly one node on list cannot be           * At any given time, exactly one node on list cannot be
# Line 812  Line 815 
815           */           */
816          if (pred != null && pred != s) {          if (pred != null && pred != s) {
817              while (pred.next == s) {              while (pred.next == s) {
818                  Node<E> oldpred = (cleanMe == null) ? null : reclean();                  Node oldpred = (cleanMe == null) ? null : reclean();
819                  Node<E> n = s.next;                  Node n = s.next;
820                  if (n != null) {                  if (n != null) {
821                      if (n != s)                      if (n != s)
822                          pred.casNext(s, n);                          pred.casNext(s, n);
# Line 834  Line 837 
837       *       *
838       * @return current cleanMe node (or null)       * @return current cleanMe node (or null)
839       */       */
840      private Node<E> reclean() {      private Node reclean() {
841          /*          /*
842           * cleanMe is, or at one time was, predecessor of a cancelled           * cleanMe is, or at one time was, predecessor of a cancelled
843           * node s that was the tail so could not be unspliced.  If it           * node s that was the tail so could not be unspliced.  If it
# Line 845  Line 848 
848           * we can (must) clear cleanMe without unsplicing.  This can           * we can (must) clear cleanMe without unsplicing.  This can
849           * loop only due to contention.           * loop only due to contention.
850           */           */
851          Node<E> pred;          Node pred;
852          while ((pred = cleanMe) != null) {          while ((pred = cleanMe) != null) {
853              Node<E> s = pred.next;              Node s = pred.next;
854              Node<E> n;              Node n;
855              if (s == null || s == pred || !s.isMatched())              if (s == null || s == pred || !s.isMatched())
856                  casCleanMe(pred, null); // already gone                  casCleanMe(pred, null); // already gone
857              else if ((n = s.next) != null) {              else if ((n = s.next) != null) {
# Line 868  Line 871 
871       * @param possiblePred possible predecessor of s       * @param possiblePred possible predecessor of s
872       * @param s the node to remove       * @param s the node to remove
873       */       */
874      final void findAndRemoveDataNode(Node<E> possiblePred, Node<E> s) {      final void findAndRemoveDataNode(Node possiblePred, Node s) {
875          assert s.isData;          assert s.isData;
876          if (s.tryMatchData()) {          if (s.tryMatchData()) {
877              if (possiblePred != null && possiblePred.next == s)              if (possiblePred != null && possiblePred.next == s)
878                  unsplice(possiblePred, s); // was actual predecessor                  unsplice(possiblePred, s); // was actual predecessor
879              else {              else {
880                  for (Node<E> pred = null, p = head; p != null; ) {                  for (Node pred = null, p = head; p != null; ) {
881                      if (p == s) {                      if (p == s) {
882                          unsplice(pred, p);                          unsplice(pred, p);
883                          break;                          break;
# Line 896  Line 899 
899       */       */
900      private boolean findAndRemove(Object e) {      private boolean findAndRemove(Object e) {
901          if (e != null) {          if (e != null) {
902              for (Node<E> pred = null, p = head; p != null; ) {              for (Node pred = null, p = head; p != null; ) {
903                  Object item = p.item;                  Object item = p.item;
904                  if (p.isData) {                  if (p.isData) {
905                      if (item != null && item != p && e.equals(item) &&                      if (item != null && item != p && e.equals(item) &&
# Line 1036  Line 1039 
1039       */       */
1040      public boolean tryTransfer(E e, long timeout, TimeUnit unit)      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1041          throws InterruptedException {          throws InterruptedException {
1042          if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)          if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1043              return true;              return true;
1044          if (!Thread.interrupted())          if (!Thread.interrupted())
1045              return false;              return false;
# Line 1052  Line 1055 
1055      }      }
1056    
1057      public E poll(long timeout, TimeUnit unit) throws InterruptedException {      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1058          E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));          E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1059          if (e != null || !Thread.interrupted())          if (e != null || !Thread.interrupted())
1060              return e;              return e;
1061          throw new InterruptedException();          throw new InterruptedException();

Legend:
Removed from v.1.13  
changed lines
  Added in v.1.14

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8