[cvs] / jsr166 / src / main / java / util / concurrent / LinkedTransferQueue.java Repository:
ViewVC logotype

Diff of /jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.4, Wed Jul 29 02:35:47 2009 UTC revision 1.5, Fri Jul 31 20:41:13 2009 UTC
# Line 8  Line 8 
8    
9  import java.util.AbstractQueue;  import java.util.AbstractQueue;
10  import java.util.Collection;  import java.util.Collection;
11    import java.util.ConcurrentModificationException;
12  import java.util.Iterator;  import java.util.Iterator;
13  import java.util.NoSuchElementException;  import java.util.NoSuchElementException;
14    import java.util.Queue;
15  import java.util.concurrent.locks.LockSupport;  import java.util.concurrent.locks.LockSupport;
16  import java.util.concurrent.atomic.AtomicReference;  import java.util.concurrent.atomic.AtomicReference;
17    
# Line 262  Line 264 
264       * @param e the comparison value for checking match       * @param e the comparison value for checking match
265       * @param mode mode       * @param mode mode
266       * @param nanos timeout value       * @param nanos timeout value
267       * @return matched item, or s if cancelled       * @return matched item, or null if cancelled
268       */       */
269      private E awaitFulfill(Node<E> pred, Node<E> s, E e,      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
270                             int mode, long nanos) {                             int mode, long nanos) {
# Line 328  Line 330 
330          for (;;) {          for (;;) {
331              Node<E> h = head.get();              Node<E> h = head.get();
332              Node<E> first = h.next;              Node<E> first = h.next;
333              if (first != null && first.next == first) { // help advance              if (first != null && first.get() == first) { // help advance
334                  advanceHead(h, first);                  advanceHead(h, first);
335                  continue;                  continue;
336              }              }
# Line 441  Line 443 
443      }      }
444    
445      /**      /**
446       * @throws InterruptedException {@inheritDoc}       * Inserts the specified element at the tail of this queue.
447       * @throws NullPointerException {@inheritDoc}       * As the queue is unbounded, this method will never block.
448         *
449         * @throws NullPointerException if the specified element is null
450       */       */
451      public void put(E e) throws InterruptedException {      public void put(E e) {
452          if (e == null) throw new NullPointerException();          offer(e);
         if (Thread.interrupted()) throw new InterruptedException();  
         xfer(e, NOWAIT, 0);  
453      }      }
454    
455      /**      /**
456       * @throws InterruptedException {@inheritDoc}       * Inserts the specified element at the tail of this queue.
457       * @throws NullPointerException {@inheritDoc}       * As the queue is unbounded, this method will never block or
458         * return {@code false}.
459         *
460         * @return {@code true} (as specified by
461         *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
462         * @throws NullPointerException if the specified element is null
463       */       */
464      public boolean offer(E e, long timeout, TimeUnit unit)      public boolean offer(E e, long timeout, TimeUnit unit) {
465          throws InterruptedException {          return offer(e);
         if (e == null) throw new NullPointerException();  
         if (Thread.interrupted()) throw new InterruptedException();  
         xfer(e, NOWAIT, 0);  
         return true;  
466      }      }
467    
468      /**      /**
469       * @throws NullPointerException {@inheritDoc}       * Inserts the specified element at the tail of this queue.
470         * As the queue is unbounded, this method will never return {@code false}.
471         *
472         * @return {@code true} (as specified by
473         *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
474         * @throws NullPointerException if the specified element is null
475       */       */
476      public boolean offer(E e) {      public boolean offer(E e) {
477          if (e == null) throw new NullPointerException();          if (e == null) throw new NullPointerException();
# Line 472  Line 480 
480      }      }
481    
482      /**      /**
483       * @throws NullPointerException {@inheritDoc}       * Inserts the specified element at the tail of this queue.
484         * As the queue is unbounded, this method will never throw
485         * {@link IllegalStateException} or return {@code false}.
486         *
487         * @return {@code true} (as specified by {@link Collection#add})
488         * @throws NullPointerException if the specified element is null
489       */       */
490      public boolean add(E e) {      public boolean add(E e) {
491            return offer(e);
492        }
493    
494        /**
495         * Transfers the specified element immediately if there exists a
496         * consumer already waiting to receive it (in {@link #take} or
497         * timed {@link #poll(long,TimeUnit) poll}), otherwise
498         * returning {@code false} without enqueuing the element.
499         *
500         * @throws NullPointerException if the specified element is null
501         */
502        public boolean tryTransfer(E e) {
503          if (e == null) throw new NullPointerException();          if (e == null) throw new NullPointerException();
504          xfer(e, NOWAIT, 0);          return fulfill(e) != null;
         return true;  
505      }      }
506    
507      /**      /**
508       * @throws InterruptedException {@inheritDoc}       * Inserts the specified element at the tail of this queue,
509       * @throws NullPointerException {@inheritDoc}       * waiting if necessary for the element to be received by a
510         * consumer invoking {@code take} or {@code poll}.
511         *
512         * @throws NullPointerException if the specified element is null
513       */       */
514      public void transfer(E e) throws InterruptedException {      public void transfer(E e) throws InterruptedException {
515          if (e == null) throw new NullPointerException();          if (e == null) throw new NullPointerException();
# Line 493  Line 520 
520      }      }
521    
522      /**      /**
523       * @throws InterruptedException {@inheritDoc}       * Inserts the specified element at the tail of this queue,
524       * @throws NullPointerException {@inheritDoc}       * waiting up to the specified wait time if necessary for the
525         * element to be received by a consumer invoking {@code take} or
526         * {@code poll}.
527         *
528         * @throws NullPointerException if the specified element is null
529       */       */
530      public boolean tryTransfer(E e, long timeout, TimeUnit unit)      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
531          throws InterruptedException {          throws InterruptedException {
# Line 506  Line 537 
537          throw new InterruptedException();          throw new InterruptedException();
538      }      }
539    
     /**  
      * @throws NullPointerException {@inheritDoc}  
      */  
     public boolean tryTransfer(E e) {  
         if (e == null) throw new NullPointerException();  
         return fulfill(e) != null;  
     }  
   
     /**  
      * @throws InterruptedException {@inheritDoc}  
      */  
