ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.34 by jsr166, Thu Jul 30 17:30:26 2009 UTC vs.
Revision 1.44 by jsr166, Tue Aug 4 20:32:16 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.*;
10  
11   import java.util.AbstractQueue;
12   import java.util.Collection;
13 + import java.util.ConcurrentModificationException;
14   import java.util.Iterator;
15   import java.util.NoSuchElementException;
16 + import java.util.Queue;
17   import java.util.concurrent.locks.LockSupport;
18   import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21 < * An unbounded {@linkplain TransferQueue} based on linked nodes.
21 > * An unbounded {@link TransferQueue} based on linked nodes.
22   * This queue orders elements FIFO (first-in-first-out) with respect
23   * to any given producer.  The <em>head</em> of the queue is that
24   * element that has been on the queue the longest time for some
# Line 217 | Line 219 | public class LinkedTransferQueue<E> exte
219              Node<E> t = tail.get();
220              Node<E> h = head.get();
221  
222 <            if (t != null && (t == h || t.isData == isData)) {
222 >            if (t == h || t.isData == isData) {
223                  if (s == null)
224                      s = new Node<E>(e, isData);
225                  Node<E> last = t.next;
# Line 229 | Line 231 | public class LinkedTransferQueue<E> exte
231                      tail.compareAndSet(t, s);
232                      return awaitFulfill(t, s, e, mode, nanos);
233                  }
234 <            }
233 <
234 <            else if (h != null) {
234 >            } else {
235                  Node<E> first = h.next;
236                  if (t == tail.get() && first != null &&
237                      advanceHead(h, first)) {
# Line 259 | Line 259 | public class LinkedTransferQueue<E> exte
259              Node<E> t = tail.get();
260              Node<E> h = head.get();
261  
262 <            if (t != null && (t == h || t.isData == isData)) {
262 >            if (t == h || t.isData == isData) {
263                  Node<E> last = t.next;
264                  if (t == tail.get()) {
265                      if (last != null)
# Line 267 | Line 267 | public class LinkedTransferQueue<E> exte
267                      else
268                          return null;
269                  }
270 <            }
271 <            else if (h != null) {
270 >            } else {
271                  Node<E> first = h.next;
272                  if (t == tail.get() &&
273                      first != null &&
# Line 292 | Line 291 | public class LinkedTransferQueue<E> exte
291       * @param e the comparison value for checking match
292       * @param mode mode
293       * @param nanos timeout value
294 <     * @return matched item, or s if cancelled
294 >     * @return matched item, or null if cancelled
295       */
296      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
297                             int mode, long nanos) {
# Line 330 | Line 329 | public class LinkedTransferQueue<E> exte
329              }
330              if (spins < 0) {
331                  Node<E> h = head.get(); // only spin if at head
332 <                spins = ((h != null && h.next == s) ?
333 <                         ((mode == TIMEOUT) ?
334 <                          maxTimedSpins : maxUntimedSpins) : 0);
332 >                spins = ((h.next != s) ? 0 :
333 >                         (mode == TIMEOUT) ? maxTimedSpins :
334 >                         maxUntimedSpins);
335              }
336              if (spins > 0)
337                  --spins;
# Line 358 | Line 357 | public class LinkedTransferQueue<E> exte
357          for (;;) {
358              Node<E> h = head.get();
359              Node<E> first = h.next;
360 <            if (first != null && first.next == first) { // help advance
360 >            if (first != null && first.get() == first) { // help advance
361                  advanceHead(h, first);
362                  continue;
363              }
# Line 471 | Line 470 | public class LinkedTransferQueue<E> exte
470      }
471  
472      /**
473 <     * @throws InterruptedException {@inheritDoc}
474 <     * @throws NullPointerException {@inheritDoc}
473 >     * Inserts the specified element at the tail of this queue.
474 >     * As the queue is unbounded, this method will never block.
475 >     *
476 >     * @throws NullPointerException if the specified element is null
477       */
478 <    public void put(E e) throws InterruptedException {
479 <        if (e == null) throw new NullPointerException();
479 <        if (Thread.interrupted()) throw new InterruptedException();
480 <        xfer(e, NOWAIT, 0);
478 >    public void put(E e) {
479 >        offer(e);
480      }
481  
482      /**
483 <     * @throws InterruptedException {@inheritDoc}
484 <     * @throws NullPointerException {@inheritDoc}
483 >     * Inserts the specified element at the tail of this queue.
484 >     * As the queue is unbounded, this method will never block or
485 >     * return {@code false}.
486 >     *
487 >     * @return {@code true} (as specified by
488 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
489 >     * @throws NullPointerException if the specified element is null
490       */
491 <    public boolean offer(E e, long timeout, TimeUnit unit)
492 <        throws InterruptedException {
489 <        if (e == null) throw new NullPointerException();
490 <        if (Thread.interrupted()) throw new InterruptedException();
491 <        xfer(e, NOWAIT, 0);
492 <        return true;
491 >    public boolean offer(E e, long timeout, TimeUnit unit) {
492 >        return offer(e);
493      }
494  
495      /**
496 <     * @throws NullPointerException {@inheritDoc}
496 >     * Inserts the specified element at the tail of this queue.
497 >     * As the queue is unbounded, this method will never return {@code false}.
498 >     *
499 >     * @return {@code true} (as specified by
500 >     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
501 >     * @throws NullPointerException if the specified element is null
502       */
503      public boolean offer(E e) {
504          if (e == null) throw new NullPointerException();
# Line 502 | Line 507 | public class LinkedTransferQueue<E> exte
507      }
508  
509      /**
510 <     * @throws NullPointerException {@inheritDoc}
510 >     * Inserts the specified element at the tail of this queue.
511 >     * As the queue is unbounded, this method will never throw
512 >     * {@link IllegalStateException} or return {@code false}.
513 >     *
514 >     * @return {@code true} (as specified by {@link Collection#add})
515 >     * @throws NullPointerException if the specified element is null
516       */
517      public boolean add(E e) {
518 +        return offer(e);
519 +    }
520 +
521 +    /**
522 +     * Transfers the element to a waiting consumer immediately, if possible.
523 +     *
524 +     * <p>More precisely, transfers the specified element immediately
525 +     * if there exists a consumer already waiting to receive it (in
526 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
527 +     * otherwise returning {@code false} without enqueuing the element.
528 +     *
529 +     * @throws NullPointerException if the specified element is null
530 +     */
531 +    public boolean tryTransfer(E e) {
532          if (e == null) throw new NullPointerException();
533 <        xfer(e, NOWAIT, 0);
510 <        return true;
533 >        return fulfill(e) != null;
534      }
535  
536      /**
537 <     * @throws InterruptedException {@inheritDoc}
538 <     * @throws NullPointerException {@inheritDoc}
537 >     * Transfers the element to a consumer, waiting if necessary to do so.
538 >     *
539 >     * <p>More precisely, transfers the specified element immediately
540 >     * if there exists a consumer already waiting to receive it (in
541 >     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
542 >     * else inserts the specified element at the tail of this queue
543 >     * and waits until the element is received by a consumer.
544 >     *
545 >     * @throws NullPointerException if the specified element is null
546       */
547      public void transfer(E e) throws InterruptedException {
548          if (e == null) throw new NullPointerException();
# Line 523 | Line 553 | public class LinkedTransferQueue<E> exte
553      }
554  
555      /**
556 <     * @throws InterruptedException {@inheritDoc}
557 <     * @throws NullPointerException {@inheritDoc}
556 >     * Transfers the element to a consumer if it is possible to do so
557 >     * before the timeout elapses.
558 >     *
559 >     * <p>More precisely, transfers the specified element immediately
560 >     * if there exists a consumer already waiting to receive it (in
561 >     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
562 >     * else inserts the specified element at the tail of this queue
563 >     * and waits until the element is received by a consumer,
564 >     * returning {@code false} if the specified wait time elapses
565 >     * before the element can be transferred.
566 >     *
567 >     * @throws NullPointerException if the specified element is null
568       */
569      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
570          throws InterruptedException {
# Line 536 | Line 576 | public class LinkedTransferQueue<E> exte
576          throw new InterruptedException();
577      }
578  
539    /**
540     * @throws NullPointerException {@inheritDoc}
541     */
542    public boolean tryTransfer(E e) {
543        if (e == null) throw new NullPointerException();
544        return fulfill(e) != null;
545    }
546
547    /**
548     * @throws InterruptedException {@inheritDoc}
549     */
579      public E take() throws InterruptedException {
580          E e = xfer(null, WAIT, 0);
581          if (e != null)
# Line 555 | Line 584 | public class LinkedTransferQueue<E> exte
584          throw new InterruptedException();
585      }
586  
558    /**
559     * @throws InterruptedException {@inheritDoc}
560     */
587      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
588          E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
589          if (e != null || !Thread.interrupted())
# Line 614 | Line 640 | public class LinkedTransferQueue<E> exte
640          for (;;) {
641              Node<E> t = tail.get();
642              Node<E> h = head.get();
643 <            if (h != null && t != null) {
644 <                Node<E> last = t.next;
645 <                Node<E> first = h.next;
646 <                if (t == tail.get()) {
647 <                    if (last != null)
648 <                        tail.compareAndSet(t, last);
649 <                    else if (first != null) {
650 <                        Object x = first.get();
651 <                        if (x == first)
626 <                            advanceHead(h, first);
627 <                        else
628 <                            return h;
629 <                    }
643 >            Node<E> last = t.next;
644 >            Node<E> first = h.next;
645 >            if (t == tail.get()) {
646 >                if (last != null)
647 >                    tail.compareAndSet(t, last);
648 >                else if (first != null) {
649 >                    Object x = first.get();
650 >                    if (x == first)
651 >                        advanceHead(h, first);
652                      else
653                          return h;
654                  }
655 +                else
656 +                    return h;
657              }
658              reclean();
659          }
660      }
661  
662 +    /**
663 +     * Returns an iterator over the elements in this queue in proper
664 +     * sequence, from head to tail.
665 +     *
666 +     * <p>The returned iterator is a "weakly consistent" iterator that
667 +     * will never throw
668 +     * {@link ConcurrentModificationException ConcurrentModificationException},
669 +     * and guarantees to traverse elements as they existed upon
670 +     * construction of the iterator, and may (but is not guaranteed
671 +     * to) reflect any modifications subsequent to construction.
672 +     *
673 +     * @return an iterator over the elements in this queue in proper sequence
674 +     */
675      public Iterator<E> iterator() {
676          return new Itr();
677      }
# Line 689 | Line 726 | public class LinkedTransferQueue<E> exte
726          }
727  
728          public E next() {
729 <            if (next == null)
729 >            if (next == null)
730                  throw new NoSuchElementException();
731              return advance();
732          }
# Line 720 | Line 757 | public class LinkedTransferQueue<E> exte
757          }
758      }
759  
760 +    /**
761 +     * Returns {@code true} if this queue contains no elements.
762 +     *
763 +     * @return {@code true} if this queue contains no elements
764 +     */
765      public boolean isEmpty() {
766          for (;;) {
767              Node<E> h = traversalHead();
# Line 801 | Line 843 | public class LinkedTransferQueue<E> exte
843          }
844      }
845  
846 +    /**
847 +     * Removes a single instance of the specified element from this queue,
848 +     * if it is present.  More formally, removes an element {@code e} such
849 +     * that {@code o.equals(e)}, if this queue contains one or more such
850 +     * elements.
851 +     * Returns {@code true} if this queue contained the specified element
852 +     * (or equivalently, if this queue changed as a result of the call).
853 +     *
854 +     * @param o element to be removed from this queue, if present
855 +     * @return {@code true} if this queue changed as a result of the call
856 +     */
857      public boolean remove(Object o) {
858          if (o == null)
859              return false;
# Line 823 | Line 876 | public class LinkedTransferQueue<E> exte
876          }
877      }
878  
879 +    /**
880 +     * Always returns {@code Integer.MAX_VALUE} because a
881 +     * {@code LinkedTransferQueue} is not capacity constrained.
882 +     *
883 +     * @return {@code Integer.MAX_VALUE} (as specified by
884 +     *         {@link BlockingQueue#remainingCapacity()})
885 +     */
886      public int remainingCapacity() {
887          return Integer.MAX_VALUE;
888      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines