exte
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block.
+ *
+ * @throws NullPointerException if the specified element is null
*/
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
+ public void put(E e) {
+ xfer(e, true, ASYNC, 0);
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block or
+ * return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
*/
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
+ public boolean offer(E e, long timeout, TimeUnit unit) {
+ xfer(e, true, ASYNC, 0);
return true;
}
/**
- * @throws NullPointerException {@inheritDoc}
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- xfer(e, NOWAIT, 0);
+ xfer(e, true, ASYNC, 0);
return true;
}
/**
- * @throws NullPointerException {@inheritDoc}
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
+ *
+ * @return {@code true} (as specified by {@link Collection#add})
+ * @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
- if (e == null) throw new NullPointerException();
- xfer(e, NOWAIT, 0);
+ xfer(e, true, ASYNC, 0);
return true;
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e) {
+ return xfer(e, true, NOW, 0) == null;
+ }
+
+ /**
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
+ *
+ * @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, WAIT, 0) == null) {
- Thread.interrupted();
+ if (xfer(e, true, SYNC, 0) != null) {
+ Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
/**
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
+ if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
- /**
- * @throws NullPointerException {@inheritDoc}
- */
- public boolean tryTransfer(E e) {
- if (e == null) throw new NullPointerException();
- return fulfill(e) != null;
- }
-
- /**
- * @throws InterruptedException {@inheritDoc}
- */
public E take() throws InterruptedException {
- Object e = xfer(null, WAIT, 0);
+ E e = xfer(null, false, SYNC, 0);
if (e != null)
- return (E) e;
+ return e;
Thread.interrupted();
throw new InterruptedException();
}
- /**
- * @throws InterruptedException {@inheritDoc}
- */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
+ E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return (E) e;
+ return e;
throw new InterruptedException();
}
public E poll() {
- return fulfill(null);
+ return xfer(null, false, NOW, 0);
}
/**
@@ -605,156 +1093,38 @@ public class LinkedTransferQueue exte
return n;
}
- // Traversal-based methods
-
/**
- * Returns head after performing any outstanding helping steps.
+ * Returns an iterator over the elements in this queue in proper
+ * sequence, from head to tail.
+ *
+ * The returned iterator is a "weakly consistent" iterator that
+ * will never throw
+ * {@link ConcurrentModificationException ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed
+ * to) reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
*/
- private Node traversalHead() {
- for (;;) {
- Node t = tail.get();
- Node h = head.get();
- if (h != null && t != null) {
- Node last = t.next;
- Node first = h.next;
- if (t == tail.get()) {
- if (last != null)
- tail.compareAndSet(t, last);
- else if (first != null) {
- Object x = first.get();
- if (x == first)
- advanceHead(h, first);
- else
- return h;
- }
- else
- return h;
- }
- }
- reclean();
- }
- }
-
-
public Iterator iterator() {
return new Itr();
}
- /**
- * Iterators. Basic strategy is to traverse list, treating
- * non-data (i.e., request) nodes as terminating list.
- * Once a valid data node is found, the item is cached
- * so that the next call to next() will return it even
- * if subsequently removed.
- */
- class Itr implements Iterator {
- Node next; // node to return next
- Node pnext; // predecessor of next
- Node snext; // successor of next
- Node curr; // last returned node, for remove()
- Node pcurr; // predecessor of curr, for remove()
- E nextItem; // Cache of next item, once committed to in next
-
- Itr() {
- findNext();
- }
-
- /**
- * Ensures next points to next valid node, or null if none.
- */
- void findNext() {
- for (;;) {
- Node pred = pnext;
- Node q = next;
- if (pred == null || pred == q) {
- pred = traversalHead();
- q = pred.next;
- }
- if (q == null || !q.isData) {
- next = null;
- return;
- }
- Object x = q.get();
- Node s = q.next;
- if (x != null && q != x && q != s) {
- nextItem = (E) x;
- snext = s;
- pnext = pred;
- next = q;
- return;
- }
- pnext = q;
- next = s;
- }
- }
-
- public boolean hasNext() {
- return next != null;
- }
-
- public E next() {
- if (next == null) throw new NoSuchElementException();
- pcurr = pnext;
- curr = next;
- pnext = next;
- next = snext;
- E x = nextItem;
- findNext();
- return x;
- }
-
- public void remove() {
- Node p = curr;
- if (p == null)
- throw new IllegalStateException();
- Object x = p.get();
- if (x != null && x != p && p.compareAndSet(x, p))
- clean(pcurr, p);
- }
- }
-
public E peek() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return null;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return null;
- if (x != null)
- return (E) x;
- }
- }
+ return firstDataItem();
}
+ /**
+ * Returns {@code true} if this queue contains no elements.
+ *
+ * @return {@code true} if this queue contains no elements
+ */
public boolean isEmpty() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return true;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return true;
- if (x != null)
- return false;
- }
- }
+ return firstOfMode(true) == null;
}
public boolean hasWaitingConsumer() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return false;
- Object x = p.get();
- if (p != x)
- return !p.isData;
- }
+ return firstOfMode(false) != null;
}
/**
@@ -770,58 +1140,41 @@ public class LinkedTransferQueue exte
* @return the number of elements in this queue
*/
public int size() {
- int count = 0;
- Node h = traversalHead();
- for (Node p = h.next; p != null && p.isData; p = p.next) {
- Object x = p.get();
- if (x != null && x != p) {
- if (++count == Integer.MAX_VALUE) // saturated
- break;
- }
- }
- return count;
+ return countOfMode(true);
}
public int getWaitingConsumerCount() {
- int count = 0;
- Node h = traversalHead();
- for (Node p = h.next; p != null && !p.isData; p = p.next) {
- if (p.get() == null) {
- if (++count == Integer.MAX_VALUE)
- break;
- }
- }
- return count;
+ return countOfMode(false);
}
- public int remainingCapacity() {
- return Integer.MAX_VALUE;
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ return findAndRemove(o);
}
- public boolean remove(Object o) {
- if (o == null)
- return false;
- for (;;) {
- Node pred = traversalHead();
- for (;;) {
- Node q = pred.next;
- if (q == null || !q.isData)
- return false;
- if (q == pred) // restart
- break;
- Object x = q.get();
- if (x != null && x != q && o.equals(x) &&
- q.compareAndSet(x, q)) {
- clean(pred, q);
- return true;
- }
- pred = q;
- }
- }
+ /**
+ * Always returns {@code Integer.MAX_VALUE} because a
+ * {@code LinkedTransferQueue} is not capacity constrained.
+ *
+ * @return {@code Integer.MAX_VALUE} (as specified by
+ * {@link BlockingQueue#remainingCapacity()})
+ */
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
}
/**
- * Save the state to a stream (that is, serialize it).
+ * Saves the state to a stream (that is, serializes it).
*
* @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
@@ -837,15 +1190,14 @@ public class LinkedTransferQueue exte
}
/**
- * Reconstitute the Queue instance from a stream (that is,
- * deserialize it).
+ * Reconstitutes the Queue instance from a stream (that is,
+ * deserializes it).
*
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
- resetHeadAndTail();
for (;;) {
@SuppressWarnings("unchecked") E item = (E) s.readObject();
if (item == null)
@@ -855,17 +1207,6 @@ public class LinkedTransferQueue exte
}
}
- // Support for resetting head/tail while deserializing
- private void resetHeadAndTail() {
- Node dummy = new Node(null, false);
- UNSAFE.putObjectVolatile(this, headOffset,
- new PaddedAtomicReference>(dummy));
- UNSAFE.putObjectVolatile(this, tailOffset,
- new PaddedAtomicReference>(dummy));
- UNSAFE.putObjectVolatile(this, cleanMeOffset,
- new PaddedAtomicReference>(null));
- }
-
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = getUnsafe();
@@ -876,7 +1217,6 @@ public class LinkedTransferQueue exte
private static final long cleanMeOffset =
objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
-
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class> klazz) {
try {
@@ -896,7 +1236,7 @@ public class LinkedTransferQueue exte
*
* @return a sun.misc.Unsafe
*/
- private static sun.misc.Unsafe getUnsafe() {
+ static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
@@ -916,4 +1256,5 @@ public class LinkedTransferQueue exte
}
}
}
+
}