--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/01/12 17:16:18 1.12 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/23 23:07:57 1.23 @@ -44,7 +44,6 @@ import java.lang.reflect.*; * @since 1.7 * @author Doug Lea * @param the type of elements held in this collection - * */ public class LinkedTransferQueue extends AbstractQueue implements TransferQueue, java.io.Serializable { @@ -81,7 +80,7 @@ public class LinkedTransferQueue exte * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * The number of times to spin before blocking in untimed waits. @@ -116,9 +115,14 @@ public class LinkedTransferQueue exte nextUpdater = AtomicReferenceFieldUpdater.newUpdater (QNode.class, QNode.class, "next"); - boolean casNext(QNode cmp, QNode val) { + final boolean casNext(QNode cmp, QNode val) { return nextUpdater.compareAndSet(this, cmp, val); } + + final void clearNext() { + nextUpdater.lazySet(this, this); + } + } /** @@ -135,6 +139,7 @@ public class LinkedTransferQueue exte /** head of the queue */ private transient final PaddedAtomicReference head; + /** tail of the queue */ private transient final PaddedAtomicReference tail; @@ -151,7 +156,7 @@ public class LinkedTransferQueue exte */ private boolean advanceHead(QNode h, QNode nh) { if (h == head.get() && head.compareAndSet(h, nh)) { - h.next = h; // forget old next + h.clearNext(); // forget old next return true; } return false; @@ -161,6 +166,7 @@ public class LinkedTransferQueue exte * Puts or takes an item. Used for most queue operations (except * poll() and tryTransfer()). See the similar code in * SynchronousQueue for detailed explanation. + * * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT @@ -197,7 +203,7 @@ public class LinkedTransferQueue exte Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : x; } } } @@ -207,7 +213,7 @@ public class LinkedTransferQueue exte /** * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer + * simplifies control paths both here and in xfer. */ private Object fulfill(Object e) { boolean isData = (e != null); @@ -235,7 +241,7 @@ public class LinkedTransferQueue exte Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : x; } } } @@ -258,7 +264,7 @@ public class LinkedTransferQueue exte if (mode == NOWAIT) return null; - long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; + long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = -1; // set to desired spin count below for (;;) { @@ -267,7 +273,7 @@ public class LinkedTransferQueue exte Object x = s.get(); if (x != e) { // Node was matched or cancelled advanceHead(pred, s); // unlink if head - if (x == s) { // was cancelled + if (x == s) { // was cancelled clean(pred, s); return null; } @@ -290,7 +296,7 @@ public class LinkedTransferQueue exte if (spins < 0) { QNode h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? + ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); } if (spins > 0) @@ -311,7 +317,7 @@ public class LinkedTransferQueue exte } /** - * Returns validated tail for use in cleaning methods + * Returns validated tail for use in cleaning methods. */ private QNode getValidatedTail() { for (;;) { @@ -334,6 +340,7 @@ public class LinkedTransferQueue exte /** * Gets rid of cancelled node s with original predecessor pred. + * * @param pred predecessor of cancelled node * @param s the cancelled node */ @@ -344,6 +351,10 @@ public class LinkedTransferQueue exte if (w != Thread.currentThread()) LockSupport.unpark(w); } + + if (pred == null) + return; + /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, if @@ -369,6 +380,7 @@ public class LinkedTransferQueue exte /** * Tries to unsplice the cancelled node held in cleanMe that was * previously uncleanable because it was at tail. + * * @return current cleanMe node (or null) */ private QNode reclean() { @@ -413,6 +425,7 @@ public class LinkedTransferQueue exte * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. + * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null @@ -442,6 +455,12 @@ public class LinkedTransferQueue exte return true; } + public boolean add(E e) { + if (e == null) throw new NullPointerException(); + xfer(e, NOWAIT, 0); + return true; + } + public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { @@ -468,7 +487,7 @@ public class LinkedTransferQueue exte public E take() throws InterruptedException { Object e = xfer(null, WAIT, 0); if (e != null) - return (E)e; + return (E) e; Thread.interrupted(); throw new InterruptedException(); } @@ -476,12 +495,12 @@ public class LinkedTransferQueue exte public E poll(long timeout, TimeUnit unit) throws InterruptedException { Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E)e; + return (E) e; throw new InterruptedException(); } public E poll() { - return (E)fulfill(null); + return (E) fulfill(null); } public int drainTo(Collection c) { @@ -515,7 +534,7 @@ public class LinkedTransferQueue exte // Traversal-based methods /** - * Return head after performing any outstanding helping steps + * Returns head after performing any outstanding helping steps. */ private QNode traversalHead() { for (;;) { @@ -538,6 +557,7 @@ public class LinkedTransferQueue exte return h; } } + reclean(); } } @@ -554,56 +574,68 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode nextNode; // Next node to return next - QNode currentNode; // last returned node, for remove() - QNode prevNode; // predecessor of last returned node - E nextItem; // Cache of next item, once commited to in next + QNode next; // node to return next + QNode pnext; // predecessor of next + QNode snext; // successor of next + QNode curr; // last returned node, for remove() + QNode pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once committed to in next Itr() { - nextNode = traversalHead(); - advance(); + findNext(); } - E advance() { - prevNode = currentNode; - currentNode = nextNode; - E x = nextItem; - - QNode p = nextNode.next; + /** + * Ensures next points to next valid node, or null if none. + */ + void findNext() { for (;;) { - if (p == null || !p.isData) { - nextNode = null; - nextItem = null; - return x; - } - Object item = p.get(); - if (item != p && item != null) { - nextNode = p; - nextItem = (E)item; - return x; + QNode pred = pnext; + QNode q = next; + if (pred == null || pred == q) { + pred = traversalHead(); + q = pred.next; + } + if (q == null || !q.isData) { + next = null; + return; + } + Object x = q.get(); + QNode s = q.next; + if (x != null && q != x && q != s) { + nextItem = (E) x; + snext = s; + pnext = pred; + next = q; + return; } - prevNode = p; - p = p.next; + pnext = q; + next = s; } } public boolean hasNext() { - return nextNode != null; + return next != null; } public E next() { - if (nextNode == null) throw new NoSuchElementException(); - return advance(); + if (next == null) throw new NoSuchElementException(); + pcurr = pnext; + curr = next; + pnext = next; + next = snext; + E x = nextItem; + findNext(); + return x; } public void remove() { - QNode p = currentNode; - QNode prev = prevNode; - if (prev == null || p == null) + QNode p = curr; + if (p == null) throw new IllegalStateException(); Object x = p.get(); if (x != null && x != p && p.compareAndSet(x, p)) - clean(prev, p); + clean(pcurr, p); } } @@ -618,7 +650,7 @@ public class LinkedTransferQueue exte if (!p.isData) return null; if (x != null) - return (E)x; + return (E) x; } } } @@ -692,6 +724,28 @@ public class LinkedTransferQueue exte return Integer.MAX_VALUE; } + public boolean remove(Object o) { + if (o == null) + return false; + for (;;) { + QNode pred = traversalHead(); + for (;;) { + QNode q = pred.next; + if (q == null || !q.isData) + return false; + if (q == pred) // restart + break; + Object x = q.get(); + if (x != null && x != q && o.equals(x) && + q.compareAndSet(x, q)) { + clean(pred, q); + return true; + } + pred = q; + } + } + } + /** * Save the state to a stream (that is, serialize it). * @@ -702,8 +756,8 @@ public class LinkedTransferQueue exte private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { s.defaultWriteObject(); - for (Iterator it = iterator(); it.hasNext(); ) - s.writeObject(it.next()); + for (E e : this) + s.writeObject(e); // Use trailing null as sentinel s.writeObject(null); } @@ -711,6 +765,7 @@ public class LinkedTransferQueue exte /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -718,7 +773,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E)s.readObject(); + E item = (E) s.readObject(); if (item == null) break; else @@ -730,35 +785,55 @@ public class LinkedTransferQueue exte // Support for resetting head/tail while deserializing private void resetHeadAndTail() { QNode dummy = new QNode(null, false); - _unsafe.putObjectVolatile(this, headOffset, + UNSAFE.putObjectVolatile(this, headOffset, new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, tailOffset, + UNSAFE.putObjectVolatile(this, tailOffset, new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, cleanMeOffset, + UNSAFE.putObjectVolatile(this, cleanMeOffset, new PaddedAtomicReference(null)); } // Temporary Unsafe mechanics for preliminary release - private static final Unsafe _unsafe; + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return UNSAFE.objectFieldOffset + (LinkedTransferQueue.class.getDeclaredField(fieldName)); + } + + private static final Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { - if (LinkedTransferQueue.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - headOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("head")); - tailOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("tail")); - cleanMeOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("cleanMe")); - } catch (Exception e) { + UNSAFE = getUnsafe(); + headOffset = fieldOffset("head"); + tailOffset = fieldOffset("tail"); + cleanMeOffset = fieldOffset("cleanMe"); + } catch (Throwable e) { throw new RuntimeException("Could not initialize intrinsics", e); } }