--- jsr166/src/jsr166y/LinkedTransferQueue.java 2008/07/25 18:11:53 1.5 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/01/12 17:16:18 1.12 @@ -10,6 +10,8 @@ import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; import java.util.*; import java.io.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -19,7 +21,7 @@ import java.io.*; * producer. The tail of the queue is that element that has * been on the queue the shortest time for some producer. * - *

Beware that, unlike in most collections, the size + *

Beware that, unlike in most collections, the {@code size} * method is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements. @@ -49,19 +51,19 @@ public class LinkedTransferQueue exte private static final long serialVersionUID = -3223113410248163686L; /* - * This is still a work in progress... - * * This class extends the approach used in FIFO-mode * SynchronousQueues. See the internal documentation, as well as * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, * Lea & Scott * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) * - * The main extension is to provide different Wait modes - * for the main "xfer" method that puts or takes items. - * These don't impact the basic dual-queue logic, but instead - * control whether or how threads block upon insertion - * of request or data nodes into the dual queue. + * The main extension is to provide different Wait modes for the + * main "xfer" method that puts or takes items. These don't + * impact the basic dual-queue logic, but instead control whether + * or how threads block upon insertion of request or data nodes + * into the dual queue. It also uses slightly different + * conventions for tracking whether nodes are off-list or + * cancelled. */ // Wait modes for xfer method @@ -95,11 +97,11 @@ public class LinkedTransferQueue exte static final long spinForTimeoutThreshold = 1000L; /** - * Node class for LinkedTransferQueue. Opportunistically subclasses from - * AtomicReference to represent item. Uses Object, not E, to allow - * setting item to "this" after use, to avoid garbage - * retention. Similarly, setting the next field to this is used as - * sentinel that node is off list. + * Node class for LinkedTransferQueue. Opportunistically + * subclasses from AtomicReference to represent item. Uses Object, + * not E, to allow setting item to "this" after use, to avoid + * garbage retention. Similarly, setting the next field to this is + * used as sentinel that node is off list. */ static final class QNode extends AtomicReference { volatile QNode next; @@ -131,19 +133,17 @@ public class LinkedTransferQueue exte } - private final QNode dummy = new QNode(null, false); - private final PaddedAtomicReference head = - new PaddedAtomicReference(dummy); - private final PaddedAtomicReference tail = - new PaddedAtomicReference(dummy); + /** head of the queue */ + private transient final PaddedAtomicReference head; + /** tail of the queue */ + private transient final PaddedAtomicReference tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private final PaddedAtomicReference cleanMe = - new PaddedAtomicReference(null); + private transient final PaddedAtomicReference cleanMe; /** * Tries to cas nh as new head; if successful, unlink @@ -159,7 +159,8 @@ public class LinkedTransferQueue exte /** * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()) + * poll() and tryTransfer()). See the similar code in + * SynchronousQueue for detailed explanation. * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT @@ -266,8 +267,10 @@ public class LinkedTransferQueue exte Object x = s.get(); if (x != e) { // Node was matched or cancelled advanceHead(pred, s); // unlink if head - if (x == s) // was cancelled - return clean(pred, s); + if (x == s) { // was cancelled + clean(pred, s); + return null; + } else if (x != null) { s.set(s); // avoid garbage retention return x; @@ -275,7 +278,6 @@ public class LinkedTransferQueue exte else return e; } - if (mode == TIMEOUT) { long now = System.nanoTime(); nanos -= now - lastTime; @@ -296,14 +298,12 @@ public class LinkedTransferQueue exte else if (s.waiter == null) s.waiter = w; else if (mode != TIMEOUT) { - // LockSupport.park(this); - LockSupport.park(); // allows run on java5 + LockSupport.park(this); s.waiter = null; spins = -1; } else if (nanos > spinForTimeoutThreshold) { - // LockSupport.parkNanos(this, nanos); - LockSupport.parkNanos(nanos); + LockSupport.parkNanos(this, nanos); s.waiter = null; spins = -1; } @@ -311,69 +311,106 @@ public class LinkedTransferQueue exte } /** + * Returns validated tail for use in cleaning methods + */ + private QNode getValidatedTail() { + for (;;) { + QNode h = head.get(); + QNode first = h.next; + if (first != null && first.next == first) { // help advance + advanceHead(h, first); + continue; + } + QNode t = tail.get(); + QNode last = t.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); // help advance + else + return t; + } + } + } + + /** * Gets rid of cancelled node s with original predecessor pred. - * @return null (to simplify use by callers) + * @param pred predecessor of cancelled node + * @param s the cancelled node */ - private Object clean(QNode pred, QNode s) { + private void clean(QNode pred, QNode s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; if (w != Thread.currentThread()) LockSupport.unpark(w); } - - for (;;) { - if (pred.next != s) // already cleaned - return null; - QNode h = head.get(); - QNode hn = h.next; // Absorb cancelled first node as head - if (hn != null && hn.next == hn) { - advanceHead(h, hn); - continue; - } - QNode t = tail.get(); // Ensure consistent read for tail - if (t == h) - return null; - QNode tn = t.next; - if (t != tail.get()) - continue; - if (tn != null) { // Help advance tail - tail.compareAndSet(t, tn); - continue; - } - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; + /* + * At any given time, exactly one node on list cannot be + * deleted -- the last inserted node. To accommodate this, if + * we cannot delete s, we save its predecessor as "cleanMe", + * processing the previously saved version first. At least one + * of node s or the node previously saved can always be + * processed, so this always terminates. + */ + while (pred.next == s) { + QNode oldpred = reclean(); // First, help get rid of cleanMe + QNode t = getValidatedTail(); + if (s != t) { // If not tail, try to unsplice + QNode sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) - return null; + break; } - QNode dp = cleanMe.get(); - if (dp != null) { // Try unlinking previous cancelled node - QNode d = dp.next; - QNode dn; - if (d == null || // d is gone or - d == dp || // d is off list or - d.get() != d || // d not cancelled or - (d != t && // d not tail and - (dn = d.next) != null && // has successor - dn != d && // that is on list - dp.casNext(d, dn))) // d unspliced - cleanMe.compareAndSet(dp, null); - if (dp == pred) - return null; // s is already saved node + else if (oldpred == pred || // Already saved + (oldpred == null && cleanMe.compareAndSet(null, pred))) + break; // Postpone cleaning + } + } + + /** + * Tries to unsplice the cancelled node held in cleanMe that was + * previously uncleanable because it was at tail. + * @return current cleanMe node (or null) + */ + private QNode reclean() { + /* + * cleanMe is, or at one time was, predecessor of cancelled + * node s that was the tail so could not be unspliced. If s + * is no longer the tail, try to unsplice if necessary and + * make cleanMe slot available. This differs from similar + * code in clean() because we must check that pred still + * points to a cancelled node that must be unspliced -- if + * not, we can (must) clear cleanMe without unsplicing. + * This can loop only due to contention on casNext or + * clearing cleanMe. + */ + QNode pred; + while ((pred = cleanMe.get()) != null) { + QNode t = getValidatedTail(); + QNode s = pred.next; + if (s != t) { + QNode sn; + if (s == null || s == pred || s.get() != s || + (sn = s.next) == s || pred.casNext(s, sn)) + cleanMe.compareAndSet(pred, null); } - else if (cleanMe.compareAndSet(null, pred)) - return null; // Postpone cleaning s + else // s is still tail; cannot clean + break; } + return pred; } /** - * Creates an initially empty LinkedTransferQueue. + * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { + QNode dummy = new QNode(null, false); + head = new PaddedAtomicReference(dummy); + tail = new PaddedAtomicReference(dummy); + cleanMe = new PaddedAtomicReference(null); } /** - * Creates a LinkedTransferQueue + * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * @param c the collection of elements to initially contain @@ -381,6 +418,7 @@ public class LinkedTransferQueue exte * of its elements are null */ public LinkedTransferQueue(Collection c) { + this(); addAll(c); } @@ -407,9 +445,9 @@ public class LinkedTransferQueue exte public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { - Thread.interrupted(); + Thread.interrupted(); throw new InterruptedException(); - } + } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) @@ -431,7 +469,7 @@ public class LinkedTransferQueue exte Object e = xfer(null, WAIT, 0); if (e != null) return (E)e; - Thread.interrupted(); + Thread.interrupted(); throw new InterruptedException(); } @@ -615,8 +653,8 @@ public class LinkedTransferQueue exte /** * Returns the number of elements in this queue. If this queue - * contains more than Integer.MAX_VALUE elements, returns - * Integer.MAX_VALUE. + * contains more than {@code Integer.MAX_VALUE} elements, returns + * {@code Integer.MAX_VALUE}. * *

