exte
}
/**
- * @throws NullPointerException {@inheritDoc}
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
+ *
+ * @return {@code true} (as specified by {@link Collection#add})
+ * @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
+ return offer(e);
+ }
+
+ /**
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e) {
if (e == null) throw new NullPointerException();
- xfer(e, NOWAIT, 0);
- return true;
+ return fulfill(e) != null;
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
+ *
+ * @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
@@ -495,8 +553,18 @@ public class LinkedTransferQueue exte
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
@@ -508,32 +576,18 @@ public class LinkedTransferQueue exte
throw new InterruptedException();
}
- /**
- * @throws NullPointerException {@inheritDoc}
- */
- public boolean tryTransfer(E e) {
- if (e == null) throw new NullPointerException();
- return fulfill(e) != null;
- }
-
- /**
- * @throws InterruptedException {@inheritDoc}
- */
public E take() throws InterruptedException {
- Object e = xfer(null, WAIT, 0);
+ E e = xfer(null, WAIT, 0);
if (e != null)
- return (E) e;
+ return e;
Thread.interrupted();
throw new InterruptedException();
}
- /**
- * @throws InterruptedException {@inheritDoc}
- */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
+ E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return (E) e;
+ return e;
throw new InterruptedException();
}
@@ -586,28 +640,38 @@ public class LinkedTransferQueue exte
for (;;) {
Node t = tail.get();
Node h = head.get();
- if (h != null && t != null) {
- Node last = t.next;
- Node first = h.next;
- if (t == tail.get()) {
- if (last != null)
- tail.compareAndSet(t, last);
- else if (first != null) {
- Object x = first.get();
- if (x == first)
- advanceHead(h, first);
- else
- return h;
- }
+ Node last = t.next;
+ Node first = h.next;
+ if (t == tail.get()) {
+ if (last != null)
+ tail.compareAndSet(t, last);
+ else if (first != null) {
+ Object x = first.get();
+ if (x == first)
+ advanceHead(h, first);
else
return h;
}
+ else
+ return h;
}
reclean();
}
}
-
+ /**
+ * Returns an iterator over the elements in this queue in proper
+ * sequence, from head to tail.
+ *
+ * The returned iterator is a "weakly consistent" iterator that
+ * will never throw
+ * {@link ConcurrentModificationException ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed
+ * to) reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
+ */
public Iterator iterator() {
return new Itr();
}
@@ -622,42 +686,39 @@ public class LinkedTransferQueue exte
class Itr implements Iterator {
Node next; // node to return next
Node pnext; // predecessor of next
- Node snext; // successor of next
Node curr; // last returned node, for remove()
Node pcurr; // predecessor of curr, for remove()
- E nextItem; // Cache of next item, once committed to in next
+ E nextItem; // Cache of next item, once committed to in next
Itr() {
- findNext();
+ advance();
}
/**
- * Ensures next points to next valid node, or null if none.
+ * Moves to next valid node and returns item to return for
+ * next(), or null if no such.
*/
- void findNext() {
+ private E advance() {
+ pcurr = pnext;
+ curr = next;
+ E item = nextItem;
+
for (;;) {
- Node pred = pnext;
- Node q = next;
- if (pred == null || pred == q) {
- pred = traversalHead();
- q = pred.next;
- }
- if (q == null || !q.isData) {
+ pnext = (next == null) ? traversalHead() : next;
+ next = pnext.next;
+ if (next == pnext) {
next = null;
- return;
+ continue; // restart
}
- Object x = q.get();
- Node s = q.next;
- if (x != null && q != x && q != s) {
+ if (next == null)
+ break;
+ Object x = next.get();
+ if (x != null && x != next) {
nextItem = (E) x;
- snext = s;
- pnext = pred;
- next = q;
- return;
+ break;
}
- pnext = q;
- next = s;
}
+ return item;
}
public boolean hasNext() {
@@ -665,14 +726,9 @@ public class LinkedTransferQueue exte
}
public E next() {
- if (next == null) throw new NoSuchElementException();
- pcurr = pnext;
- curr = next;
- pnext = next;
- next = snext;
- E x = nextItem;
- findNext();
- return x;
+ if (next == null)
+ throw new NoSuchElementException();
+ return advance();
}
public void remove() {
@@ -701,6 +757,11 @@ public class LinkedTransferQueue exte
}
}
+ /**
+ * Returns {@code true} if this queue contains no elements.
+ *
+ * @return {@code true} if this queue contains no elements
+ */
public boolean isEmpty() {
for (;;) {
Node h = traversalHead();
@@ -742,34 +803,57 @@ public class LinkedTransferQueue exte
* @return the number of elements in this queue
*/
public int size() {
- int count = 0;
- Node h = traversalHead();
- for (Node p = h.next; p != null && p.isData; p = p.next) {
- Object x = p.get();
- if (x != null && x != p) {
- if (++count == Integer.MAX_VALUE) // saturated
+ for (;;) {
+ int count = 0;
+ Node pred = traversalHead();
+ for (;;) {
+ Node q = pred.next;
+ if (q == pred) // restart
break;
+ if (q == null || !q.isData)
+ return count;
+ Object x = q.get();
+ if (x != null && x != q) {
+ if (++count == Integer.MAX_VALUE) // saturated
+ return count;
+ }
+ pred = q;
}
}
- return count;
}
public int getWaitingConsumerCount() {
- int count = 0;
- Node h = traversalHead();
- for (Node p = h.next; p != null && !p.isData; p = p.next) {
- if (p.get() == null) {
- if (++count == Integer.MAX_VALUE)
+ // converse of size -- count valid non-data nodes
+ for (;;) {
+ int count = 0;
+ Node pred = traversalHead();
+ for (;;) {
+ Node q = pred.next;
+ if (q == pred) // restart
break;
+ if (q == null || q.isData)
+ return count;
+ Object x = q.get();
+ if (x == null) {
+ if (++count == Integer.MAX_VALUE) // saturated
+ return count;
+ }
+ pred = q;
}
}
- return count;
- }
-
- public int remainingCapacity() {
- return Integer.MAX_VALUE;
}
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
public boolean remove(Object o) {
if (o == null)
return false;
@@ -777,10 +861,10 @@ public class LinkedTransferQueue exte
Node pred = traversalHead();
for (;;) {
Node q = pred.next;
- if (q == null || !q.isData)
- return false;
if (q == pred) // restart
break;
+ if (q == null || !q.isData)
+ return false;
Object x = q.get();
if (x != null && x != q && o.equals(x) &&
q.compareAndSet(x, q)) {
@@ -793,6 +877,17 @@ public class LinkedTransferQueue exte
}
/**
+ * Always returns {@code Integer.MAX_VALUE} because a
+ * {@code LinkedTransferQueue} is not capacity constrained.
+ *
+ * @return {@code Integer.MAX_VALUE} (as specified by
+ * {@link BlockingQueue#remainingCapacity()})
+ */
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ /**
* Save the state to a stream (that is, serialize it).
*
* @serialData All of the elements (each an {@code E}) in
@@ -842,13 +937,15 @@ public class LinkedTransferQueue exte
private static final sun.misc.Unsafe UNSAFE = getUnsafe();
private static final long headOffset =
- objectFieldOffset("head", LinkedTransferQueue.class);
+ objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
private static final long tailOffset =
- objectFieldOffset("tail", LinkedTransferQueue.class);
+ objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
private static final long cleanMeOffset =
- objectFieldOffset("cleanMe", LinkedTransferQueue.class);
+ objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
+
- private static long objectFieldOffset(String field, Class> klazz) {
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {