ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.32 by jsr166, Wed Jul 29 02:19:56 2009 UTC vs.
Revision 1.35 by jsr166, Thu Jul 30 22:45:39 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.*;
10  
11   import java.util.AbstractQueue;
12   import java.util.Collection;
13 + import java.util.ConcurrentModificationException;
14   import java.util.Iterator;
15   import java.util.NoSuchElementException;
16 + import java.util.Queue;
17   import java.util.concurrent.locks.LockSupport;
18   import java.util.concurrent.atomic.AtomicReference;
19  
# Line 471 | Line 473 | public class LinkedTransferQueue<E> exte
473      }
474  
475      /**
476 <     * @throws InterruptedException {@inheritDoc}
477 <     * @throws NullPointerException {@inheritDoc}
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480       */
481 <    public void put(E e) throws InterruptedException {
482 <        if (e == null) throw new NullPointerException();
479 <        if (Thread.interrupted()) throw new InterruptedException();
480 <        xfer(e, NOWAIT, 0);
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485      /**
486 <     * @throws InterruptedException {@inheritDoc}
487 <     * @throws NullPointerException {@inheritDoc}
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493       */
494 <    public boolean offer(E e, long timeout, TimeUnit unit)
495 <        throws InterruptedException {
489 <        if (e == null) throw new NullPointerException();
490 <        if (Thread.interrupted()) throw new InterruptedException();
491 <        xfer(e, NOWAIT, 0);
492 <        return true;
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498      /**
499 <     * @throws NullPointerException {@inheritDoc}
499 >     * Inserts the specified element at the tail of this queue.
500 >     * As the queue is unbounded, this method will never return {@code false}.
501 >     *
502 >     * @return {@code true} (as specified by
503 >     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 >     * @throws NullPointerException if the specified element is null
505       */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
# Line 502 | Line 510 | public class LinkedTransferQueue<E> exte
510      }
511  
512      /**
513 <     * @throws NullPointerException {@inheritDoc}
513 >     * Inserts the specified element at the tail of this queue.
514 >     * As the queue is unbounded this method will never throw
515 >     * {@link IllegalStateException} or return {@code false}.
516 >     *
517 >     * @return {@code true} (as specified by {@link Collection#add})
518 >     * @throws NullPointerException if the specified element is null
519       */
520      public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the specified element immediately if there exists a
526 +     * consumer already waiting to receive it (in {@link #take} or
527 +     * timed {@link #poll(Object,long,TimeUnit) poll}), otherwise
528 +     * returning {@code false} without enqueuing the element.
529 +     *
530 +     * @throws NullPointerException if the specified element is null
531 +     */
532 +    public boolean tryTransfer(E e) {
533          if (e == null) throw new NullPointerException();
534 <        xfer(e, NOWAIT, 0);
510 <        return true;
534 >        return fulfill(e) != null;
535      }
536  
537      /**
538 +     * Inserts the specified element at the tail of this queue,
539 +     * waiting if necessary for the element to be received by a
540 +     * consumer invoking {@code take} or {@code poll}.
541 +     *
542       * @throws InterruptedException {@inheritDoc}
543 <     * @throws NullPointerException {@inheritDoc}
543 >     * @throws NullPointerException if the specified element is null
544       */
545      public void transfer(E e) throws InterruptedException {
546          if (e == null) throw new NullPointerException();
# Line 523 | Line 551 | public class LinkedTransferQueue<E> exte
551      }
552  
553      /**
554 +     * Inserts the specified element at the tail of this queue,
555 +     * waiting up to the specified wait time for the element to be
556 +     * received by a consumer invoking {@code take} or {@code poll}.
557 +     *
558       * @throws InterruptedException {@inheritDoc}
559 <     * @throws NullPointerException {@inheritDoc}
559 >     * @throws NullPointerException if the specified element is null
560       */
561      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
562          throws InterruptedException {
# Line 536 | Line 568 | public class LinkedTransferQueue<E> exte
568          throw new InterruptedException();
569      }
570  
539    /**
540     * @throws NullPointerException {@inheritDoc}
541     */
542    public boolean tryTransfer(E e) {
543        if (e == null) throw new NullPointerException();
544        return fulfill(e) != null;
545    }
546
547    /**
548     * @throws InterruptedException {@inheritDoc}
549     */
571      public E take() throws InterruptedException {
572 <        Object e = xfer(null, WAIT, 0);
572 >        E e = xfer(null, WAIT, 0);
573          if (e != null)
574 <            return (E) e;
574 >            return e;
575          Thread.interrupted();
576          throw new InterruptedException();
577      }
# Line 559 | Line 580 | public class LinkedTransferQueue<E> exte
580       * @throws InterruptedException {@inheritDoc}
581       */
582      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
583 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
583 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
584          if (e != null || !Thread.interrupted())
585 <            return (E) e;
585 >            return e;
586          throw new InterruptedException();
587      }
588  
# Line 635 | Line 656 | public class LinkedTransferQueue<E> exte
656          }
657      }
658  
659 <
659 >    /**
660 >     * Returns an iterator over the elements in this queue in proper
661 >     * sequence, from head to tail.
662 >     *
663 >     * <p>The returned iterator is a "weakly consistent" iterator that
664 >     * will never throw
665 >     * {@link ConcurrentModificationException ConcurrentModificationException},
666 >     * and guarantees to traverse elements as they existed upon
667 >     * construction of the iterator, and may (but is not guaranteed
668 >     * to) reflect any modifications subsequent to construction.
669 >     *
670 >     * @return an iterator over the elements in this queue in proper sequence
671 >     */
672      public Iterator<E> iterator() {
673          return new Itr();
674      }
# Line 650 | Line 683 | public class LinkedTransferQueue<E> exte
683      class Itr implements Iterator<E> {
684          Node<E> next;        // node to return next
685          Node<E> pnext;       // predecessor of next
653        Node<E> snext;       // successor of next
686          Node<E> curr;        // last returned node, for remove()
687          Node<E> pcurr;       // predecessor of curr, for remove()
688 <        E nextItem;        // Cache of next item, once committed to in next
688 >        E nextItem;          // Cache of next item, once committed to in next
689  
690          Itr() {
691 <            findNext();
691 >            advance();
692          }
693  
694          /**
695 <         * Ensures next points to next valid node, or null if none.
695 >         * Moves to next valid node and returns item to return for
696 >         * next(), or null if no such.
697           */
698 <        void findNext() {
698 >        private E advance() {
699 >            pcurr = pnext;
700 >            curr = next;
701 >            E item = nextItem;
702 >
703              for (;;) {
704 <                Node<E> pred = pnext;
705 <                Node<E> q = next;
706 <                if (pred == null || pred == q) {
670 <                    pred = traversalHead();
671 <                    q = pred.next;
672 <                }
673 <                if (q == null || !q.isData) {
704 >                pnext = (next == null) ? traversalHead() : next;
705 >                next = pnext.next;
706 >                if (next == pnext) {
707                      next = null;
708 <                    return;
708 >                    continue;  // restart
709                  }
710 <                Object x = q.get();
711 <                Node<E> s = q.next;
712 <                if (x != null && q != x && q != s) {
710 >                if (next == null)
711 >                    break;
712 >                Object x = next.get();
713 >                if (x != null && x != next) {
714                      nextItem = (E) x;
715 <                    snext = s;
682 <                    pnext = pred;
683 <                    next = q;
684 <                    return;
715 >                    break;
716                  }
686                pnext = q;
687                next = s;
717              }
718 +            return item;
719          }
720  
721          public boolean hasNext() {
# Line 693 | Line 723 | public class LinkedTransferQueue<E> exte
723          }
724  
725          public E next() {
726 <            if (next == null) throw new NoSuchElementException();
727 <            pcurr = pnext;
728 <            curr = next;
699 <            pnext = next;
700 <            next = snext;
701 <            E x = nextItem;
702 <            findNext();
703 <            return x;
726 >            if (next == null)
727 >                throw new NoSuchElementException();
728 >            return advance();
729          }
730  
731          public void remove() {
# Line 770 | Line 795 | public class LinkedTransferQueue<E> exte
795       * @return the number of elements in this queue
796       */
797      public int size() {
798 <        int count = 0;
799 <        Node<E> h = traversalHead();
800 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
801 <            Object x = p.get();
802 <            if (x != null && x != p) {
803 <                if (++count == Integer.MAX_VALUE) // saturated
798 >        for (;;) {
799 >            int count = 0;
800 >            Node<E> pred = traversalHead();
801 >            for (;;) {
802 >                Node<E> q = pred.next;
803 >                if (q == pred) // restart
804                      break;
805 +                if (q == null || !q.isData)
806 +                    return count;
807 +                Object x = q.get();
808 +                if (x != null && x != q) {
809 +                    if (++count == Integer.MAX_VALUE) // saturated
810 +                        return count;
811 +                }
812 +                pred = q;
813              }
814          }
782        return count;
815      }
816  
817      public int getWaitingConsumerCount() {
818 <        int count = 0;
819 <        Node<E> h = traversalHead();
820 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
821 <            if (p.get() == null) {
822 <                if (++count == Integer.MAX_VALUE)
818 >        // converse of size -- count valid non-data nodes
819 >        for (;;) {
820 >            int count = 0;
821 >            Node<E> pred = traversalHead();
822 >            for (;;) {
823 >                Node<E> q = pred.next;
824 >                if (q == pred) // restart
825                      break;
826 +                if (q == null || q.isData)
827 +                    return count;
828 +                Object x = q.get();
829 +                if (x == null) {
830 +                    if (++count == Integer.MAX_VALUE) // saturated
831 +                        return count;
832 +                }
833 +                pred = q;
834              }
835          }
794        return count;
795    }
796
797    public int remainingCapacity() {
798        return Integer.MAX_VALUE;
836      }
837  
838      public boolean remove(Object o) {
# Line 805 | Line 842 | public class LinkedTransferQueue<E> exte
842              Node<E> pred = traversalHead();
843              for (;;) {
844                  Node<E> q = pred.next;
808                if (q == null || !q.isData)
809                    return false;
845                  if (q == pred) // restart
846                      break;
847 +                if (q == null || !q.isData)
848 +                    return false;
849                  Object x = q.get();
850                  if (x != null && x != q && o.equals(x) &&
851                      q.compareAndSet(x, q)) {
# Line 821 | Line 858 | public class LinkedTransferQueue<E> exte
858      }
859  
860      /**
861 +     * Always returns {@code Integer.MAX_VALUE} because a
862 +     * {@code LinkedTransferQueue} is not capacity constrained.
863 +     *
864 +     * @return {@code Integer.MAX_VALUE} (as specified by
865 +     *         {@link BlockingQueue#remainingCapacity()})
866 +     */
867 +    public int remainingCapacity() {
868 +        return Integer.MAX_VALUE;
869 +    }
870 +
871 +    /**
872       * Save the state to a stream (that is, serialize it).
873       *
874       * @serialData All of the elements (each an {@code E}) in

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines