--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/23 23:23:41 1.24 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/30 13:30:19 1.33 @@ -5,13 +5,15 @@ */ package jsr166y; + import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import java.util.*; -import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.atomic.AtomicReference; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -102,25 +104,56 @@ public class LinkedTransferQueue exte * garbage retention. Similarly, setting the next field to this is * used as sentinel that node is off list. */ - static final class QNode extends AtomicReference { - volatile QNode next; + static final class Node extends AtomicReference { + volatile Node next; volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + + Node(E item, boolean isData) { super(item); this.isData = isData; } - static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); + // Unsafe mechanics - final boolean casNext(QNode cmp, QNode val) { - return nextUpdater.compareAndSet(this, cmp, val); + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + + final boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final void clearNext() { - nextUpdater.lazySet(this, this); + UNSAFE.putOrderedObject(this, nextOffset, this); + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } private static final long serialVersionUID = -3375979862319811754L; @@ -140,23 +173,23 @@ public class LinkedTransferQueue exte /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final PaddedAtomicReference> head; /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { h.clearNext(); // forget old next return true; @@ -174,20 +207,20 @@ public class LinkedTransferQueue exte * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { if (s == null) - s = new QNode(e, isData); - QNode last = t.next; + s = new Node(e, isData); + Node last = t.next; if (last != null) { if (t == tail.get()) tail.compareAndSet(t, last); @@ -199,13 +232,13 @@ public class LinkedTransferQueue exte } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData ? e : x; + return isData ? e : (E) x; } } } @@ -217,17 +250,17 @@ public class LinkedTransferQueue exte * Version of xfer for poll() and tryTransfer, which * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -236,14 +269,14 @@ public class LinkedTransferQueue exte } } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData ? e : x; + return isData ? e : (E) x; } } } @@ -261,8 +294,8 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { + private E awaitFulfill(Node pred, Node s, E e, + int mode, long nanos) { if (mode == NOWAIT) return null; @@ -281,7 +314,7 @@ public class LinkedTransferQueue exte } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return (E) x; } else return e; @@ -296,7 +329,7 @@ public class LinkedTransferQueue exte } } if (spins < 0) { - QNode h = head.get(); // only spin if at head + Node h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); @@ -321,16 +354,16 @@ public class LinkedTransferQueue exte /** * Returns validated tail for use in cleaning methods. */ - private QNode getValidatedTail() { + private Node getValidatedTail() { for (;;) { - QNode h = head.get(); - QNode first = h.next; + Node h = head.get(); + Node first = h.next; if (first != null && first.next == first) { // help advance advanceHead(h, first); continue; } - QNode t = tail.get(); - QNode last = t.next; + Node t = tail.get(); + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); // help advance @@ -346,7 +379,7 @@ public class LinkedTransferQueue exte * @param pred predecessor of cancelled node * @param s the cancelled node */ - private void clean(QNode pred, QNode s) { + private void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -366,10 +399,10 @@ public class LinkedTransferQueue exte * processed, so this always terminates. */ while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) break; } @@ -385,7 +418,7 @@ public class LinkedTransferQueue exte * * @return current cleanMe node (or null) */ - private QNode reclean() { + private Node reclean() { /* * cleanMe is, or at one time was, predecessor of cancelled * node s that was the tail so could not be unspliced. If s @@ -397,12 +430,12 @@ public class LinkedTransferQueue exte * This can loop only due to contention on casNext or * clearing cleanMe. */ - QNode pred; + Node pred; while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; + Node t = getValidatedTail(); + Node s = pred.next; if (s != t) { - QNode sn; + Node sn; if (s == null || s == pred || s.get() != s || (sn = s.next) == s || pred.casNext(s, sn)) cleanMe.compareAndSet(pred, null); @@ -417,10 +450,10 @@ public class LinkedTransferQueue exte * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** @@ -437,12 +470,20 @@ public class LinkedTransferQueue exte addAll(c); } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (Thread.interrupted()) throw new InterruptedException(); xfer(e, NOWAIT, 0); } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -451,18 +492,28 @@ public class LinkedTransferQueue exte return true; } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean add(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { @@ -471,6 +522,10 @@ public class LinkedTransferQueue exte } } + /** + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -481,11 +536,17 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } + /** + * @throws NullPointerException {@inheritDoc} + */ public boolean tryTransfer(E e) { if (e == null) throw new NullPointerException(); return fulfill(e) != null; } + /** + * @throws InterruptedException {@inheritDoc} + */ public E take() throws InterruptedException { Object e = xfer(null, WAIT, 0); if (e != null) @@ -494,6 +555,9 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } + /** + * @throws InterruptedException {@inheritDoc} + */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) @@ -502,9 +566,13 @@ public class LinkedTransferQueue exte } public E poll() { - return (E) fulfill(null); + return fulfill(null); } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); @@ -519,6 +587,10 @@ public class LinkedTransferQueue exte return n; } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); @@ -538,13 +610,13 @@ public class LinkedTransferQueue exte /** * Returns head after performing any outstanding helping steps. */ - private QNode traversalHead() { + private Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; + Node last = t.next; + Node first = h.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -563,7 +635,6 @@ public class LinkedTransferQueue exte } } - public Iterator iterator() { return new Itr(); } @@ -576,44 +647,41 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once committed to in next + Node next; // node to return next + Node pnext; // predecessor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once committed to in next Itr() { - findNext(); + advance(); } /** - * Ensures next points to next valid node, or null if none. + * Moves to next valid node and returns item to return for + * next(), or null if no such. */ - void findNext() { + private E advance() { + pcurr = pnext; + curr = next; + E item = nextItem; + for (;;) { - QNode pred = pnext; - QNode q = next; - if (pred == null || pred == q) { - pred = traversalHead(); - q = pred.next; - } - if (q == null || !q.isData) { + pnext = next == null ? traversalHead() : next; + next = pnext.next; + if (next == pnext) { next = null; - return; + continue; // restart } - Object x = q.get(); - QNode s = q.next; - if (x != null && q != x && q != s) { + if (next == null) + break; + Object x = next.get(); + if (x != null && x != next) { nextItem = (E) x; - snext = s; - pnext = pred; - next = q; - return; + break; } - pnext = q; - next = s; } + return item; } public boolean hasNext() { @@ -621,18 +689,13 @@ public class LinkedTransferQueue exte } public E next() { - if (next == null) throw new NoSuchElementException(); - pcurr = pnext; - curr = next; - pnext = next; - next = snext; - E x = nextItem; - findNext(); - return x; + if (next == null) + throw new NoSuchElementException(); + return advance(); } public void remove() { - QNode p = curr; + Node p = curr; if (p == null) throw new IllegalStateException(); Object x = p.get(); @@ -643,8 +706,8 @@ public class LinkedTransferQueue exte public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return null; Object x = p.get(); @@ -659,8 +722,8 @@ public class LinkedTransferQueue exte public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return true; Object x = p.get(); @@ -675,8 +738,8 @@ public class LinkedTransferQueue exte public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return false; Object x = p.get(); @@ -698,45 +761,57 @@ public class LinkedTransferQueue exte * @return the number of elements in this queue */ public int size() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) // saturated + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || !q.isData) + return count; + Object x = q.get(); + if (x != null && x != q) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; } public int getWaitingConsumerCount() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) + // converse of size -- count valid non-data nodes + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || q.isData) + return count; + Object x = q.get(); + if (x == null) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; - } - - public int remainingCapacity() { - return Integer.MAX_VALUE; } public boolean remove(Object o) { if (o == null) return false; for (;;) { - QNode pred = traversalHead(); + Node pred = traversalHead(); for (;;) { - QNode q = pred.next; - if (q == null || !q.isData) - return false; + Node q = pred.next; if (q == pred) // restart break; + if (q == null || !q.isData) + return false; Object x = q.get(); if (x != null && x != q && o.equals(x) && q.compareAndSet(x, q)) { @@ -748,6 +823,10 @@ public class LinkedTransferQueue exte } } + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + /** * Save the state to a stream (that is, serialize it). * @@ -775,7 +854,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E) s.readObject(); + @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) break; else @@ -783,61 +862,65 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); + Node dummy = new Node(null, false); UNSAFE.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); + new PaddedAtomicReference>(dummy)); UNSAFE.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); + new PaddedAtomicReference>(dummy)); UNSAFE.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); + new PaddedAtomicReference>(null)); } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); + private static final long cleanMeOffset = + objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); + + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return UNSAFE.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField(fieldName)); - } - - private static final Unsafe UNSAFE; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { - try { - UNSAFE = getUnsafe(); - headOffset = fieldOffset("head"); - tailOffset = fieldOffset("tail"); - cleanMeOffset = fieldOffset("cleanMe"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } - }