--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/01/05 03:43:07 1.10
+++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/23 19:25:45 1.22
@@ -21,7 +21,7 @@ import java.lang.reflect.*;
* producer. The tail of the queue is that element that has
* been on the queue the shortest time for some producer.
*
- *
Beware that, unlike in most collections, the size
+ *
Beware that, unlike in most collections, the {@code size}
* method is NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
@@ -44,7 +44,6 @@ import java.lang.reflect.*;
* @since 1.7
* @author Doug Lea
* @param the type of elements held in this collection
- *
*/
public class LinkedTransferQueue extends AbstractQueue
implements TransferQueue, java.io.Serializable {
@@ -81,7 +80,7 @@ public class LinkedTransferQueue exte
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
- static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
@@ -116,9 +115,14 @@ public class LinkedTransferQueue exte
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, QNode.class, "next");
- boolean casNext(QNode cmp, QNode val) {
+ final boolean casNext(QNode cmp, QNode val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
+
+ final void clearNext() {
+ nextUpdater.lazySet(this, this);
+ }
+
}
/**
@@ -151,7 +155,7 @@ public class LinkedTransferQueue exte
*/
private boolean advanceHead(QNode h, QNode nh) {
if (h == head.get() && head.compareAndSet(h, nh)) {
- h.next = h; // forget old next
+ h.clearNext(); // forget old next
return true;
}
return false;
@@ -161,6 +165,7 @@ public class LinkedTransferQueue exte
* Puts or takes an item. Used for most queue operations (except
* poll() and tryTransfer()). See the similar code in
* SynchronousQueue for detailed explanation.
+ *
* @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
@@ -197,7 +202,7 @@ public class LinkedTransferQueue exte
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
- return isData? e : x;
+ return isData ? e : x;
}
}
}
@@ -207,7 +212,7 @@ public class LinkedTransferQueue exte
/**
* Version of xfer for poll() and tryTransfer, which
- * simplifies control paths both here and in xfer
+ * simplifies control paths both here and in xfer.
*/
private Object fulfill(Object e) {
boolean isData = (e != null);
@@ -235,7 +240,7 @@ public class LinkedTransferQueue exte
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
- return isData? e : x;
+ return isData ? e : x;
}
}
}
@@ -258,7 +263,7 @@ public class LinkedTransferQueue exte
if (mode == NOWAIT)
return null;
- long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
+ long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = -1; // set to desired spin count below
for (;;) {
@@ -267,7 +272,7 @@ public class LinkedTransferQueue exte
Object x = s.get();
if (x != e) { // Node was matched or cancelled
advanceHead(pred, s); // unlink if head
- if (x == s) { // was cancelled
+ if (x == s) { // was cancelled
clean(pred, s);
return null;
}
@@ -290,7 +295,7 @@ public class LinkedTransferQueue exte
if (spins < 0) {
QNode h = head.get(); // only spin if at head
spins = ((h != null && h.next == s) ?
- (mode == TIMEOUT?
+ ((mode == TIMEOUT) ?
maxTimedSpins : maxUntimedSpins) : 0);
}
if (spins > 0)
@@ -298,14 +303,12 @@ public class LinkedTransferQueue exte
else if (s.waiter == null)
s.waiter = w;
else if (mode != TIMEOUT) {
- // LockSupport.park(this);
- LockSupport.park(); // allows run on java5
+ LockSupport.park(this);
s.waiter = null;
spins = -1;
}
else if (nanos > spinForTimeoutThreshold) {
- // LockSupport.parkNanos(this, nanos);
- LockSupport.parkNanos(nanos);
+ LockSupport.parkNanos(this, nanos);
s.waiter = null;
spins = -1;
}
@@ -313,7 +316,7 @@ public class LinkedTransferQueue exte
}
/**
- * Returns validated tail for use in cleaning methods
+ * Returns validated tail for use in cleaning methods.
*/
private QNode getValidatedTail() {
for (;;) {
@@ -336,6 +339,7 @@ public class LinkedTransferQueue exte
/**
* Gets rid of cancelled node s with original predecessor pred.
+ *
* @param pred predecessor of cancelled node
* @param s the cancelled node
*/
@@ -346,6 +350,10 @@ public class LinkedTransferQueue exte
if (w != Thread.currentThread())
LockSupport.unpark(w);
}
+
+ if (pred == null)
+ return;
+
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this, if
@@ -371,6 +379,7 @@ public class LinkedTransferQueue exte
/**
* Tries to unsplice the cancelled node held in cleanMe that was
* previously uncleanable because it was at tail.
+ *
* @return current cleanMe node (or null)
*/
private QNode reclean() {
@@ -402,7 +411,7 @@ public class LinkedTransferQueue exte
}
/**
- * Creates an initially empty LinkedTransferQueue.
+ * Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
QNode dummy = new QNode(null, false);
@@ -412,9 +421,10 @@ public class LinkedTransferQueue exte
}
/**
- * Creates a LinkedTransferQueue
+ * Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
+ *
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
@@ -444,6 +454,12 @@ public class LinkedTransferQueue exte
return true;
}
+ public boolean add(E e) {
+ if (e == null) throw new NullPointerException();
+ xfer(e, NOWAIT, 0);
+ return true;
+ }
+
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (xfer(e, WAIT, 0) == null) {
@@ -470,7 +486,7 @@ public class LinkedTransferQueue exte
public E take() throws InterruptedException {
Object e = xfer(null, WAIT, 0);
if (e != null)
- return (E)e;
+ return (E) e;
Thread.interrupted();
throw new InterruptedException();
}
@@ -478,12 +494,12 @@ public class LinkedTransferQueue exte
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
- return (E)e;
+ return (E) e;
throw new InterruptedException();
}
public E poll() {
- return (E)fulfill(null);
+ return (E) fulfill(null);
}
public int drainTo(Collection super E> c) {
@@ -517,7 +533,7 @@ public class LinkedTransferQueue exte
// Traversal-based methods
/**
- * Return head after performing any outstanding helping steps
+ * Returns head after performing any outstanding helping steps.
*/
private QNode traversalHead() {
for (;;) {
@@ -540,6 +556,7 @@ public class LinkedTransferQueue exte
return h;
}
}
+ reclean();
}
}
@@ -556,56 +573,68 @@ public class LinkedTransferQueue exte
* if subsequently removed.
*/
class Itr implements Iterator {
- QNode nextNode; // Next node to return next
- QNode currentNode; // last returned node, for remove()
- QNode prevNode; // predecessor of last returned node
- E nextItem; // Cache of next item, once commited to in next
+ QNode next; // node to return next
+ QNode pnext; // predecessor of next
+ QNode snext; // successor of next
+ QNode curr; // last returned node, for remove()
+ QNode pcurr; // predecessor of curr, for remove()
+ E nextItem; // Cache of next item, once committed to in next
Itr() {
- nextNode = traversalHead();
- advance();
+ findNext();
}
- E advance() {
- prevNode = currentNode;
- currentNode = nextNode;
- E x = nextItem;
-
- QNode p = nextNode.next;
+ /**
+ * Ensures next points to next valid node, or null if none.
+ */
+ void findNext() {
for (;;) {
- if (p == null || !p.isData) {
- nextNode = null;
- nextItem = null;
- return x;
+ QNode pred = pnext;
+ QNode q = next;
+ if (pred == null || pred == q) {
+ pred = traversalHead();
+ q = pred.next;
+ }
+ if (q == null || !q.isData) {
+ next = null;
+ return;
+ }
+ Object x = q.get();
+ QNode s = q.next;
+ if (x != null && q != x && q != s) {
+ nextItem = (E) x;
+ snext = s;
+ pnext = pred;
+ next = q;
+ return;
}
- Object item = p.get();
- if (item != p && item != null) {
- nextNode = p;
- nextItem = (E)item;
- return x;
- }
- prevNode = p;
- p = p.next;
+ pnext = q;
+ next = s;
}
}
public boolean hasNext() {
- return nextNode != null;
+ return next != null;
}
public E next() {
- if (nextNode == null) throw new NoSuchElementException();
- return advance();
+ if (next == null) throw new NoSuchElementException();
+ pcurr = pnext;
+ curr = next;
+ pnext = next;
+ next = snext;
+ E x = nextItem;
+ findNext();
+ return x;
}
public void remove() {
- QNode p = currentNode;
- QNode prev = prevNode;
- if (prev == null || p == null)
+ QNode p = curr;
+ if (p == null)
throw new IllegalStateException();
Object x = p.get();
if (x != null && x != p && p.compareAndSet(x, p))
- clean(prev, p);
+ clean(pcurr, p);
}
}
@@ -620,7 +649,7 @@ public class LinkedTransferQueue exte
if (!p.isData)
return null;
if (x != null)
- return (E)x;
+ return (E) x;
}
}
}
@@ -655,8 +684,8 @@ public class LinkedTransferQueue exte
/**
* Returns the number of elements in this queue. If this queue
- * contains more than Integer.MAX_VALUE elements, returns
- * Integer.MAX_VALUE.
+ * contains more than {@code Integer.MAX_VALUE} elements, returns
+ * {@code Integer.MAX_VALUE}.
*
* Beware that, unlike in most collections, this method is
* NOT a constant-time operation. Because of the
@@ -694,18 +723,40 @@ public class LinkedTransferQueue exte
return Integer.MAX_VALUE;
}
+ public boolean remove(Object o) {
+ if (o == null)
+ return false;
+ for (;;) {
+ QNode pred = traversalHead();
+ for (;;) {
+ QNode q = pred.next;
+ if (q == null || !q.isData)
+ return false;
+ if (q == pred) // restart
+ break;
+ Object x = q.get();
+ if (x != null && x != q && o.equals(x) &&
+ q.compareAndSet(x, q)) {
+ clean(pred, q);
+ return true;
+ }
+ pred = q;
+ }
+ }
+ }
+
/**
* Save the state to a stream (that is, serialize it).
*
- * @serialData All of the elements (each an E) in
+ * @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
- for (Iterator it = iterator(); it.hasNext(); )
- s.writeObject(it.next());
+ for (E e : this)
+ s.writeObject(e);
// Use trailing null as sentinel
s.writeObject(null);
}
@@ -713,6 +764,7 @@ public class LinkedTransferQueue exte
/**
* Reconstitute the Queue instance from a stream (that is,
* deserialize it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
@@ -720,7 +772,7 @@ public class LinkedTransferQueue exte
s.defaultReadObject();
resetHeadAndTail();
for (;;) {
- E item = (E)s.readObject();
+ E item = (E) s.readObject();
if (item == null)
break;
else
@@ -730,41 +782,59 @@ public class LinkedTransferQueue exte
// Support for resetting head/tail while deserializing
+ private void resetHeadAndTail() {
+ QNode dummy = new QNode(null, false);
+ UNSAFE.putObjectVolatile(this, headOffset,
+ new PaddedAtomicReference(dummy));
+ UNSAFE.putObjectVolatile(this, tailOffset,
+ new PaddedAtomicReference(dummy));
+ UNSAFE.putObjectVolatile(this, cleanMeOffset,
+ new PaddedAtomicReference(null));
+ }
// Temporary Unsafe mechanics for preliminary release
- private static final Unsafe _unsafe;
+ private static Unsafe getUnsafe() throws Throwable {
+ try {
+ return Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security.PrivilegedExceptionAction() {
+ public Unsafe run() throws Exception {
+ return getUnsafePrivileged();
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw e.getCause();
+ }
+ }
+ }
+
+ private static Unsafe getUnsafePrivileged()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ }
+
+ private static long fieldOffset(String fieldName)
+ throws NoSuchFieldException {
+ return UNSAFE.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField(fieldName));
+ }
+
+ private static final Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
- if (LinkedTransferQueue.class.getClassLoader() != null) {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- _unsafe = (Unsafe)f.get(null);
- }
- else
- _unsafe = Unsafe.getUnsafe();
- headOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("head"));
- tailOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("tail"));
- cleanMeOffset = _unsafe.objectFieldOffset
- (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
- } catch (Exception e) {
+ UNSAFE = getUnsafe();
+ headOffset = fieldOffset("head");
+ tailOffset = fieldOffset("tail");
+ cleanMeOffset = fieldOffset("cleanMe");
+ } catch (Throwable e) {
throw new RuntimeException("Could not initialize intrinsics", e);
}
}
- private void resetHeadAndTail() {
- QNode dummy = new QNode(null, false);
- _unsafe.putObjectVolatile(this, headOffset,
- new PaddedAtomicReference(dummy));
- _unsafe.putObjectVolatile(this, tailOffset,
- new PaddedAtomicReference(dummy));
- _unsafe.putObjectVolatile(this, cleanMeOffset,
- new PaddedAtomicReference(null));
-
- }
-
}