exte
addAll(c);
}
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public void put(E e) {
+ xfer(e, true, ASYNC, 0);
}
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block or
+ * return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit) {
+ xfer(e, true, ASYNC, 0);
return true;
}
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- xfer(e, NOWAIT, 0);
+ xfer(e, true, ASYNC, 0);
+ return true;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
+ *
+ * @return {@code true} (as specified by {@link Collection#add})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean add(E e) {
+ xfer(e, true, ASYNC, 0);
return true;
}
+ /**
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e) {
+ return xfer(e, true, NOW, 0) == null;
+ }
+
+ /**
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
public void transfer(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, WAIT, 0) == null) {
- Thread.interrupted();
+ if (xfer(e, true, SYNC, 0) != null) {
+ Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
+ /**
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
+ if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
- public boolean tryTransfer(E e) {
- if (e == null) throw new NullPointerException();
- return fulfill(e) != null;
- }
-
public E take() throws InterruptedException {
- Object e = xfer(null, WAIT, 0);
+ E e = xfer(null, false, SYNC, 0);
if (e != null)
- return (E)e;
+ return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
+ E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return (E)e;
+ return e;
throw new InterruptedException();
}
public E poll() {
- return (E)fulfill(null);
+ return xfer(null, false, NOW, 0);
}
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
public int drainTo(Collection super E> c) {
if (c == null)
throw new NullPointerException();
@@ -498,6 +1084,10 @@ public class LinkedTransferQueue exte
return n;
}
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
public int drainTo(Collection super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
@@ -512,143 +1102,42 @@ public class LinkedTransferQueue exte
return n;
}
- // Traversal-based methods
-
/**
- * Return head after performing any outstanding helping steps
+ * Returns an iterator over the elements in this queue in proper
+ * sequence, from head to tail.
+ *
+ * The returned iterator is a "weakly consistent" iterator that
+ * will never throw
+ * {@link ConcurrentModificationException ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed
+ * to) reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
*/
- private QNode traversalHead() {
- for (;;) {
- QNode t = tail.get();
- QNode h = head.get();
- if (h != null && t != null) {
- QNode last = t.next;
- QNode first = h.next;
- if (t == tail.get()) {
- if (last != null)
- tail.compareAndSet(t, last);
- else if (first != null) {
- Object x = first.get();
- if (x == first)
- advanceHead(h, first);
- else
- return h;
- }
- else
- return h;
- }
- }
- }
- }
-
-
public Iterator iterator() {
return new Itr();
}
- /**
- * Iterators. Basic strategy is to traverse list, treating
- * non-data (i.e., request) nodes as terminating list.
- * Once a valid data node is found, the item is cached
- * so that the next call to next() will return it even
- * if subsequently removed.
- */
- class Itr implements Iterator {
- QNode nextNode; // Next node to return next
- QNode currentNode; // last returned node, for remove()
- QNode prevNode; // predecessor of last returned node
- E nextItem; // Cache of next item, once commited to in next
-
- Itr() {
- nextNode = traversalHead();
- advance();
- }
-
- E advance() {
- prevNode = currentNode;
- currentNode = nextNode;
- E x = nextItem;
-
- QNode p = nextNode.next;
- for (;;) {
- if (p == null || !p.isData) {
- nextNode = null;
- nextItem = null;
- return x;
- }
- Object item = p.get();
- if (item != p && item != null) {
- nextNode = p;
- nextItem = (E)item;
- return x;
- }
- prevNode = p;
- p = p.next;
- }
- }
-
- public boolean hasNext() {
- return nextNode != null;
- }
-
- public E next() {
- if (nextNode == null) throw new NoSuchElementException();
- return advance();
- }
-
- public void remove() {
- QNode p = currentNode;
- QNode prev = prevNode;
- if (prev == null || p == null)
- throw new IllegalStateException();
- Object x = p.get();
- if (x != null && x != p && p.compareAndSet(x, p))
- clean(prev, p);
- }
- }
-
public E peek() {
- for (;;) {
- QNode h = traversalHead();
- QNode p = h.next;
- if (p == null)
- return null;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return null;
- if (x != null)
- return (E)x;
- }
- }
+ return firstDataItem();
}
+ /**
+ * Returns {@code true} if this queue contains no elements.
+ *
+ * @return {@code true} if this queue contains no elements
+ */
public boolean isEmpty() {
- for (;;) {
- QNode h = traversalHead();
- QNode p = h.next;
- if (p == null)
- return true;
- Object x = p.get();
- if (p != x) {
- if (!p.isData)
- return true;
- if (x != null)
- return false;
- }
+ for (Node p = head; p != null; p = succ(p)) {
+ if (!p.isMatched())
+ return !p.isData;
}
+ return true;
}
public boolean hasWaitingConsumer() {
- for (;;) {
- QNode h = traversalHead();
- QNode p = h.next;
- if (p == null)
- return false;
- Object x = p.get();
- if (p != x)
- return !p.isData;
- }
+ return firstOfMode(false) != null;
}
/**
@@ -664,36 +1153,41 @@ public class LinkedTransferQueue exte
* @return the number of elements in this queue
*/
public int size() {
- int count = 0;
- QNode h = traversalHead();
- for (QNode p = h.next; p != null && p.isData; p = p.next) {
- Object x = p.get();
- if (x != null && x != p) {
- if (++count == Integer.MAX_VALUE) // saturated
- break;
- }
- }
- return count;
+ return countOfMode(true);
}
public int getWaitingConsumerCount() {
- int count = 0;
- QNode h = traversalHead();
- for (QNode p = h.next; p != null && !p.isData; p = p.next) {
- if (p.get() == null) {
- if (++count == Integer.MAX_VALUE)
- break;
- }
- }
- return count;
+ return countOfMode(false);
}
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ return findAndRemove(o);
+ }
+
+ /**
+ * Always returns {@code Integer.MAX_VALUE} because a
+ * {@code LinkedTransferQueue} is not capacity constrained.
+ *
+ * @return {@code Integer.MAX_VALUE} (as specified by
+ * {@link BlockingQueue#remainingCapacity()})
+ */
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
/**
- * Save the state to a stream (that is, serialize it).
+ * Saves the state to a stream (that is, serializes it).
*
* @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
@@ -702,23 +1196,23 @@ public class LinkedTransferQueue exte
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
- for (Iterator it = iterator(); it.hasNext(); )
- s.writeObject(it.next());
+ for (E e : this)
+ s.writeObject(e);
// Use trailing null as sentinel
s.writeObject(null);
}
/**
- * Reconstitute the Queue instance from a stream (that is,
- * deserialize it).
+ * Reconstitutes the Queue instance from a stream (that is,
+ * deserializes it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
- resetHeadAndTail();
for (;;) {
- E item = (E)s.readObject();
+ @SuppressWarnings("unchecked") E item = (E) s.readObject();
if (item == null)
break;
else
@@ -726,40 +1220,53 @@ public class LinkedTransferQueue exte
}
}
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
+ private static final long sweepVotesOffset =
+ objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
- // Support for resetting head/tail while deserializing
- private void resetHeadAndTail() {
- QNode dummy = new QNode(null, false);
- _unsafe.putObjectVolatile(this, headOffset,
- new PaddedAtomicReference(dummy));
- _unsafe.putObjectVolatile(this, tailOffset,
- new PaddedAtomicReference(dummy));
- _unsafe.putObjectVolatile(this, cleanMeOffset,
- new PaddedAtomicReference(null));
- }
-
- // Temporary Unsafe mechanics for preliminary release
- private static final Unsafe _unsafe;
- private static final long headOffset;
- private static final long tailOffset;
- private static final long cleanMeOffset;
- static {
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class> klazz) {
try {
- if (LinkedTransferQueue.class.getClassLoader() != null) {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- _unsafe = (Unsafe)f.get(null);
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
}
- else
- _unsafe = Unsafe.getUnsafe();
- headOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("head"));
- tailOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("tail"));
- cleanMeOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
- } catch (Exception e) {
- throw new RuntimeException("Could not initialize intrinsics", e);
}
}