540      public E take() throws InterruptedException {      public E take() throws InterruptedException {
541          Object e = xfer(null, WAIT, 0);          E e = xfer(null, WAIT, 0);
542          if (e != null)          if (e != null)
543              return (E) e;              return e;
544          Thread.interrupted();          Thread.interrupted();
545          throw new InterruptedException();          throw new InterruptedException();
546      }      }
547    
     /**  
      * @throws InterruptedException {@inheritDoc}  
      */  
548      public E poll(long timeout, TimeUnit unit) throws InterruptedException {      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
549          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));          E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
550          if (e != null || !Thread.interrupted())          if (e != null || !Thread.interrupted())
551              return (E) e;              return e;
552          throw new InterruptedException();          throw new InterruptedException();
553      }      }
554    
# Line 605  Line 622 
622          }          }
623      }      }
624    
625        /**
626         * Returns an iterator over the elements in this queue in proper
627         * sequence, from head to tail.
628         *
629         * <p>The returned iterator is a "weakly consistent" iterator that
630         * will never throw
631         * {@link ConcurrentModificationException ConcurrentModificationException},
632         * and guarantees to traverse elements as they existed upon
633         * construction of the iterator, and may (but is not guaranteed
634         * to) reflect any modifications subsequent to construction.
635         *
636         * @return an iterator over the elements in this queue in proper sequence
637         */
638      public Iterator<E> iterator() {      public Iterator<E> iterator() {
639          return new Itr();          return new Itr();
640      }      }
# Line 620  Line 649 
649      class Itr implements Iterator<E> {      class Itr implements Iterator<E> {
650          Node<E> next;        // node to return next          Node<E> next;        // node to return next
651          Node<E> pnext;       // predecessor of next          Node<E> pnext;       // predecessor of next
         Node<E> snext;       // successor of next  
652          Node<E> curr;        // last returned node, for remove()          Node<E> curr;        // last returned node, for remove()
653          Node<E> pcurr;       // predecessor of curr, for remove()          Node<E> pcurr;       // predecessor of curr, for remove()
654          E nextItem;        // Cache of next item, once committed to in next          E nextItem;        // Cache of next item, once committed to in next
655    
656          Itr() {          Itr() {
657              findNext();              advance();
658          }          }
659    
660          /**          /**
661           * Ensures next points to next valid node, or null if none.           * Moves to next valid node and returns item to return for
662             * next(), or null if no such.
663           */           */
664          void findNext() {          private E advance() {
665                pcurr = pnext;
666                curr = next;
667                E item = nextItem;
668    
669              for (;;) {              for (;;) {
670                  Node<E> pred = pnext;                  pnext = (next == null) ? traversalHead() : next;
671                  Node<E> q = next;                  next = pnext.next;
672                  if (pred == null || pred == q) {                  if (next == pnext) {
                     pred = traversalHead();  
                     q = pred.next;  
                 }  
                 if (q == null || !q.isData) {  
673                      next = null;                      next = null;
674                      return;                      continue;  // restart
675                  }                  }
676                  Object x = q.get();                  if (next == null)
677                  Node<E> s = q.next;                      break;
678                  if (x != null && q != x && q != s) {                  Object x = next.get();
679                    if (x != null && x != next) {
680                      nextItem = (E) x;                      nextItem = (E) x;
681                      snext = s;                      break;
                     pnext = pred;  
                     next = q;  
                     return;  
682                  }                  }
                 pnext = q;  
                 next = s;  
683              }              }
684                return item;
685          }          }
686    
687          public boolean hasNext() {          public boolean hasNext() {
# Line 663  Line 689 
689          }          }
690    
691          public E next() {          public E next() {
692              if (next == null) throw new NoSuchElementException();              if (next == null)
693              pcurr = pnext;                  throw new NoSuchElementException();
694              curr = next;              return advance();
             pnext = next;  
             next = snext;  
             E x = nextItem;  
             findNext();  
             return x;  
695          }          }
696    
697          public void remove() {          public void remove() {
# Line 740  Line 761 
761       * @return the number of elements in this queue       * @return the number of elements in this queue
762       */       */
763      public int size() {      public int size() {
764            for (;;) {
765          int count = 0;          int count = 0;
766          Node<E> h = traversalHead();              Node<E> pred = traversalHead();
767          for (Node<E> p = h.next; p != null && p.isData; p = p.next) {              for (;;) {
768              Object x = p.get();                  Node<E> q = pred.next;
769              if (x != null && x != p) {                  if (q == pred) // restart
                 if (++count == Integer.MAX_VALUE) // saturated  
770                      break;                      break;
771                    if (q == null || !q.isData)
772                        return count;
773                    Object x = q.get();
774                    if (x != null && x != q) {
775                        if (++count == Integer.MAX_VALUE) // saturated
776                            return count;
777                    }
778                    pred = q;
779              }              }
780          }          }
         return count;  
781      }      }
782    
783      public int getWaitingConsumerCount() {      public int getWaitingConsumerCount() {
784            // converse of size -- count valid non-data nodes
785            for (;;) {
786          int count = 0;          int count = 0;
787          Node<E> h = traversalHead();              Node<E> pred = traversalHead();
788          for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {              for (;;) {
789              if (p.get() == null) {                  Node<E> q = pred.next;
790                  if (++count == Integer.MAX_VALUE)                  if (q == pred) // restart
791                      break;                      break;
792                    if (q == null || q.isData)
793                        return count;
794                    Object x = q.get();
795                    if (x == null) {
796                        if (++count == Integer.MAX_VALUE) // saturated
797                            return count;
798              }              }
799                    pred = q;
800          }          }
         return count;  
801      }      }
   
     public int remainingCapacity() {  
         return Integer.MAX_VALUE;  
802      }      }
803    
804      public boolean remove(Object o) {      public boolean remove(Object o) {
# Line 775  Line 808 
808              Node<E> pred = traversalHead();              Node<E> pred = traversalHead();
809              for (;;) {              for (;;) {
810                  Node<E> q = pred.next;                  Node<E> q = pred.next;
                 if (q == null || !q.isData)  
                     return false;  
811                  if (q == pred) // restart                  if (q == pred) // restart
812                      break;                      break;
813                    if (q == null || !q.isData)
814                        return false;
815                  Object x = q.get();                  Object x = q.get();
816                  if (x != null && x != q && o.equals(x) &&                  if (x != null && x != q && o.equals(x) &&
817                      q.compareAndSet(x, q)) {                      q.compareAndSet(x, q)) {
# Line 791  Line 824 
824      }      }
825    
826      /**      /**
827         * Always returns {@code Integer.MAX_VALUE} because a
828         * {@code LinkedTransferQueue} is not capacity constrained.
829         *
830         * @return {@code Integer.MAX_VALUE} (as specified by
831         *         {@link BlockingQueue#remainingCapacity()})
832         */
833        public int remainingCapacity() {
834            return Integer.MAX_VALUE;
835        }
836    
837        /**
838       * Save the state to a stream (that is, serialize it).       * Save the state to a stream (that is, serialize it).
839       *       *
840       * @serialData All of the elements (each an {@code E}) in       * @serialData All of the elements (each an {@code E}) in

Legend:
Removed from v.1.4  
changed lines
  Added in v.1.5

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8