Beware that, unlike in most collections, this method is * NOT a constant-time operation. Because of the @@ -657,7 +695,7 @@ public class LinkedTransferQueue exte /** * Save the state to a stream (that is, serialize it). * - * @serialData All of the elements (each an E) in + * @serialData All of the elements (each an {@code E}) in * the proper order, followed by a null * @param s the stream */ @@ -678,6 +716,7 @@ public class LinkedTransferQueue exte private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); + resetHeadAndTail(); for (;;) { E item = (E)s.readObject(); if (item == null) @@ -686,4 +725,42 @@ public class LinkedTransferQueue exte offer(item); } } + + + // Support for resetting head/tail while deserializing + private void resetHeadAndTail() { + QNode dummy = new QNode(null, false); + _unsafe.putObjectVolatile(this, headOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference(null)); + } + + // Temporary Unsafe mechanics for preliminary release + private static final Unsafe _unsafe; + private static final long headOffset; + private static final long tailOffset; + private static final long cleanMeOffset; + static { + try { + if (LinkedTransferQueue.class.getClassLoader() != null) { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + _unsafe = (Unsafe)f.get(null); + } + else + _unsafe = Unsafe.getUnsafe(); + headOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("head")); + tailOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("tail")); + cleanMeOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("cleanMe")); + } catch (Exception e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } + }