--- jsr166/src/jsr166y/LinkedTransferQueue.java 2007/08/27 19:48:36 1.3 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2008/10/03 00:39:48 1.8 @@ -10,6 +10,8 @@ import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; import java.util.*; import java.io.*; +import sun.misc.Unsafe; +import java.lang.reflect.*; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -49,7 +51,7 @@ public class LinkedTransferQueue exte private static final long serialVersionUID = -3223113410248163686L; /* - * This is still a work in prgress... + * This is still a work in progress... * * This class extends the approach used in FIFO-mode * SynchronousQueues. See the internal documentation, as well as @@ -79,7 +81,7 @@ public class LinkedTransferQueue exte * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; /** * The number of times to spin before blocking in untimed waits. @@ -94,7 +96,7 @@ public class LinkedTransferQueue exte */ static final long spinForTimeoutThreshold = 1000L; - /** + /** * Node class for LinkedTransferQueue. Opportunistically subclasses from * AtomicReference to represent item. Uses Object, not E, to allow * setting item to "this" after use, to avoid garbage @@ -131,19 +133,17 @@ public class LinkedTransferQueue exte } - private final QNode dummy = new QNode(null, false); - private final PaddedAtomicReference head = - new PaddedAtomicReference(dummy); - private final PaddedAtomicReference tail = - new PaddedAtomicReference(dummy); + /** head of the queue */ + private transient final PaddedAtomicReference head; + /** tail of the queue */ + private transient final PaddedAtomicReference tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private final PaddedAtomicReference cleanMe = - new PaddedAtomicReference(null); + private transient final PaddedAtomicReference cleanMe; /** * Tries to cas nh as new head; if successful, unlink @@ -156,11 +156,11 @@ public class LinkedTransferQueue exte } return false; } - + /** * Puts or takes an item. Used for most queue operations (except * poll() and tryTransfer()) - * @param e the item or if null, signfies that this is a take + * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure @@ -188,10 +188,10 @@ public class LinkedTransferQueue exte return awaitFulfill(t, s, e, mode, nanos); } } - + else if (h != null) { QNode first = h.next; - if (t == tail.get() && first != null && + if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { @@ -206,7 +206,7 @@ public class LinkedTransferQueue exte /** * Version of xfer for poll() and tryTransfer, which - * simpifies control paths both here and in xfer + * simplifies control paths both here and in xfer */ private Object fulfill(Object e) { boolean isData = (e != null); @@ -228,7 +228,7 @@ public class LinkedTransferQueue exte } else if (h != null) { QNode first = h.next; - if (t == tail.get() && + if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); @@ -252,7 +252,7 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, + private Object awaitFulfill(QNode pred, QNode s, Object e, int mode, long nanos) { if (mode == NOWAIT) return null; @@ -268,7 +268,7 @@ public class LinkedTransferQueue exte advanceHead(pred, s); // unlink if head if (x == s) // was cancelled return clean(pred, s); - else if (x != null) { + else if (x != null) { s.set(s); // avoid garbage retention return x; } @@ -288,7 +288,7 @@ public class LinkedTransferQueue exte if (spins < 0) { QNode h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? + (mode == TIMEOUT? maxTimedSpins : maxUntimedSpins) : 0); } if (spins > 0) @@ -321,10 +321,10 @@ public class LinkedTransferQueue exte if (w != Thread.currentThread()) LockSupport.unpark(w); } - + for (;;) { if (pred.next != s) // already cleaned - return null; + return null; QNode h = head.get(); QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.next == hn) { @@ -360,16 +360,20 @@ public class LinkedTransferQueue exte cleanMe.compareAndSet(dp, null); if (dp == pred) return null; // s is already saved node - } + } else if (cleanMe.compareAndSet(null, pred)) return null; // Postpone cleaning s } } - + /** * Creates an initially empty LinkedTransferQueue. */ public LinkedTransferQueue() { + QNode dummy = new QNode(null, false); + head = new PaddedAtomicReference(dummy); + tail = new PaddedAtomicReference(dummy); + cleanMe = new PaddedAtomicReference(null); } /** @@ -381,6 +385,7 @@ public class LinkedTransferQueue exte * of its elements are null */ public LinkedTransferQueue(Collection c) { + this(); addAll(c); } @@ -390,7 +395,7 @@ public class LinkedTransferQueue exte xfer(e, NOWAIT, 0); } - public boolean offer(E e, long timeout, TimeUnit unit) + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (Thread.interrupted()) throw new InterruptedException(); @@ -407,9 +412,9 @@ public class LinkedTransferQueue exte public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { - Thread.interrupted(); + Thread.interrupted(); throw new InterruptedException(); - } + } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) @@ -431,7 +436,7 @@ public class LinkedTransferQueue exte Object e = xfer(null, WAIT, 0); if (e != null) return (E)e; - Thread.interrupted(); + Thread.interrupted(); throw new InterruptedException(); } @@ -487,12 +492,12 @@ public class LinkedTransferQueue exte QNode last = t.next; QNode first = h.next; if (t == tail.get()) { - if (last != null) + if (last != null) tail.compareAndSet(t, last); else if (first != null) { Object x = first.get(); - if (x == first) - advanceHead(h, first); + if (x == first) + advanceHead(h, first); else return h; } @@ -509,7 +514,7 @@ public class LinkedTransferQueue exte } /** - * Iterators. Basic strategy os to travers list, treating + * Iterators. Basic strategy is to traverse list, treating * non-data (i.e., request) nodes as terminating list. * Once a valid data node is found, the item is cached * so that the next call to next() will return it even @@ -520,17 +525,17 @@ public class LinkedTransferQueue exte QNode currentNode; // last returned node, for remove() QNode prevNode; // predecessor of last returned node E nextItem; // Cache of next item, once commited to in next - + Itr() { nextNode = traversalHead(); advance(); } - + E advance() { prevNode = currentNode; currentNode = nextNode; E x = nextItem; - + QNode p = nextNode.next; for (;;) { if (p == null || !p.isData) { @@ -543,25 +548,25 @@ public class LinkedTransferQueue exte nextNode = p; nextItem = (E)item; return x; - } + } prevNode = p; p = p.next; } } - + public boolean hasNext() { return nextNode != null; } - + public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } - + public void remove() { QNode p = currentNode; QNode prev = prevNode; - if (prev == null || p == null) + if (prev == null || p == null) throw new IllegalStateException(); Object x = p.get(); if (x != null && x != p && p.compareAndSet(x, p)) @@ -608,11 +613,11 @@ public class LinkedTransferQueue exte if (p == null) return false; Object x = p.get(); - if (p != x) + if (p != x) return !p.isData; } } - + /** * Returns the number of elements in this queue. If this queue * contains more than Integer.MAX_VALUE elements, returns @@ -630,7 +635,7 @@ public class LinkedTransferQueue exte QNode h = traversalHead(); for (QNode p = h.next; p != null && p.isData; p = p.next) { Object x = p.get(); - if (x != null && x != p) { + if (x != null && x != p) { if (++count == Integer.MAX_VALUE) // saturated break; } @@ -678,6 +683,7 @@ public class LinkedTransferQueue exte private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); + resetHeadAndTail(); for (;;) { E item = (E)s.readObject(); if (item == null) @@ -686,4 +692,44 @@ public class LinkedTransferQueue exte offer(item); } } + + + // Support for resetting head/tail while deserializing + + // Temporary Unsafe mechanics for preliminary release + private static final Unsafe _unsafe; + private static final long headOffset; + private static final long tailOffset; + private static final long cleanMeOffset; + static { + try { + if (LinkedTransferQueue.class.getClassLoader() != null) { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + _unsafe = (Unsafe)f.get(null); + } + else + _unsafe = Unsafe.getUnsafe(); + headOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("head")); + tailOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("tail")); + cleanMeOffset = _unsafe.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField("cleanMe")); + } catch (Exception e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } + } + + private void resetHeadAndTail() { + QNode dummy = new QNode(null, false); + _unsafe.putObjectVolatile(this, headOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference(dummy)); + _unsafe.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference(null)); + + } + }