exte
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
- return offer(e);
+ xfer(e, true, ASYNC, 0);
+ return true;
}
/**
- * Transfers the specified element immediately if there exists a
- * consumer already waiting to receive it (in {@link #take} or
- * timed {@link #poll(long,TimeUnit) poll}), otherwise
- * returning {@code false} without enqueuing the element.
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e) {
- if (e == null) throw new NullPointerException();
- return fulfill(e) != null;
+ return xfer(e, true, NOW, 0) == null;
}
/**
- * Inserts the specified element at the tail of this queue,
- * waiting if necessary for the element to be received by a
- * consumer invoking {@code take} or {@code poll}.
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
*
* @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, WAIT, 0) == null) {
- Thread.interrupted();
+ if (xfer(e, true, SYNC, 0) != null) {
+ Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
/**
- * Inserts the specified element at the tail of this queue,
- * waiting up to the specified wait time if necessary for the
- * element to be received by a consumer invoking {@code take} or
- * {@code poll}.
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
+ if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
@@ -568,22 +990,22 @@ public class LinkedTransferQueue exte
}
public E take() throws InterruptedException {
- E e = xfer(null, WAIT, 0);
+ Object e = xfer(null, false, SYNC, 0);
if (e != null)
- return e;
+ return (E)e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
+ Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return e;
+ return (E)e;
throw new InterruptedException();
}
public E poll() {
- return fulfill(null);
+ return (E)xfer(null, false, NOW, 0);
}
/**
@@ -622,36 +1044,6 @@ public class LinkedTransferQueue exte
return n;
}
- // Traversal-based methods
-
- /**
- * Returns head after performing any outstanding helping steps.
- */
- private Node traversalHead() {
- for (;;) {
- Node t = tail.get();
- Node h = head.get();
- if (h != null && t != null) {
- Node last = t.next;
- Node first = h.next;
- if (t == tail.get()) {
- if (last != null)
- tail.compareAndSet(t, last);
- else if (first != null) {
- Object x = first.get();
- if (x == first)
- advanceHead(h, first);
- else
- return h;
- }
- else
- return h;
- }
- }
- reclean();
- }
- }
-
/**
* Returns an iterator over the elements in this queue in proper
* sequence, from head to tail.
@@ -669,113 +1061,21 @@ public class LinkedTransferQueue exte
return new Itr();
}
- /**
- * Iterators. Basic strategy is to traverse list, treating
- * non-data (i.e., request) nodes as terminating list.
- * Once a valid data node is found, the item is cached
- * so that the next call to next() will return it even
- * if subsequently removed.
- */
- class Itr implements Iterator {
- Node next; // node to return next
- Node pnext; // predecessor of next
- Node curr; // last returned node, for remove()
- Node pcurr; // predecessor of curr, for remove()
- E nextItem; // Cache of next item, once committed to in next
-
- Itr() {
- advance();
- }
-
- /**
- * Moves to next valid node and returns item to return for
- * next(), or null if no such.
- */
- private E advance() {
- pcurr = pnext;
- curr = next;
- E item = nextItem;
-
- for (;;) {
- pnext = (next == null) ? traversalHead() : next;
- next = pnext.next;
- if (next == pnext) {
- next = null;
- continue; // restart
- }
- if (next == null)
- break;
- Object x = next.get();
- if (x != null && x != next) {
- nextItem = (E) x;
- break;
- }
- }
- return item;
- }
-
- public boolean hasNext() {
- return next != null;
- }
-
- public E next() {
- if (next == null)
- throw new NoSuchElementException();
- return advance();
- }
-
- public void remove() {
- Node p = curr;
- if (p == null)
- throw new IllegalStateException();
- Object x = p.get();
- if (x != null && x != p && p.compareAndSet(x, p))
- clean(pcurr, p);
- }
- }
-
public E peek() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return null;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return null;
- if (x != null)
- return (E) x;
- }
- }
+ return (E) firstDataItem();
}
+ /**
+ * Returns {@code true} if this queue contains no elements.
+ *
+ * @return {@code true} if this queue contains no elements
+ */
public boolean isEmpty() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return true;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return true;
- if (x != null)
- return false;
- }
- }
+ return firstOfMode(true) == null;
}
public boolean hasWaitingConsumer() {
- for (;;) {
- Node h = traversalHead();
- Node p = h.next;
- if (p == null)
- return false;
- Object x = p.get();
- if (p != x)
- return !p.isData;
- }
+ return firstOfMode(false) != null;
}
/**
@@ -791,66 +1091,26 @@ public class LinkedTransferQueue exte
* @return the number of elements in this queue
*/
public int size() {
- for (;;) {
- int count = 0;
- Node pred = traversalHead();
- for (;;) {
- Node q = pred.next;
- if (q == pred) // restart
- break;
- if (q == null || !q.isData)
- return count;
- Object x = q.get();
- if (x != null && x != q) {
- if (++count == Integer.MAX_VALUE) // saturated
- return count;
- }
- pred = q;
- }
- }
+ return countOfMode(true);
}
public int getWaitingConsumerCount() {
- // converse of size -- count valid non-data nodes
- for (;;) {
- int count = 0;
- Node pred = traversalHead();
- for (;;) {
- Node q = pred.next;
- if (q == pred) // restart
- break;
- if (q == null || q.isData)
- return count;
- Object x = q.get();
- if (x == null) {
- if (++count == Integer.MAX_VALUE) // saturated
- return count;
- }
- pred = q;
- }
- }
+ return countOfMode(false);
}
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
public boolean remove(Object o) {
- if (o == null)
- return false;
- for (;;) {
- Node pred = traversalHead();
- for (;;) {
- Node q = pred.next;
- if (q == pred) // restart
- break;
- if (q == null || !q.isData)
- return false;
- Object x = q.get();
- if (x != null && x != q && o.equals(x) &&
- q.compareAndSet(x, q)) {
- clean(pred, q);
- return true;
- }
- pred = q;
- }
- }
+ return findAndRemove(o);
}
/**
@@ -865,7 +1125,7 @@ public class LinkedTransferQueue exte
}
/**
- * Save the state to a stream (that is, serialize it).
+ * Saves the state to a stream (that is, serializes it).
*
* @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
@@ -881,15 +1141,14 @@ public class LinkedTransferQueue exte
}
/**
- * Reconstitute the Queue instance from a stream (that is,
- * deserialize it).
+ * Reconstitutes the Queue instance from a stream (that is,
+ * deserializes it).
*
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
- resetHeadAndTail();
for (;;) {
@SuppressWarnings("unchecked") E item = (E) s.readObject();
if (item == null)
@@ -899,16 +1158,6 @@ public class LinkedTransferQueue exte
}
}
- // Support for resetting head/tail while deserializing
- private void resetHeadAndTail() {
- Node dummy = new Node(null, false);
- UNSAFE.putObjectVolatile(this, headOffset,
- new PaddedAtomicReference>(dummy));
- UNSAFE.putObjectVolatile(this, tailOffset,
- new PaddedAtomicReference>(dummy));
- UNSAFE.putObjectVolatile(this, cleanMeOffset,
- new PaddedAtomicReference>(null));
- }
// Unsafe mechanics
@@ -920,7 +1169,6 @@ public class LinkedTransferQueue exte
private static final long cleanMeOffset =
objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
-
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class> klazz) {
try {
@@ -933,13 +1181,6 @@ public class LinkedTransferQueue exte
}
}
- /**
- * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
- * Replace with a simple call to Unsafe.getUnsafe when integrating
- * into a jdk.
- *
- * @return a sun.misc.Unsafe
- */
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
@@ -960,4 +1201,5 @@ public class LinkedTransferQueue exte
}
}
}
+
}