--- jsr166/src/jsr166y/LinkedTransferQueue.java 2007/05/29 09:55:32 1.1
+++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/08/01 20:26:50 1.40
@@ -5,11 +5,17 @@
*/
package jsr166y;
+
import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.atomic.*;
-import java.util.*;
-import java.io.*;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.atomic.AtomicReference;
/**
* An unbounded {@linkplain TransferQueue} based on linked nodes.
@@ -19,7 +25,7 @@ import java.io.*;
* producer. The tail of the queue is that element that has
* been on the queue the shortest time for some producer.
*
- *
Beware that, unlike in most collections, the size
+ *
Beware that, unlike in most collections, the {@code size}
* method is NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
@@ -39,29 +45,28 @@ import java.io.*;
*
* Java Collections Framework .
*
- * @since 1.5
+ * @since 1.7
* @author Doug Lea
* @param the type of elements held in this collection
- *
*/
public class LinkedTransferQueue extends AbstractQueue
implements TransferQueue, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
/*
- * This is still a work in prgress...
- *
* This class extends the approach used in FIFO-mode
* SynchronousQueues. See the internal documentation, as well as
* the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
* Lea & Scott
* (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
*
- * The main extension is to provide different Wait modes
- * for the main "xfer" method that puts or takes items.
- * These don't impact the basic dual-queue logic, but instead
- * control whether or how threads block upon insertion
- * of request or data nodes into the dual queue.
+ * The main extension is to provide different Wait modes for the
+ * main "xfer" method that puts or takes items. These don't
+ * impact the basic dual-queue logic, but instead control whether
+ * or how threads block upon insertion of request or data nodes
+ * into the dual queue. It also uses slightly different
+ * conventions for tracking whether nodes are off-list or
+ * cancelled.
*/
// Wait modes for xfer method
@@ -79,7 +84,7 @@ public class LinkedTransferQueue exte
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
- static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
@@ -94,29 +99,66 @@ public class LinkedTransferQueue exte
*/
static final long spinForTimeoutThreshold = 1000L;
- /**
- * Node class for LinkedTransferQueue. Opportunistically subclasses from
- * AtomicReference to represent item. Uses Object, not E, to allow
- * setting item to "this" after use, to avoid garbage
- * retention. Similarly, setting the next field to this is used as
- * sentinel that node is off list.
+ /**
+ * Node class for LinkedTransferQueue. Opportunistically
+ * subclasses from AtomicReference to represent item. Uses Object,
+ * not E, to allow setting item to "this" after use, to avoid
+ * garbage retention. Similarly, setting the next field to this is
+ * used as sentinel that node is off list.
*/
- static final class QNode extends AtomicReference {
- volatile QNode next;
+ static final class Node extends AtomicReference {
+ volatile Node next;
volatile Thread waiter; // to control park/unpark
final boolean isData;
- QNode(Object item, boolean isData) {
+
+ Node(E item, boolean isData) {
super(item);
this.isData = isData;
}
- static final AtomicReferenceFieldUpdater
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, QNode.class, "next");
+ // Unsafe mechanics
- boolean casNext(QNode cmp, QNode val) {
- return nextUpdater.compareAndSet(this, cmp, val);
+ private static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", Node.class);
+
+ final boolean casNext(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
+ }
+
+ final void clearNext() {
+ UNSAFE.putOrderedObject(this, nextOffset, this);
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
+ }
+ }
}
+
+ private static final long serialVersionUID = -3375979862319811754L;
}
/**
@@ -128,57 +170,59 @@ public class LinkedTransferQueue exte
// enough padding for 64bytes with 4byte refs
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
PaddedAtomicReference(T r) { super(r); }
+ private static final long serialVersionUID = 8170090609809740854L;
}
- private final QNode dummy = new QNode(null, false);
- private final PaddedAtomicReference head =
- new PaddedAtomicReference(dummy);
- private final PaddedAtomicReference tail =
- new PaddedAtomicReference(dummy);
+ /** head of the queue */
+ private transient final PaddedAtomicReference> head;
+
+ /** tail of the queue */
+ private transient final PaddedAtomicReference> tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it cancelled.
*/
- private final PaddedAtomicReference cleanMe =
- new PaddedAtomicReference(null);
+ private transient final PaddedAtomicReference> cleanMe;
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
- private boolean advanceHead(QNode h, QNode nh) {
+ private boolean advanceHead(Node h, Node nh) {
if (h == head.get() && head.compareAndSet(h, nh)) {
- h.next = h; // forget old next
+ h.clearNext(); // forget old next
return true;
}
return false;
}
-
+
/**
* Puts or takes an item. Used for most queue operations (except
- * poll() and tryTransfer())
- * @param e the item or if null, signfies that this is a take
+ * poll() and tryTransfer()). See the similar code in
+ * SynchronousQueue for detailed explanation.
+ *
+ * @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
* @return an item, or null on failure
*/
- private Object xfer(Object e, int mode, long nanos) {
+ private E xfer(E e, int mode, long nanos) {
boolean isData = (e != null);
- QNode s = null;
- final PaddedAtomicReference head = this.head;
- final PaddedAtomicReference tail = this.tail;
+ Node s = null;
+ final PaddedAtomicReference> head = this.head;
+ final PaddedAtomicReference> tail = this.tail;
for (;;) {
- QNode t = tail.get();
- QNode h = head.get();
+ Node t = tail.get();
+ Node h = head.get();
if (t != null && (t == h || t.isData == isData)) {
if (s == null)
- s = new QNode(e, isData);
- QNode last = t.next;
+ s = new Node(e, isData);
+ Node last = t.next;
if (last != null) {
if (t == tail.get())
tail.compareAndSet(t, last);
@@ -188,15 +232,15 @@ public class LinkedTransferQueue exte
return awaitFulfill(t, s, e, mode, nanos);
}
}
-
+
else if (h != null) {
- QNode first = h.next;
- if (t == tail.get() && first != null &&
+ Node first = h.next;
+ if (t == tail.get() && first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
- return isData? e : x;
+ return isData ? e : (E) x;
}
}
}
@@ -206,19 +250,19 @@ public class LinkedTransferQueue exte
/**
* Version of xfer for poll() and tryTransfer, which
- * simpifies control paths both here and in xfer
+ * simplifies control paths both here and in xfer.
*/
- private Object fulfill(Object e) {
+ private E fulfill(E e) {
boolean isData = (e != null);
- final PaddedAtomicReference head = this.head;
- final PaddedAtomicReference tail = this.tail;
+ final PaddedAtomicReference> head = this.head;
+ final PaddedAtomicReference> tail = this.tail;
for (;;) {
- QNode t = tail.get();
- QNode h = head.get();
+ Node t = tail.get();
+ Node h = head.get();
if (t != null && (t == h || t.isData == isData)) {
- QNode last = t.next;
+ Node last = t.next;
if (t == tail.get()) {
if (last != null)
tail.compareAndSet(t, last);
@@ -227,14 +271,14 @@ public class LinkedTransferQueue exte
}
}
else if (h != null) {
- QNode first = h.next;
- if (t == tail.get() &&
+ Node first = h.next;
+ if (t == tail.get() &&
first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
- return isData? e : x;
+ return isData ? e : (E) x;
}
}
}
@@ -250,14 +294,14 @@ public class LinkedTransferQueue exte
* @param e the comparison value for checking match
* @param mode mode
* @param nanos timeout value
- * @return matched item, or s if cancelled
+ * @return matched item, or null if cancelled
*/
- private Object awaitFulfill(QNode pred, QNode s, Object e,
- int mode, long nanos) {
+ private E awaitFulfill(Node pred, Node s, E e,
+ int mode, long nanos) {
if (mode == NOWAIT)
return null;
- long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
+ long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = -1; // set to desired spin count below
for (;;) {
@@ -266,16 +310,17 @@ public class LinkedTransferQueue exte
Object x = s.get();
if (x != e) { // Node was matched or cancelled
advanceHead(pred, s); // unlink if head
- if (x == s) // was cancelled
- return clean(pred, s);
- else if (x != null) {
+ if (x == s) { // was cancelled
+ clean(pred, s);
+ return null;
+ }
+ else if (x != null) {
s.set(s); // avoid garbage retention
- return x;
+ return (E) x;
}
else
return e;
}
-
if (mode == TIMEOUT) {
long now = System.nanoTime();
nanos -= now - lastTime;
@@ -286,9 +331,9 @@ public class LinkedTransferQueue exte
}
}
if (spins < 0) {
- QNode h = head.get(); // only spin if at head
+ Node h = head.get(); // only spin if at head
spins = ((h != null && h.next == s) ?
- (mode == TIMEOUT?
+ ((mode == TIMEOUT) ?
maxTimedSpins : maxUntimedSpins) : 0);
}
if (spins > 0)
@@ -296,14 +341,12 @@ public class LinkedTransferQueue exte
else if (s.waiter == null)
s.waiter = w;
else if (mode != TIMEOUT) {
- // LockSupport.park(this);
- LockSupport.park(); // allows run on java5
+ LockSupport.park(this);
s.waiter = null;
spins = -1;
}
else if (nanos > spinForTimeoutThreshold) {
- // LockSupport.parkNanos(this, nanos);
- LockSupport.parkNanos(nanos);
+ LockSupport.parkNanos(this, nanos);
s.waiter = null;
spins = -1;
}
@@ -311,107 +354,221 @@ public class LinkedTransferQueue exte
}
/**
+ * Returns validated tail for use in cleaning methods.
+ */
+ private Node getValidatedTail() {
+ for (;;) {
+ Node h = head.get();
+ Node first = h.next;
+ if (first != null && first.get() == first) { // help advance
+ advanceHead(h, first);
+ continue;
+ }
+ Node t = tail.get();
+ Node last = t.next;
+ if (t == tail.get()) {
+ if (last != null)
+ tail.compareAndSet(t, last); // help advance
+ else
+ return t;
+ }
+ }
+ }
+
+ /**
* Gets rid of cancelled node s with original predecessor pred.
- * @return null (to simplify use by callers)
+ *
+ * @param pred predecessor of cancelled node
+ * @param s the cancelled node
*/
- private Object clean(QNode pred, QNode s) {
+ private void clean(Node pred, Node s) {
Thread w = s.waiter;
if (w != null) { // Wake up thread
s.waiter = null;
if (w != Thread.currentThread())
LockSupport.unpark(w);
}
-
- for (;;) {
- if (pred.next != s) // already cleaned
- return null;
- QNode h = head.get();
- QNode hn = h.next; // Absorb cancelled first node as head
- if (hn != null && hn.next == hn) {
- advanceHead(h, hn);
- continue;
- }
- QNode t = tail.get(); // Ensure consistent read for tail
- if (t == h)
- return null;
- QNode tn = t.next;
- if (t != tail.get())
- continue;
- if (tn != null) { // Help advance tail
- tail.compareAndSet(t, tn);
- continue;
- }
- if (s != t) { // If not tail, try to unsplice
- QNode sn = s.next;
+
+ if (pred == null)
+ return;
+
+ /*
+ * At any given time, exactly one node on list cannot be
+ * deleted -- the last inserted node. To accommodate this, if
+ * we cannot delete s, we save its predecessor as "cleanMe",
+ * processing the previously saved version first. At least one
+ * of node s or the node previously saved can always be
+ * processed, so this always terminates.
+ */
+ while (pred.next == s) {
+ Node oldpred = reclean(); // First, help get rid of cleanMe
+ Node t = getValidatedTail();
+ if (s != t) { // If not tail, try to unsplice
+ Node sn = s.next; // s.next == s means s already off list
if (sn == s || pred.casNext(s, sn))
- return null;
+ break;
}
- QNode dp = cleanMe.get();
- if (dp != null) { // Try unlinking previous cancelled node
- QNode d = dp.next;
- QNode dn;
- if (d == null || // d is gone or
- d == dp || // d is off list or
- d.get() != d || // d not cancelled or
- (d != t && // d not tail and
- (dn = d.next) != null && // has successor
- dn != d && // that is on list
- dp.casNext(d, dn))) // d unspliced
- cleanMe.compareAndSet(dp, null);
- if (dp == pred)
- return null; // s is already saved node
- }
- else if (cleanMe.compareAndSet(null, pred))
- return null; // Postpone cleaning s
+ else if (oldpred == pred || // Already saved
+ (oldpred == null && cleanMe.compareAndSet(null, pred)))
+ break; // Postpone cleaning
}
}
-
+
/**
- * Creates an initially empty LinkedTransferQueue .
+ * Tries to unsplice the cancelled node held in cleanMe that was
+ * previously uncleanable because it was at tail.
+ *
+ * @return current cleanMe node (or null)
+ */
+ private Node reclean() {
+ /*
+ * cleanMe is, or at one time was, predecessor of cancelled
+ * node s that was the tail so could not be unspliced. If s
+ * is no longer the tail, try to unsplice if necessary and
+ * make cleanMe slot available. This differs from similar
+ * code in clean() because we must check that pred still
+ * points to a cancelled node that must be unspliced -- if
+ * not, we can (must) clear cleanMe without unsplicing.
+ * This can loop only due to contention on casNext or
+ * clearing cleanMe.
+ */
+ Node pred;
+ while ((pred = cleanMe.get()) != null) {
+ Node t = getValidatedTail();
+ Node s = pred.next;
+ if (s != t) {
+ Node sn;
+ if (s == null || s == pred || s.get() != s ||
+ (sn = s.next) == s || pred.casNext(s, sn))
+ cleanMe.compareAndSet(pred, null);
+ }
+ else // s is still tail; cannot clean
+ break;
+ }
+ return pred;
+ }
+
+ /**
+ * Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
+ Node dummy = new Node(null, false);
+ head = new PaddedAtomicReference>(dummy);
+ tail = new PaddedAtomicReference>(dummy);
+ cleanMe = new PaddedAtomicReference>(null);
}
/**
- * Creates a LinkedTransferQueue
+ * Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
+ *
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedTransferQueue(Collection extends E> c) {
+ this();
addAll(c);
}
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public void put(E e) {
+ offer(e);
}
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (Thread.interrupted()) throw new InterruptedException();
- xfer(e, NOWAIT, 0);
- return true;
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never block or
+ * return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit) {
+ return offer(e);
}
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
+ *
+ * @return {@code true} (as specified by
+ * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @throws NullPointerException if the specified element is null
+ */
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
xfer(e, NOWAIT, 0);
return true;
}
+ /**
+ * Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
+ *
+ * @return {@code true} (as specified by {@link Collection#add})
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean add(E e) {
+ return offer(e);
+ }
+
+ /**
+ * Transfers the element to a waiting consumer immediately, if possible.
+ *
+ * More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * otherwise returning {@code false} without enqueuing the element.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean tryTransfer(E e) {
+ if (e == null) throw new NullPointerException();
+ return fulfill(e) != null;
+ }
+
+ /**
+ * Transfers the element to a consumer, waiting if necessary to do so.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (xfer(e, WAIT, 0) == null) {
- Thread.interrupted();
+ Thread.interrupted();
throw new InterruptedException();
- }
+ }
}
+ /**
+ * Transfers the element to a consumer if it is possible to do so
+ * before the timeout elapses.
+ *
+ *
More precisely, transfers the specified element immediately
+ * if there exists a consumer already waiting to receive it (in
+ * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
+ * else inserts the specified element at the tail of this queue
+ * and waits until the element is received by a consumer,
+ * returning {@code false} if the specified wait time elapses
+ * before the element can be transferred.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
@@ -422,30 +579,29 @@ public class LinkedTransferQueue exte
throw new InterruptedException();
}
- public boolean tryTransfer(E e) {
- if (e == null) throw new NullPointerException();
- return fulfill(e) != null;
- }
-
public E take() throws InterruptedException {
- Object e = xfer(null, WAIT, 0);
+ E e = xfer(null, WAIT, 0);
if (e != null)
- return (E)e;
- Thread.interrupted();
+ return e;
+ Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
+ E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return (E)e;
+ return e;
throw new InterruptedException();
}
public E poll() {
- return (E)fulfill(null);
+ return fulfill(null);
}
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
public int drainTo(Collection super E> c) {
if (c == null)
throw new NullPointerException();
@@ -460,6 +616,10 @@ public class LinkedTransferQueue exte
return n;
}
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
public int drainTo(Collection super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
@@ -477,22 +637,22 @@ public class LinkedTransferQueue exte
// Traversal-based methods
/**
- * Return head after performing any outstanding helping steps
+ * Returns head after performing any outstanding helping steps.
*/
- private QNode traversalHead() {
+ private Node traversalHead() {
for (;;) {
- QNode t = tail.get();
- QNode h = head.get();
+ Node t = tail.get();
+ Node h = head.get();
if (h != null && t != null) {
- QNode last = t.next;
- QNode first = h.next;
+ Node last = t.next;
+ Node first = h.next;
if (t == tail.get()) {
- if (last != null)
+ if (last != null)
tail.compareAndSet(t, last);
else if (first != null) {
Object x = first.get();
- if (x == first)
- advanceHead(h, first);
+ if (x == first)
+ advanceHead(h, first);
else
return h;
}
@@ -500,79 +660,96 @@ public class LinkedTransferQueue exte
return h;
}
}
+ reclean();
}
}
-
+ /**
+ * Returns an iterator over the elements in this queue in proper
+ * sequence, from head to tail.
+ *
+ * The returned iterator is a "weakly consistent" iterator that
+ * will never throw
+ * {@link ConcurrentModificationException ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed
+ * to) reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
+ */
public Iterator iterator() {
return new Itr();
}
/**
- * Iterators. Basic strategy os to travers list, treating
+ * Iterators. Basic strategy is to traverse list, treating
* non-data (i.e., request) nodes as terminating list.
* Once a valid data node is found, the item is cached
* so that the next call to next() will return it even
* if subsequently removed.
*/
class Itr implements Iterator {
- QNode nextNode; // Next node to return next
- QNode currentNode; // last returned node, for remove()
- QNode prevNode; // predecessor of last returned node
- E nextItem; // Cache of next item, once commited to in next
-
+ Node next; // node to return next
+ Node pnext; // predecessor of next
+ Node curr; // last returned node, for remove()
+ Node pcurr; // predecessor of curr, for remove()
+ E nextItem; // Cache of next item, once committed to in next
+
Itr() {
- nextNode = traversalHead();
advance();
}
-
- E advance() {
- prevNode = currentNode;
- currentNode = nextNode;
- E x = nextItem;
-
- QNode p = nextNode.next;
+
+ /**
+ * Moves to next valid node and returns item to return for
+ * next(), or null if no such.
+ */
+ private E advance() {
+ pcurr = pnext;
+ curr = next;
+ E item = nextItem;
+
for (;;) {
- if (p == null || !p.isData) {
- nextNode = null;
- nextItem = null;
- return x;
- }
- Object item = p.get();
- if (item != p && item != null) {
- nextNode = p;
- nextItem = (E)item;
- return x;
- }
- prevNode = p;
- p = p.next;
+ pnext = (next == null) ? traversalHead() : next;
+ next = pnext.next;
+ if (next == pnext) {
+ next = null;
+ continue; // restart
+ }
+ if (next == null)
+ break;
+ Object x = next.get();
+ if (x != null && x != next) {
+ nextItem = (E) x;
+ break;
+ }
}
+ return item;
}
-
+
public boolean hasNext() {
- return nextNode != null;
+ return next != null;
}
-
+
public E next() {
- if (nextNode == null) throw new NoSuchElementException();
+ if (next == null)
+ throw new NoSuchElementException();
return advance();
}
-
+
public void remove() {
- QNode p = currentNode;
- QNode prev = prevNode;
- if (prev == null || p == null)
+ Node p = curr;
+ if (p == null)
throw new IllegalStateException();
Object x = p.get();
if (x != null && x != p && p.compareAndSet(x, p))
- clean(prev, p);
+ clean(pcurr, p);
}
}
public E peek() {
for (;;) {
- QNode h = traversalHead();
- QNode p = h.next;
+ Node h = traversalHead();
+ Node p = h.next;
if (p == null)
return null;
Object x = p.get();
@@ -580,27 +757,43 @@ public class LinkedTransferQueue exte
if (!p.isData)
return null;
if (x != null)
- return (E)x;
+ return (E) x;
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ for (;;) {
+ Node h = traversalHead();
+ Node p = h.next;
+ if (p == null)
+ return true;
+ Object x = p.get();
+ if (p != x) {
+ if (!p.isData)
+ return true;
+ if (x != null)
+ return false;
}
}
}
public boolean hasWaitingConsumer() {
for (;;) {
- QNode h = traversalHead();
- QNode p = h.next;
+ Node h = traversalHead();
+ Node p = h.next;
if (p == null)
return false;
Object x = p.get();
- if (p != x)
+ if (p != x)
return !p.isData;
}
}
-
+
/**
* Returns the number of elements in this queue. If this queue
- * contains more than Integer.MAX_VALUE elements, returns
- * Integer.MAX_VALUE .
+ * contains more than {@code Integer.MAX_VALUE} elements, returns
+ * {@code Integer.MAX_VALUE}.
*
* Beware that, unlike in most collections, this method is
* NOT a constant-time operation. Because of the
@@ -610,30 +803,75 @@ public class LinkedTransferQueue exte
* @return the number of elements in this queue
*/
public int size() {
- int count = 0;
- QNode h = traversalHead();
- for (QNode p = h.next; p != null && p.isData; p = p.next) {
- Object x = p.get();
- if (x != null && x != p) {
- if (++count == Integer.MAX_VALUE) // saturated
+ for (;;) {
+ int count = 0;
+ Node pred = traversalHead();
+ for (;;) {
+ Node q = pred.next;
+ if (q == pred) // restart
break;
+ if (q == null || !q.isData)
+ return count;
+ Object x = q.get();
+ if (x != null && x != q) {
+ if (++count == Integer.MAX_VALUE) // saturated
+ return count;
+ }
+ pred = q;
}
}
- return count;
}
public int getWaitingConsumerCount() {
- int count = 0;
- QNode h = traversalHead();
- for (QNode p = h.next; p != null && !p.isData; p = p.next) {
- if (p.get() == null) {
- if (++count == Integer.MAX_VALUE)
+ // converse of size -- count valid non-data nodes
+ for (;;) {
+ int count = 0;
+ Node pred = traversalHead();
+ for (;;) {
+ Node q = pred.next;
+ if (q == pred) // restart
+ break;
+ if (q == null || q.isData)
+ return count;
+ Object x = q.get();
+ if (x == null) {
+ if (++count == Integer.MAX_VALUE) // saturated
+ return count;
+ }
+ pred = q;
+ }
+ }
+ }
+
+ public boolean remove(Object o) {
+ if (o == null)
+ return false;
+ for (;;) {
+ Node pred = traversalHead();
+ for (;;) {
+ Node q = pred.next;
+ if (q == pred) // restart
break;
+ if (q == null || !q.isData)
+ return false;
+ Object x = q.get();
+ if (x != null && x != q && o.equals(x) &&
+ q.compareAndSet(x, q)) {
+ clean(pred, q);
+ return true;
+ }
+ pred = q;
}
}
- return count;
}
+ /**
+ * Always returns {@code Integer.MAX_VALUE} because a
+ * {@code LinkedTransferQueue} is not capacity constrained.
+ *
+ * @return {@code Integer.MAX_VALUE} (as specified by
+ * {@link BlockingQueue#remainingCapacity()})
+ */
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@@ -641,15 +879,15 @@ public class LinkedTransferQueue exte
/**
* Save the state to a stream (that is, serialize it).
*
- * @serialData All of the elements (each an E ) in
+ * @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
- for (Iterator it = iterator(); it.hasNext(); )
- s.writeObject(it.next());
+ for (E e : this)
+ s.writeObject(e);
// Use trailing null as sentinel
s.writeObject(null);
}
@@ -657,17 +895,81 @@ public class LinkedTransferQueue exte
/**
* Reconstitute the Queue instance from a stream (that is,
* deserialize it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
+ resetHeadAndTail();
for (;;) {
- E item = (E)s.readObject();
+ @SuppressWarnings("unchecked") E item = (E) s.readObject();
if (item == null)
break;
else
offer(item);
}
}
+
+ // Support for resetting head/tail while deserializing
+ private void resetHeadAndTail() {
+ Node dummy = new Node(null, false);
+ UNSAFE.putObjectVolatile(this, headOffset,
+ new PaddedAtomicReference>(dummy));
+ UNSAFE.putObjectVolatile(this, tailOffset,
+ new PaddedAtomicReference>(dummy));
+ UNSAFE.putObjectVolatile(this, cleanMeOffset,
+ new PaddedAtomicReference>(null));
+ }
+
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
+ private static final long cleanMeOffset =
+ objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
+
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
+ }
+ }
+ }
}