--- jsr166/src/jsr166y/LinkedTransferQueue.java 2008/07/25 18:13:15 1.6
+++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/01/12 17:16:18 1.12
@@ -10,6 +10,8 @@ import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;
import java.io.*;
+import sun.misc.Unsafe;
+import java.lang.reflect.*;
/**
* An unbounded {@linkplain TransferQueue} based on linked nodes.
@@ -19,7 +21,7 @@ import java.io.*;
* producer. The tail of the queue is that element that has
* been on the queue the shortest time for some producer.
*
- *
Beware that, unlike in most collections, the size
+ *
Beware that, unlike in most collections, the {@code size}
* method is NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
@@ -49,19 +51,19 @@ public class LinkedTransferQueue exte
private static final long serialVersionUID = -3223113410248163686L;
/*
- * This is still a work in progress...
- *
* This class extends the approach used in FIFO-mode
* SynchronousQueues. See the internal documentation, as well as
* the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
* Lea & Scott
* (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
*
- * The main extension is to provide different Wait modes
- * for the main "xfer" method that puts or takes items.
- * These don't impact the basic dual-queue logic, but instead
- * control whether or how threads block upon insertion
- * of request or data nodes into the dual queue.
+ * The main extension is to provide different Wait modes for the
+ * main "xfer" method that puts or takes items. These don't
+ * impact the basic dual-queue logic, but instead control whether
+ * or how threads block upon insertion of request or data nodes
+ * into the dual queue. It also uses slightly different
+ * conventions for tracking whether nodes are off-list or
+ * cancelled.
*/
// Wait modes for xfer method
@@ -95,11 +97,11 @@ public class LinkedTransferQueue exte
static final long spinForTimeoutThreshold = 1000L;
/**
- * Node class for LinkedTransferQueue. Opportunistically subclasses from
- * AtomicReference to represent item. Uses Object, not E, to allow
- * setting item to "this" after use, to avoid garbage
- * retention. Similarly, setting the next field to this is used as
- * sentinel that node is off list.
+ * Node class for LinkedTransferQueue. Opportunistically
+ * subclasses from AtomicReference to represent item. Uses Object,
+ * not E, to allow setting item to "this" after use, to avoid
+ * garbage retention. Similarly, setting the next field to this is
+ * used as sentinel that node is off list.
*/
static final class QNode extends AtomicReference {
volatile QNode next;
@@ -131,19 +133,17 @@ public class LinkedTransferQueue exte
}
- private final QNode dummy = new QNode(null, false);
- private final PaddedAtomicReference head =
- new PaddedAtomicReference(dummy);
- private final PaddedAtomicReference tail =
- new PaddedAtomicReference(dummy);
+ /** head of the queue */
+ private transient final PaddedAtomicReference head;
+ /** tail of the queue */
+ private transient final PaddedAtomicReference tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it cancelled.
*/
- private final PaddedAtomicReference cleanMe =
- new PaddedAtomicReference(null);
+ private transient final PaddedAtomicReference cleanMe;
/**
* Tries to cas nh as new head; if successful, unlink
@@ -159,7 +159,8 @@ public class LinkedTransferQueue exte
/**
* Puts or takes an item. Used for most queue operations (except
- * poll() and tryTransfer())
+ * poll() and tryTransfer()). See the similar code in
+ * SynchronousQueue for detailed explanation.
* @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
@@ -266,8 +267,10 @@ public class LinkedTransferQueue exte
Object x = s.get();
if (x != e) { // Node was matched or cancelled
advanceHead(pred, s); // unlink if head
- if (x == s) // was cancelled
- return clean(pred, s);
+ if (x == s) { // was cancelled
+ clean(pred, s);
+ return null;
+ }
else if (x != null) {
s.set(s); // avoid garbage retention
return x;
@@ -275,7 +278,6 @@ public class LinkedTransferQueue exte
else
return e;
}
-
if (mode == TIMEOUT) {
long now = System.nanoTime();
nanos -= now - lastTime;
@@ -296,14 +298,12 @@ public class LinkedTransferQueue exte
else if (s.waiter == null)
s.waiter = w;
else if (mode != TIMEOUT) {
- // LockSupport.park(this);
- LockSupport.park(); // allows run on java5
+ LockSupport.park(this);
s.waiter = null;
spins = -1;
}
else if (nanos > spinForTimeoutThreshold) {
- // LockSupport.parkNanos(this, nanos);
- LockSupport.parkNanos(nanos);
+ LockSupport.parkNanos(this, nanos);
s.waiter = null;
spins = -1;
}
@@ -311,69 +311,106 @@ public class LinkedTransferQueue exte
}
/**
+ * Returns validated tail for use in cleaning methods
+ */
+ private QNode getValidatedTail() {
+ for (;;) {
+ QNode h = head.get();
+ QNode first = h.next;
+ if (first != null && first.next == first) { // help advance
+ advanceHead(h, first);
+ continue;
+ }
+ QNode t = tail.get();
+ QNode last = t.next;
+ if (t == tail.get()) {
+ if (last != null)
+ tail.compareAndSet(t, last); // help advance
+ else
+ return t;
+ }
+ }
+ }
+
+ /**
* Gets rid of cancelled node s with original predecessor pred.
- * @return null (to simplify use by callers)
+ * @param pred predecessor of cancelled node
+ * @param s the cancelled node
*/
- private Object clean(QNode pred, QNode s) {
+ private void clean(QNode pred, QNode s) {
Thread w = s.waiter;
if (w != null) { // Wake up thread
s.waiter = null;
if (w != Thread.currentThread())
LockSupport.unpark(w);
}
-
- for (;;) {
- if (pred.next != s) // already cleaned
- return null;
- QNode h = head.get();
- QNode hn = h.next; // Absorb cancelled first node as head
- if (hn != null && hn.next == hn) {
- advanceHead(h, hn);
- continue;
- }
- QNode t = tail.get(); // Ensure consistent read for tail
- if (t == h)
- return null;
- QNode tn = t.next;
- if (t != tail.get())
- continue;
- if (tn != null) { // Help advance tail
- tail.compareAndSet(t, tn);
- continue;
- }
- if (s != t) { // If not tail, try to unsplice
- QNode sn = s.next;
+ /*
+ * At any given time, exactly one node on list cannot be
+ * deleted -- the last inserted node. To accommodate this, if
+ * we cannot delete s, we save its predecessor as "cleanMe",
+ * processing the previously saved version first. At least one
+ * of node s or the node previously saved can always be
+ * processed, so this always terminates.
+ */
+ while (pred.next == s) {
+ QNode oldpred = reclean(); // First, help get rid of cleanMe
+ QNode t = getValidatedTail();
+ if (s != t) { // If not tail, try to unsplice
+ QNode sn = s.next; // s.next == s means s already off list
if (sn == s || pred.casNext(s, sn))
- return null;
+ break;
}
- QNode dp = cleanMe.get();
- if (dp != null) { // Try unlinking previous cancelled node
- QNode d = dp.next;
- QNode dn;
- if (d == null || // d is gone or
- d == dp || // d is off list or
- d.get() != d || // d not cancelled or
- (d != t && // d not tail and
- (dn = d.next) != null && // has successor
- dn != d && // that is on list
- dp.casNext(d, dn))) // d unspliced
- cleanMe.compareAndSet(dp, null);
- if (dp == pred)
- return null; // s is already saved node
+ else if (oldpred == pred || // Already saved
+ (oldpred == null && cleanMe.compareAndSet(null, pred)))
+ break; // Postpone cleaning
+ }
+ }
+
+ /**
+ * Tries to unsplice the cancelled node held in cleanMe that was
+ * previously uncleanable because it was at tail.
+ * @return current cleanMe node (or null)
+ */
+ private QNode reclean() {
+ /*
+ * cleanMe is, or at one time was, predecessor of cancelled
+ * node s that was the tail so could not be unspliced. If s
+ * is no longer the tail, try to unsplice if necessary and
+ * make cleanMe slot available. This differs from similar
+ * code in clean() because we must check that pred still
+ * points to a cancelled node that must be unspliced -- if
+ * not, we can (must) clear cleanMe without unsplicing.
+ * This can loop only due to contention on casNext or
+ * clearing cleanMe.
+ */
+ QNode pred;
+ while ((pred = cleanMe.get()) != null) {
+ QNode t = getValidatedTail();
+ QNode s = pred.next;
+ if (s != t) {
+ QNode sn;
+ if (s == null || s == pred || s.get() != s ||
+ (sn = s.next) == s || pred.casNext(s, sn))
+ cleanMe.compareAndSet(pred, null);
}
- else if (cleanMe.compareAndSet(null, pred))
- return null; // Postpone cleaning s
+ else // s is still tail; cannot clean
+ break;
}
+ return pred;
}
/**
- * Creates an initially empty LinkedTransferQueue .
+ * Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
+ QNode dummy = new QNode(null, false);
+ head = new PaddedAtomicReference(dummy);
+ tail = new PaddedAtomicReference(dummy);
+ cleanMe = new PaddedAtomicReference(null);
}
/**
- * Creates a LinkedTransferQueue
+ * Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
* @param c the collection of elements to initially contain
@@ -381,6 +418,7 @@ public class LinkedTransferQueue exte
* of its elements are null
*/
public LinkedTransferQueue(Collection extends E> c) {
+ this();
addAll(c);
}
@@ -615,8 +653,8 @@ public class LinkedTransferQueue exte
/**
* Returns the number of elements in this queue. If this queue
- * contains more than Integer.MAX_VALUE elements, returns
- * Integer.MAX_VALUE .
+ * contains more than {@code Integer.MAX_VALUE} elements, returns
+ * {@code Integer.MAX_VALUE}.
*
* Beware that, unlike in most collections, this method is
* NOT a constant-time operation. Because of the
@@ -657,7 +695,7 @@ public class LinkedTransferQueue exte
/**
* Save the state to a stream (that is, serialize it).
*
- * @serialData All of the elements (each an E ) in
+ * @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
* @param s the stream
*/
@@ -678,6 +716,7 @@ public class LinkedTransferQueue exte
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
+ resetHeadAndTail();
for (;;) {
E item = (E)s.readObject();
if (item == null)
@@ -686,4 +725,42 @@ public class LinkedTransferQueue exte
offer(item);
}
}
+
+
+ // Support for resetting head/tail while deserializing
+ private void resetHeadAndTail() {
+ QNode dummy = new QNode(null, false);
+ _unsafe.putObjectVolatile(this, headOffset,
+ new PaddedAtomicReference(dummy));
+ _unsafe.putObjectVolatile(this, tailOffset,
+ new PaddedAtomicReference(dummy));
+ _unsafe.putObjectVolatile(this, cleanMeOffset,
+ new PaddedAtomicReference(null));
+ }
+
+ // Temporary Unsafe mechanics for preliminary release
+ private static final Unsafe _unsafe;
+ private static final long headOffset;
+ private static final long tailOffset;
+ private static final long cleanMeOffset;
+ static {
+ try {
+ if (LinkedTransferQueue.class.getClassLoader() != null) {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ _unsafe = (Unsafe)f.get(null);
+ }
+ else
+ _unsafe = Unsafe.getUnsafe();
+ headOffset = _unsafe.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("head"));
+ tailOffset = _unsafe.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("tail"));
+ cleanMeOffset = _unsafe.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
+ } catch (Exception e) {
+ throw new RuntimeException("Could not initialize intrinsics", e);
+ }
+ }
+
}