--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/30 17:30:26 1.34 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/08/04 20:32:16 1.44 @@ -10,13 +10,15 @@ import java.util.concurrent.*; import java.util.AbstractQueue; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.atomic.AtomicReference; /** - * An unbounded {@linkplain TransferQueue} based on linked nodes. + * An unbounded {@link TransferQueue} based on linked nodes. * This queue orders elements FIFO (first-in-first-out) with respect * to any given producer. The head of the queue is that * element that has been on the queue the longest time for some @@ -217,7 +219,7 @@ public class LinkedTransferQueue exte Node t = tail.get(); Node h = head.get(); - if (t != null && (t == h || t.isData == isData)) { + if (t == h || t.isData == isData) { if (s == null) s = new Node(e, isData); Node last = t.next; @@ -229,9 +231,7 @@ public class LinkedTransferQueue exte tail.compareAndSet(t, s); return awaitFulfill(t, s, e, mode, nanos); } - } - - else if (h != null) { + } else { Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { @@ -259,7 +259,7 @@ public class LinkedTransferQueue exte Node t = tail.get(); Node h = head.get(); - if (t != null && (t == h || t.isData == isData)) { + if (t == h || t.isData == isData) { Node last = t.next; if (t == tail.get()) { if (last != null) @@ -267,8 +267,7 @@ public class LinkedTransferQueue exte else return null; } - } - else if (h != null) { + } else { Node first = h.next; if (t == tail.get() && first != null && @@ -292,7 +291,7 @@ public class LinkedTransferQueue exte * @param e the comparison value for checking match * @param mode mode * @param nanos timeout value - * @return matched item, or s if cancelled + * @return matched item, or null if cancelled */ private E awaitFulfill(Node pred, Node s, E e, int mode, long nanos) { @@ -330,9 +329,9 @@ public class LinkedTransferQueue exte } if (spins < 0) { Node h = head.get(); // only spin if at head - spins = ((h != null && h.next == s) ? - ((mode == TIMEOUT) ? - maxTimedSpins : maxUntimedSpins) : 0); + spins = ((h.next != s) ? 0 : + (mode == TIMEOUT) ? maxTimedSpins : + maxUntimedSpins); } if (spins > 0) --spins; @@ -358,7 +357,7 @@ public class LinkedTransferQueue exte for (;;) { Node h = head.get(); Node first = h.next; - if (first != null && first.next == first) { // help advance + if (first != null && first.get() == first) { // help advance advanceHead(h, first); continue; } @@ -471,29 +470,35 @@ public class LinkedTransferQueue exte } /** - * @throws InterruptedException {@inheritDoc} - * @throws NullPointerException {@inheritDoc} + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block. + * + * @throws NullPointerException if the specified element is null */ - public void put(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); + public void put(E e) { + offer(e); } /** - * @throws InterruptedException {@inheritDoc} - * @throws NullPointerException {@inheritDoc} + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block or + * return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null */ - public boolean offer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); - return true; + public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); } /** - * @throws NullPointerException {@inheritDoc} + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); @@ -502,17 +507,42 @@ public class LinkedTransferQueue exte } /** - * @throws NullPointerException {@inheritDoc} + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never throw + * {@link IllegalStateException} or return {@code false}. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null */ public boolean add(E e) { + return offer(e); + } + + /** + * Transfers the element to a waiting consumer immediately, if possible. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * otherwise returning {@code false} without enqueuing the element. + * + * @throws NullPointerException if the specified element is null + */ + public boolean tryTransfer(E e) { if (e == null) throw new NullPointerException(); - xfer(e, NOWAIT, 0); - return true; + return fulfill(e) != null; } /** - * @throws InterruptedException {@inheritDoc} - * @throws NullPointerException {@inheritDoc} + * Transfers the element to a consumer, waiting if necessary to do so. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer. + * + * @throws NullPointerException if the specified element is null */ public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -523,8 +553,18 @@ public class LinkedTransferQueue exte } /** - * @throws InterruptedException {@inheritDoc} - * @throws NullPointerException {@inheritDoc} + * Transfers the element to a consumer if it is possible to do so + * before the timeout elapses. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer, + * returning {@code false} if the specified wait time elapses + * before the element can be transferred. + * + * @throws NullPointerException if the specified element is null */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { @@ -536,17 +576,6 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } - /** - * @throws NullPointerException {@inheritDoc} - */ - public boolean tryTransfer(E e) { - if (e == null) throw new NullPointerException(); - return fulfill(e) != null; - } - - /** - * @throws InterruptedException {@inheritDoc} - */ public E take() throws InterruptedException { E e = xfer(null, WAIT, 0); if (e != null) @@ -555,9 +584,6 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } - /** - * @throws InterruptedException {@inheritDoc} - */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) @@ -614,27 +640,38 @@ public class LinkedTransferQueue exte for (;;) { Node t = tail.get(); Node h = head.get(); - if (h != null && t != null) { - Node last = t.next; - Node first = h.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); - else if (first != null) { - Object x = first.get(); - if (x == first) - advanceHead(h, first); - else - return h; - } + Node last = t.next; + Node first = h.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); + else if (first != null) { + Object x = first.get(); + if (x == first) + advanceHead(h, first); else return h; } + else + return h; } reclean(); } } + /** + * Returns an iterator over the elements in this queue in proper + * sequence, from head to tail. + * + *

The returned iterator is a "weakly consistent" iterator that + * will never throw + * {@link ConcurrentModificationException ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed + * to) reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence + */ public Iterator iterator() { return new Itr(); } @@ -689,7 +726,7 @@ public class LinkedTransferQueue exte } public E next() { - if (next == null) + if (next == null) throw new NoSuchElementException(); return advance(); } @@ -720,6 +757,11 @@ public class LinkedTransferQueue exte } } + /** + * Returns {@code true} if this queue contains no elements. + * + * @return {@code true} if this queue contains no elements + */ public boolean isEmpty() { for (;;) { Node h = traversalHead(); @@ -801,6 +843,17 @@ public class LinkedTransferQueue exte } } + /** + * Removes a single instance of the specified element from this queue, + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such + * elements. + * Returns {@code true} if this queue contained the specified element + * (or equivalently, if this queue changed as a result of the call). + * + * @param o element to be removed from this queue, if present + * @return {@code true} if this queue changed as a result of the call + */ public boolean remove(Object o) { if (o == null) return false; @@ -823,6 +876,13 @@ public class LinkedTransferQueue exte } } + /** + * Always returns {@code Integer.MAX_VALUE} because a + * {@code LinkedTransferQueue} is not capacity constrained. + * + * @return {@code Integer.MAX_VALUE} (as specified by + * {@link BlockingQueue#remainingCapacity()}) + */ public int remainingCapacity() { return Integer.MAX_VALUE; }