ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.4 by jsr166, Wed Jul 29 02:35:47 2009 UTC vs.
Revision 1.5 by jsr166, Fri Jul 31 20:41:13 2009 UTC

# Line 8 | Line 8 | package java.util.concurrent;
8  
9   import java.util.AbstractQueue;
10   import java.util.Collection;
11 + import java.util.ConcurrentModificationException;
12   import java.util.Iterator;
13   import java.util.NoSuchElementException;
14 + import java.util.Queue;
15   import java.util.concurrent.locks.LockSupport;
16   import java.util.concurrent.atomic.AtomicReference;
17  
# Line 262 | Line 264 | public class LinkedTransferQueue<E> exte
264       * @param e the comparison value for checking match
265       * @param mode mode
266       * @param nanos timeout value
267 <     * @return matched item, or s if cancelled
267 >     * @return matched item, or null if cancelled
268       */
269      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
270                             int mode, long nanos) {
# Line 328 | Line 330 | public class LinkedTransferQueue<E> exte
330          for (;;) {
331              Node<E> h = head.get();
332              Node<E> first = h.next;
333 <            if (first != null && first.next == first) { // help advance
333 >            if (first != null && first.get() == first) { // help advance
334                  advanceHead(h, first);
335                  continue;
336              }
# Line 441 | Line 443 | public class LinkedTransferQueue<E> exte
443      }
444  
445      /**
446 <     * @throws InterruptedException {@inheritDoc}
447 <     * @throws NullPointerException {@inheritDoc}
446 >     * Inserts the specified element at the tail of this queue.
447 >     * As the queue is unbounded, this method will never block.
448 >     *
449 >     * @throws NullPointerException if the specified element is null
450       */
451 <    public void put(E e) throws InterruptedException {
452 <        if (e == null) throw new NullPointerException();
449 <        if (Thread.interrupted()) throw new InterruptedException();
450 <        xfer(e, NOWAIT, 0);
451 >    public void put(E e) {
452 >        offer(e);
453      }
454  
455      /**
456 <     * @throws InterruptedException {@inheritDoc}
457 <     * @throws NullPointerException {@inheritDoc}
456 >     * Inserts the specified element at the tail of this queue.
457 >     * As the queue is unbounded, this method will never block or
458 >     * return {@code false}.
459 >     *
460 >     * @return {@code true} (as specified by
461 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
462 >     * @throws NullPointerException if the specified element is null
463       */
464 <    public boolean offer(E e, long timeout, TimeUnit unit)
465 <        throws InterruptedException {
459 <        if (e == null) throw new NullPointerException();
460 <        if (Thread.interrupted()) throw new InterruptedException();
461 <        xfer(e, NOWAIT, 0);
462 <        return true;
464 >    public boolean offer(E e, long timeout, TimeUnit unit) {
465 >        return offer(e);
466      }
467  
468      /**
469 <     * @throws NullPointerException {@inheritDoc}
469 >     * Inserts the specified element at the tail of this queue.
470 >     * As the queue is unbounded, this method will never return {@code false}.
471 >     *
472 >     * @return {@code true} (as specified by
473 >     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
474 >     * @throws NullPointerException if the specified element is null
475       */
476      public boolean offer(E e) {
477          if (e == null) throw new NullPointerException();
# Line 472 | Line 480 | public class LinkedTransferQueue<E> exte
480      }
481  
482      /**
483 <     * @throws NullPointerException {@inheritDoc}
483 >     * Inserts the specified element at the tail of this queue.
484 >     * As the queue is unbounded, this method will never throw
485 >     * {@link IllegalStateException} or return {@code false}.
486 >     *
487 >     * @return {@code true} (as specified by {@link Collection#add})
488 >     * @throws NullPointerException if the specified element is null
489       */
490      public boolean add(E e) {
491 +        return offer(e);
492 +    }
493 +
494 +    /**
495 +     * Transfers the specified element immediately if there exists a
496 +     * consumer already waiting to receive it (in {@link #take} or
497 +     * timed {@link #poll(long,TimeUnit) poll}), otherwise
498 +     * returning {@code false} without enqueuing the element.
499 +     *
500 +     * @throws NullPointerException if the specified element is null
501 +     */
502 +    public boolean tryTransfer(E e) {
503          if (e == null) throw new NullPointerException();
504 <        xfer(e, NOWAIT, 0);
480 <        return true;
504 >        return fulfill(e) != null;
505      }
506  
507      /**
508 <     * @throws InterruptedException {@inheritDoc}
509 <     * @throws NullPointerException {@inheritDoc}
508 >     * Inserts the specified element at the tail of this queue,
509 >     * waiting if necessary for the element to be received by a
510 >     * consumer invoking {@code take} or {@code poll}.
511 >     *
512 >     * @throws NullPointerException if the specified element is null
513       */
514      public void transfer(E e) throws InterruptedException {
515          if (e == null) throw new NullPointerException();
# Line 493 | Line 520 | public class LinkedTransferQueue<E> exte
520      }
521  
522      /**
523 <     * @throws InterruptedException {@inheritDoc}
524 <     * @throws NullPointerException {@inheritDoc}
523 >     * Inserts the specified element at the tail of this queue,
524 >     * waiting up to the specified wait time if necessary for the
525 >     * element to be received by a consumer invoking {@code take} or
526 >     * {@code poll}.
527 >     *
528 >     * @throws NullPointerException if the specified element is null
529       */
530      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
531          throws InterruptedException {
# Line 506 | Line 537 | public class LinkedTransferQueue<E> exte
537          throw new InterruptedException();
538      }
539  
509    /**
510     * @throws NullPointerException {@inheritDoc}
511     */
512    public boolean tryTransfer(E e) {
513        if (e == null) throw new NullPointerException();
514        return fulfill(e) != null;
515    }
516
517    /**
518     * @throws InterruptedException {@inheritDoc}
519     */
540      public E take() throws InterruptedException {
541 <        Object e = xfer(null, WAIT, 0);
541 >        E e = xfer(null, WAIT, 0);
542          if (e != null)
543 <            return (E) e;
543 >            return e;
544          Thread.interrupted();
545          throw new InterruptedException();
546      }
547  
528    /**
529     * @throws InterruptedException {@inheritDoc}
530     */
548      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
549 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
549 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
550          if (e != null || !Thread.interrupted())
551 <            return (E) e;
551 >            return e;
552          throw new InterruptedException();
553      }
554  
# Line 605 | Line 622 | public class LinkedTransferQueue<E> exte
622          }
623      }
624  
625 <
625 >    /**
626 >     * Returns an iterator over the elements in this queue in proper
627 >     * sequence, from head to tail.
628 >     *
629 >     * <p>The returned iterator is a "weakly consistent" iterator that
630 >     * will never throw
631 >     * {@link ConcurrentModificationException ConcurrentModificationException},
632 >     * and guarantees to traverse elements as they existed upon
633 >     * construction of the iterator, and may (but is not guaranteed
634 >     * to) reflect any modifications subsequent to construction.
635 >     *
636 >     * @return an iterator over the elements in this queue in proper sequence
637 >     */
638      public Iterator<E> iterator() {
639          return new Itr();
640      }
# Line 620 | Line 649 | public class LinkedTransferQueue<E> exte
649      class Itr implements Iterator<E> {
650          Node<E> next;        // node to return next
651          Node<E> pnext;       // predecessor of next
623        Node<E> snext;       // successor of next
652          Node<E> curr;        // last returned node, for remove()
653          Node<E> pcurr;       // predecessor of curr, for remove()
654 <        E nextItem;        // Cache of next item, once committed to in next
654 >        E nextItem;          // Cache of next item, once committed to in next
655  
656          Itr() {
657 <            findNext();
657 >            advance();
658          }
659  
660          /**
661 <         * Ensures next points to next valid node, or null if none.
661 >         * Moves to next valid node and returns item to return for
662 >         * next(), or null if no such.
663           */
664 <        void findNext() {
664 >        private E advance() {
665 >            pcurr = pnext;
666 >            curr = next;
667 >            E item = nextItem;
668 >
669              for (;;) {
670 <                Node<E> pred = pnext;
671 <                Node<E> q = next;
672 <                if (pred == null || pred == q) {
640 <                    pred = traversalHead();
641 <                    q = pred.next;
642 <                }
643 <                if (q == null || !q.isData) {
670 >                pnext = (next == null) ? traversalHead() : next;
671 >                next = pnext.next;
672 >                if (next == pnext) {
673                      next = null;
674 <                    return;
674 >                    continue;  // restart
675                  }
676 <                Object x = q.get();
677 <                Node<E> s = q.next;
678 <                if (x != null && q != x && q != s) {
676 >                if (next == null)
677 >                    break;
678 >                Object x = next.get();
679 >                if (x != null && x != next) {
680                      nextItem = (E) x;
681 <                    snext = s;
652 <                    pnext = pred;
653 <                    next = q;
654 <                    return;
681 >                    break;
682                  }
656                pnext = q;
657                next = s;
683              }
684 +            return item;
685          }
686  
687          public boolean hasNext() {
# Line 663 | Line 689 | public class LinkedTransferQueue<E> exte
689          }
690  
691          public E next() {
692 <            if (next == null) throw new NoSuchElementException();
693 <            pcurr = pnext;
694 <            curr = next;
669 <            pnext = next;
670 <            next = snext;
671 <            E x = nextItem;
672 <            findNext();
673 <            return x;
692 >            if (next == null)
693 >                throw new NoSuchElementException();
694 >            return advance();
695          }
696  
697          public void remove() {
# Line 740 | Line 761 | public class LinkedTransferQueue<E> exte
761       * @return the number of elements in this queue
762       */
763      public int size() {
764 <        int count = 0;
765 <        Node<E> h = traversalHead();
766 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
767 <            Object x = p.get();
768 <            if (x != null && x != p) {
769 <                if (++count == Integer.MAX_VALUE) // saturated
764 >        for (;;) {
765 >            int count = 0;
766 >            Node<E> pred = traversalHead();
767 >            for (;;) {
768 >                Node<E> q = pred.next;
769 >                if (q == pred) // restart
770                      break;
771 +                if (q == null || !q.isData)
772 +                    return count;
773 +                Object x = q.get();
774 +                if (x != null && x != q) {
775 +                    if (++count == Integer.MAX_VALUE) // saturated
776 +                        return count;
777 +                }
778 +                pred = q;
779              }
780          }
752        return count;
781      }
782  
783      public int getWaitingConsumerCount() {
784 <        int count = 0;
785 <        Node<E> h = traversalHead();
786 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
787 <            if (p.get() == null) {
788 <                if (++count == Integer.MAX_VALUE)
784 >        // converse of size -- count valid non-data nodes
785 >        for (;;) {
786 >            int count = 0;
787 >            Node<E> pred = traversalHead();
788 >            for (;;) {
789 >                Node<E> q = pred.next;
790 >                if (q == pred) // restart
791                      break;
792 +                if (q == null || q.isData)
793 +                    return count;
794 +                Object x = q.get();
795 +                if (x == null) {
796 +                    if (++count == Integer.MAX_VALUE) // saturated
797 +                        return count;
798 +                }
799 +                pred = q;
800              }
801          }
764        return count;
765    }
766
767    public int remainingCapacity() {
768        return Integer.MAX_VALUE;
802      }
803  
804      public boolean remove(Object o) {
# Line 775 | Line 808 | public class LinkedTransferQueue<E> exte
808              Node<E> pred = traversalHead();
809              for (;;) {
810                  Node<E> q = pred.next;
778                if (q == null || !q.isData)
779                    return false;
811                  if (q == pred) // restart
812                      break;
813 +                if (q == null || !q.isData)
814 +                    return false;
815                  Object x = q.get();
816                  if (x != null && x != q && o.equals(x) &&
817                      q.compareAndSet(x, q)) {
# Line 791 | Line 824 | public class LinkedTransferQueue<E> exte
824      }
825  
826      /**
827 +     * Always returns {@code Integer.MAX_VALUE} because a
828 +     * {@code LinkedTransferQueue} is not capacity constrained.
829 +     *
830 +     * @return {@code Integer.MAX_VALUE} (as specified by
831 +     *         {@link BlockingQueue#remainingCapacity()})
832 +     */
833 +    public int remainingCapacity() {
834 +        return Integer.MAX_VALUE;
835 +    }
836 +
837 +    /**
838       * Save the state to a stream (that is, serialize it).
839       *
840       * @serialData All of the elements (each an {@code E}) in

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines