--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/03/30 04:32:23 1.16 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/26 05:55:34 1.27 @@ -5,13 +5,16 @@ */ package jsr166y; + import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import java.util.*; -import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -44,7 +47,6 @@ import java.lang.reflect.*; * @since 1.7 * @author Doug Lea * @param the type of elements held in this collection - * */ public class LinkedTransferQueue extends AbstractQueue implements TransferQueue, java.io.Serializable { @@ -81,7 +83,7 @@ public class LinkedTransferQueue exte * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * The number of times to spin before blocking in untimed waits. @@ -103,20 +105,22 @@ public class LinkedTransferQueue exte * garbage retention. Similarly, setting the next field to this is * used as sentinel that node is off list. */ - static final class QNode extends AtomicReference { - volatile QNode next; + static final class Node extends AtomicReference { + volatile Node next; volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + + Node(E item, boolean isData) { super(item); this.isData = isData; } - static final AtomicReferenceFieldUpdater + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); + (Node.class, Node.class, "next"); - final boolean casNext(QNode cmp, QNode val) { + final boolean casNext(Node cmp, Node val) { return nextUpdater.compareAndSet(this, cmp, val); } @@ -124,6 +128,7 @@ public class LinkedTransferQueue exte nextUpdater.lazySet(this, this); } + private static final long serialVersionUID = -3375979862319811754L; } /** @@ -135,26 +140,28 @@ public class LinkedTransferQueue exte // enough padding for 64bytes with 4byte refs Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; PaddedAtomicReference(T r) { super(r); } + private static final long serialVersionUID = 8170090609809740854L; } /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final PaddedAtomicReference> head; + /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { h.clearNext(); // forget old next return true; @@ -166,25 +173,26 @@ public class LinkedTransferQueue exte * Puts or takes an item. Used for most queue operations (except * poll() and tryTransfer()). See the similar code in * SynchronousQueue for detailed explanation. + * * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { if (s == null) - s = new QNode(e, isData); - QNode last = t.next; + s = new Node(e, isData); + Node last = t.next; if (last != null) { if (t == tail.get()) tail.compareAndSet(t, last); @@ -196,13 +204,13 @@ public class LinkedTransferQueue exte } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -212,19 +220,19 @@ public class LinkedTransferQueue exte /** * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer + * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -233,14 +241,14 @@ public class LinkedTransferQueue exte } } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -258,12 +266,12 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { + private E awaitFulfill(Node pred, Node s, E e, + int mode, long nanos) { if (mode == NOWAIT) return null; - long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; + long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = -1; // set to desired spin count below for (;;) { @@ -272,13 +280,13 @@ public class LinkedTransferQueue exte Object x = s.get(); if (x != e) { // Node was matched or cancelled advanceHead(pred, s); // unlink if head - if (x == s) { // was cancelled + if (x == s) { // was cancelled clean(pred, s); return null; } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return (E) x; } else return e; @@ -293,9 +301,9 @@ public class LinkedTransferQueue exte } } if (spins < 0) { - QNode h = head.get(); // only spin if at head + Node h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? + ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); } if (spins > 0) @@ -316,18 +324,18 @@ public class LinkedTransferQueue exte } /** - * Returns validated tail for use in cleaning methods + * Returns validated tail for use in cleaning methods. */ - private QNode getValidatedTail() { + private Node getValidatedTail() { for (;;) { - QNode h = head.get(); - QNode first = h.next; + Node h = head.get(); + Node first = h.next; if (first != null && first.next == first) { // help advance advanceHead(h, first); continue; } - QNode t = tail.get(); - QNode last = t.next; + Node t = tail.get(); + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); // help advance @@ -339,10 +347,11 @@ public class LinkedTransferQueue exte /** * Gets rid of cancelled node s with original predecessor pred. + * * @param pred predecessor of cancelled node * @param s the cancelled node */ - private void clean(QNode pred, QNode s) { + private void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -362,10 +371,10 @@ public class LinkedTransferQueue exte * processed, so this always terminates. */ while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) break; } @@ -378,9 +387,10 @@ public class LinkedTransferQueue exte /** * Tries to unsplice the cancelled node held in cleanMe that was * previously uncleanable because it was at tail. + * * @return current cleanMe node (or null) */ - private QNode reclean() { + private Node reclean() { /* * cleanMe is, or at one time was, predecessor of cancelled * node s that was the tail so could not be unspliced. If s @@ -392,12 +402,12 @@ public class LinkedTransferQueue exte * This can loop only due to contention on casNext or * clearing cleanMe. */ - QNode pred; + Node pred; while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; + Node t = getValidatedTail(); + Node s = pred.next; if (s != t) { - QNode sn; + Node sn; if (s == null || s == pred || s.get() != s || (sn = s.next) == s || pred.casNext(s, sn)) cleanMe.compareAndSet(pred, null); @@ -412,16 +422,17 @@ public class LinkedTransferQueue exte * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. + * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null @@ -483,7 +494,7 @@ public class LinkedTransferQueue exte public E take() throws InterruptedException { Object e = xfer(null, WAIT, 0); if (e != null) - return (E)e; + return (E) e; Thread.interrupted(); throw new InterruptedException(); } @@ -491,12 +502,12 @@ public class LinkedTransferQueue exte public E poll(long timeout, TimeUnit unit) throws InterruptedException { Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E)e; + return (E) e; throw new InterruptedException(); } public E poll() { - return (E)fulfill(null); + return fulfill(null); } public int drainTo(Collection c) { @@ -530,15 +541,15 @@ public class LinkedTransferQueue exte // Traversal-based methods /** - * Return head after performing any outstanding helping steps + * Returns head after performing any outstanding helping steps. */ - private QNode traversalHead() { + private Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; + Node last = t.next; + Node first = h.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -570,24 +581,24 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once commited to in next + Node next; // node to return next + Node pnext; // predecessor of next + Node snext; // successor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once committed to in next Itr() { findNext(); } /** - * Ensure next points to next valid node, or null if none. + * Ensures next points to next valid node, or null if none. */ void findNext() { for (;;) { - QNode pred = pnext; - QNode q = next; + Node pred = pnext; + Node q = next; if (pred == null || pred == q) { pred = traversalHead(); q = pred.next; @@ -597,9 +608,9 @@ public class LinkedTransferQueue exte return; } Object x = q.get(); - QNode s = q.next; + Node s = q.next; if (x != null && q != x && q != s) { - nextItem = (E)x; + nextItem = (E) x; snext = s; pnext = pred; next = q; @@ -626,7 +637,7 @@ public class LinkedTransferQueue exte } public void remove() { - QNode p = curr; + Node p = curr; if (p == null) throw new IllegalStateException(); Object x = p.get(); @@ -637,8 +648,8 @@ public class LinkedTransferQueue exte public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return null; Object x = p.get(); @@ -646,15 +657,15 @@ public class LinkedTransferQueue exte if (!p.isData) return null; if (x != null) - return (E)x; + return (E) x; } } } public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return true; Object x = p.get(); @@ -669,8 +680,8 @@ public class LinkedTransferQueue exte public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return false; Object x = p.get(); @@ -693,8 +704,8 @@ public class LinkedTransferQueue exte */ public int size() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && p.isData; p = p.next) { Object x = p.get(); if (x != null && x != p) { if (++count == Integer.MAX_VALUE) // saturated @@ -706,8 +717,8 @@ public class LinkedTransferQueue exte public int getWaitingConsumerCount() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && !p.isData; p = p.next) { if (p.get() == null) { if (++count == Integer.MAX_VALUE) break; @@ -724,15 +735,15 @@ public class LinkedTransferQueue exte if (o == null) return false; for (;;) { - QNode pred = traversalHead(); + Node pred = traversalHead(); for (;;) { - QNode q = pred.next; + Node q = pred.next; if (q == null || !q.isData) return false; if (q == pred) // restart break; Object x = q.get(); - if (x != null && x != q && o.equals(x) && + if (x != null && x != q && o.equals(x) && q.compareAndSet(x, q)) { clean(pred, q); return true; @@ -761,6 +772,7 @@ public class LinkedTransferQueue exte /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -768,7 +780,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E)s.readObject(); + @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) break; else @@ -776,61 +788,60 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); - _unsafe.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); + Node dummy = new Node(null, false); + UNSAFE.putObjectVolatile(this, headOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference>(null)); } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + // Unsafe mechanics for jsr166y 3rd party package. + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + return getUnsafeByReflection(); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - private static Unsafe getUnsafePrivileged() + private static sun.misc.Unsafe getUnsafeByReflection() throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); + java.lang.reflect.Field f = + sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); - return (Unsafe) f.get(null); + return (sun.misc.Unsafe) f.get(null); } - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField(fieldName)); - } - - private static final Unsafe _unsafe; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { + private static long fieldOffset(String fieldName, Class klazz) { try { - _unsafe = getUnsafe(); - headOffset = fieldOffset("head"); - tailOffset = fieldOffset("tail"); - cleanMeOffset = fieldOffset("cleanMe"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName)); + } catch (NoSuchFieldException e) { + // Convert Exception to Error + NoSuchFieldError error = new NoSuchFieldError(fieldName); + error.initCause(e); + throw error; } } + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long headOffset = + fieldOffset("head", LinkedTransferQueue.class); + private static final long tailOffset = + fieldOffset("tail", LinkedTransferQueue.class); + private static final long cleanMeOffset = + fieldOffset("cleanMe", LinkedTransferQueue.class); + }