--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/20 22:26:03 1.18 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/29 02:17:02 1.31 @@ -5,13 +5,15 @@ */ package jsr166y; + import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import java.util.*; -import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.atomic.AtomicReference; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -44,7 +46,6 @@ import java.lang.reflect.*; * @since 1.7 * @author Doug Lea * @param the type of elements held in this collection - * */ public class LinkedTransferQueue extends AbstractQueue implements TransferQueue, java.io.Serializable { @@ -81,7 +82,7 @@ public class LinkedTransferQueue exte * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * The number of times to spin before blocking in untimed waits. @@ -103,27 +104,59 @@ public class LinkedTransferQueue exte * garbage retention. Similarly, setting the next field to this is * used as sentinel that node is off list. */ - static final class QNode extends AtomicReference { - volatile QNode next; + static final class Node extends AtomicReference { + volatile Node next; volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + + Node(E item, boolean isData) { super(item); this.isData = isData; } - static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); - - final boolean casNext(QNode cmp, QNode val) { - return nextUpdater.compareAndSet(this, cmp, val); + final boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final void clearNext() { - nextUpdater.lazySet(this, this); + UNSAFE.putOrderedObject(this, nextOffset, this); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } + private static final long serialVersionUID = -3375979862319811754L; } /** @@ -135,26 +168,28 @@ public class LinkedTransferQueue exte // enough padding for 64bytes with 4byte refs Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; PaddedAtomicReference(T r) { super(r); } + private static final long serialVersionUID = 8170090609809740854L; } /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final PaddedAtomicReference> head; + /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { h.clearNext(); // forget old next return true; @@ -172,20 +207,20 @@ public class LinkedTransferQueue exte * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { if (s == null) - s = new QNode(e, isData); - QNode last = t.next; + s = new Node(e, isData); + Node last = t.next; if (last != null) { if (t == tail.get()) tail.compareAndSet(t, last); @@ -197,13 +232,13 @@ public class LinkedTransferQueue exte } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -215,17 +250,17 @@ public class LinkedTransferQueue exte * Version of xfer for poll() and tryTransfer, which * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -234,14 +269,14 @@ public class LinkedTransferQueue exte } } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -259,12 +294,12 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { + private E awaitFulfill(Node pred, Node s, E e, + int mode, long nanos) { if (mode == NOWAIT) return null; - long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; + long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = -1; // set to desired spin count below for (;;) { @@ -279,7 +314,7 @@ public class LinkedTransferQueue exte } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return (E) x; } else return e; @@ -294,9 +329,9 @@ public class LinkedTransferQueue exte } } if (spins < 0) { - QNode h = head.get(); // only spin if at head + Node h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? + ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); } if (spins > 0) @@ -319,16 +354,16 @@ public class LinkedTransferQueue exte /** * Returns validated tail for use in cleaning methods. */ - private QNode getValidatedTail() { + private Node getValidatedTail() { for (;;) { - QNode h = head.get(); - QNode first = h.next; + Node h = head.get(); + Node first = h.next; if (first != null && first.next == first) { // help advance advanceHead(h, first); continue; } - QNode t = tail.get(); - QNode last = t.next; + Node t = tail.get(); + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); // help advance @@ -344,7 +379,7 @@ public class LinkedTransferQueue exte * @param pred predecessor of cancelled node * @param s the cancelled node */ - private void clean(QNode pred, QNode s) { + private void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -364,10 +399,10 @@ public class LinkedTransferQueue exte * processed, so this always terminates. */ while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) break; } @@ -383,7 +418,7 @@ public class LinkedTransferQueue exte * * @return current cleanMe node (or null) */ - private QNode reclean() { + private Node reclean() { /* * cleanMe is, or at one time was, predecessor of cancelled * node s that was the tail so could not be unspliced. If s @@ -395,12 +430,12 @@ public class LinkedTransferQueue exte * This can loop only due to contention on casNext or * clearing cleanMe. */ - QNode pred; + Node pred; while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; + Node t = getValidatedTail(); + Node s = pred.next; if (s != t) { - QNode sn; + Node sn; if (s == null || s == pred || s.get() != s || (sn = s.next) == s || pred.casNext(s, sn)) cleanMe.compareAndSet(pred, null); @@ -415,10 +450,10 @@ public class LinkedTransferQueue exte * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** @@ -435,12 +470,20 @@ public class LinkedTransferQueue exte addAll(c); } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (Thread.interrupted()) throw new InterruptedException(); xfer(e, NOWAIT, 0); } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -449,18 +492,28 @@ public class LinkedTransferQueue exte return true; } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean add(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { @@ -469,6 +522,10 @@ public class LinkedTransferQueue exte } } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -479,30 +536,43 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean tryTransfer(E e) { if (e == null) throw new NullPointerException(); return fulfill(e) != null; } + /** + * @throws InterruptedException {@inheritDoc} + */ public E take() throws InterruptedException { Object e = xfer(null, WAIT, 0); if (e != null) - return (E)e; + return (E) e; Thread.interrupted(); throw new InterruptedException(); } + /** + * @throws InterruptedException {@inheritDoc} + */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E)e; + return (E) e; throw new InterruptedException(); } public E poll() { - return (E)fulfill(null); + return fulfill(null); } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); @@ -517,6 +587,10 @@ public class LinkedTransferQueue exte return n; } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); @@ -536,13 +610,13 @@ public class LinkedTransferQueue exte /** * Returns head after performing any outstanding helping steps. */ - private QNode traversalHead() { + private Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; + Node last = t.next; + Node first = h.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -574,11 +648,11 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() + Node next; // node to return next + Node pnext; // predecessor of next + Node snext; // successor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() E nextItem; // Cache of next item, once committed to in next Itr() { @@ -590,8 +664,8 @@ public class LinkedTransferQueue exte */ void findNext() { for (;;) { - QNode pred = pnext; - QNode q = next; + Node pred = pnext; + Node q = next; if (pred == null || pred == q) { pred = traversalHead(); q = pred.next; @@ -601,9 +675,9 @@ public class LinkedTransferQueue exte return; } Object x = q.get(); - QNode s = q.next; + Node s = q.next; if (x != null && q != x && q != s) { - nextItem = (E)x; + nextItem = (E) x; snext = s; pnext = pred; next = q; @@ -630,7 +704,7 @@ public class LinkedTransferQueue exte } public void remove() { - QNode p = curr; + Node p = curr; if (p == null) throw new IllegalStateException(); Object x = p.get(); @@ -641,8 +715,8 @@ public class LinkedTransferQueue exte public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return null; Object x = p.get(); @@ -650,15 +724,15 @@ public class LinkedTransferQueue exte if (!p.isData) return null; if (x != null) - return (E)x; + return (E) x; } } } public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return true; Object x = p.get(); @@ -673,8 +747,8 @@ public class LinkedTransferQueue exte public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return false; Object x = p.get(); @@ -697,8 +771,8 @@ public class LinkedTransferQueue exte */ public int size() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && p.isData; p = p.next) { Object x = p.get(); if (x != null && x != p) { if (++count == Integer.MAX_VALUE) // saturated @@ -710,8 +784,8 @@ public class LinkedTransferQueue exte public int getWaitingConsumerCount() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && !p.isData; p = p.next) { if (p.get() == null) { if (++count == Integer.MAX_VALUE) break; @@ -728,9 +802,9 @@ public class LinkedTransferQueue exte if (o == null) return false; for (;;) { - QNode pred = traversalHead(); + Node pred = traversalHead(); for (;;) { - QNode q = pred.next; + Node q = pred.next; if (q == null || !q.isData) return false; if (q == pred) // restart @@ -765,6 +839,7 @@ public class LinkedTransferQueue exte /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -772,7 +847,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E)s.readObject(); + @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) break; else @@ -780,61 +855,65 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); - _unsafe.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); + Node dummy = new Node(null, false); + UNSAFE.putObjectVolatile(this, headOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference>(null)); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); + private static final long cleanMeOffset = + objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); + + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField(fieldName)); - } - - private static final Unsafe _unsafe; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { - try { - _unsafe = getUnsafe(); - headOffset = fieldOffset("head"); - tailOffset = fieldOffset("tail"); - cleanMeOffset = fieldOffset("cleanMe"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } - }