--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/23 23:23:41 1.24 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/24 23:48:26 1.25 @@ -10,8 +10,6 @@ import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; import java.util.*; import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -102,20 +100,22 @@ public class LinkedTransferQueue exte * garbage retention. Similarly, setting the next field to this is * used as sentinel that node is off list. */ - static final class QNode extends AtomicReference { - volatile QNode next; + static final class Node extends AtomicReference { + volatile Node next; volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + + Node(E item, boolean isData) { super(item); this.isData = isData; } - static final AtomicReferenceFieldUpdater + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); + (Node.class, Node.class, "next"); - final boolean casNext(QNode cmp, QNode val) { + final boolean casNext(Node cmp, Node val) { return nextUpdater.compareAndSet(this, cmp, val); } @@ -140,23 +140,23 @@ public class LinkedTransferQueue exte /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final PaddedAtomicReference> head; /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { h.clearNext(); // forget old next return true; @@ -174,20 +174,20 @@ public class LinkedTransferQueue exte * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { if (s == null) - s = new QNode(e, isData); - QNode last = t.next; + s = new Node(e, isData); + Node last = t.next; if (last != null) { if (t == tail.get()) tail.compareAndSet(t, last); @@ -199,13 +199,13 @@ public class LinkedTransferQueue exte } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData ? e : x; + return isData ? e : (E) x; } } } @@ -217,17 +217,17 @@ public class LinkedTransferQueue exte * Version of xfer for poll() and tryTransfer, which * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -236,14 +236,14 @@ public class LinkedTransferQueue exte } } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData ? e : x; + return isData ? e : (E) x; } } } @@ -261,8 +261,8 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { + private E awaitFulfill(Node pred, Node s, E e, + int mode, long nanos) { if (mode == NOWAIT) return null; @@ -281,7 +281,7 @@ public class LinkedTransferQueue exte } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return (E) x; } else return e; @@ -296,7 +296,7 @@ public class LinkedTransferQueue exte } } if (spins < 0) { - QNode h = head.get(); // only spin if at head + Node h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); @@ -321,16 +321,16 @@ public class LinkedTransferQueue exte /** * Returns validated tail for use in cleaning methods. */ - private QNode getValidatedTail() { + private Node getValidatedTail() { for (;;) { - QNode h = head.get(); - QNode first = h.next; + Node h = head.get(); + Node first = h.next; if (first != null && first.next == first) { // help advance advanceHead(h, first); continue; } - QNode t = tail.get(); - QNode last = t.next; + Node t = tail.get(); + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); // help advance @@ -346,7 +346,7 @@ public class LinkedTransferQueue exte * @param pred predecessor of cancelled node * @param s the cancelled node */ - private void clean(QNode pred, QNode s) { + private void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -366,10 +366,10 @@ public class LinkedTransferQueue exte * processed, so this always terminates. */ while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) break; } @@ -385,7 +385,7 @@ public class LinkedTransferQueue exte * * @return current cleanMe node (or null) */ - private QNode reclean() { + private Node reclean() { /* * cleanMe is, or at one time was, predecessor of cancelled * node s that was the tail so could not be unspliced. If s @@ -397,12 +397,12 @@ public class LinkedTransferQueue exte * This can loop only due to contention on casNext or * clearing cleanMe. */ - QNode pred; + Node pred; while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; + Node t = getValidatedTail(); + Node s = pred.next; if (s != t) { - QNode sn; + Node sn; if (s == null || s == pred || s.get() != s || (sn = s.next) == s || pred.casNext(s, sn)) cleanMe.compareAndSet(pred, null); @@ -417,10 +417,10 @@ public class LinkedTransferQueue exte * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** @@ -502,7 +502,7 @@ public class LinkedTransferQueue exte } public E poll() { - return (E) fulfill(null); + return fulfill(null); } public int drainTo(Collection c) { @@ -538,13 +538,13 @@ public class LinkedTransferQueue exte /** * Returns head after performing any outstanding helping steps. */ - private QNode traversalHead() { + private Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; + Node last = t.next; + Node first = h.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -576,11 +576,11 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() + Node next; // node to return next + Node pnext; // predecessor of next + Node snext; // successor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() E nextItem; // Cache of next item, once committed to in next Itr() { @@ -592,8 +592,8 @@ public class LinkedTransferQueue exte */ void findNext() { for (;;) { - QNode pred = pnext; - QNode q = next; + Node pred = pnext; + Node q = next; if (pred == null || pred == q) { pred = traversalHead(); q = pred.next; @@ -603,7 +603,7 @@ public class LinkedTransferQueue exte return; } Object x = q.get(); - QNode s = q.next; + Node s = q.next; if (x != null && q != x && q != s) { nextItem = (E) x; snext = s; @@ -632,7 +632,7 @@ public class LinkedTransferQueue exte } public void remove() { - QNode p = curr; + Node p = curr; if (p == null) throw new IllegalStateException(); Object x = p.get(); @@ -643,8 +643,8 @@ public class LinkedTransferQueue exte public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return null; Object x = p.get(); @@ -659,8 +659,8 @@ public class LinkedTransferQueue exte public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return true; Object x = p.get(); @@ -675,8 +675,8 @@ public class LinkedTransferQueue exte public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return false; Object x = p.get(); @@ -699,8 +699,8 @@ public class LinkedTransferQueue exte */ public int size() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && p.isData; p = p.next) { Object x = p.get(); if (x != null && x != p) { if (++count == Integer.MAX_VALUE) // saturated @@ -712,8 +712,8 @@ public class LinkedTransferQueue exte public int getWaitingConsumerCount() { int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { + Node h = traversalHead(); + for (Node p = h.next; p != null && !p.isData; p = p.next) { if (p.get() == null) { if (++count == Integer.MAX_VALUE) break; @@ -730,9 +730,9 @@ public class LinkedTransferQueue exte if (o == null) return false; for (;;) { - QNode pred = traversalHead(); + Node pred = traversalHead(); for (;;) { - QNode q = pred.next; + Node q = pred.next; if (q == null || !q.isData) return false; if (q == pred) // restart @@ -775,7 +775,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E) s.readObject(); + @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) break; else @@ -783,61 +783,60 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); + Node dummy = new Node(null, false); UNSAFE.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); + new PaddedAtomicReference>(dummy)); UNSAFE.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); + new PaddedAtomicReference>(dummy)); UNSAFE.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); + new PaddedAtomicReference>(null)); } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + // Unsafe mechanics for jsr166y 3rd party package. + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + return getUnsafeByReflection(); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - private static Unsafe getUnsafePrivileged() + private static sun.misc.Unsafe getUnsafeByReflection() throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); + java.lang.reflect.Field f = + sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); - return (Unsafe) f.get(null); + return (sun.misc.Unsafe) f.get(null); } - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return UNSAFE.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField(fieldName)); - } - - private static final Unsafe UNSAFE; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { + private static long fieldOffset(String fieldName, Class klazz) { try { - UNSAFE = getUnsafe(); - headOffset = fieldOffset("head"); - tailOffset = fieldOffset("tail"); - cleanMeOffset = fieldOffset("cleanMe"); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName)); + } catch (NoSuchFieldException e) { + // Convert Exception to Error + NoSuchFieldError error = new NoSuchFieldError(fieldName); + error.initCause(e); + throw error; } } + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + static final long headOffset = + fieldOffset("head", LinkedTransferQueue.class); + static final long tailOffset = + fieldOffset("tail", LinkedTransferQueue.class); + static final long cleanMeOffset = + fieldOffset("cleanMe", LinkedTransferQueue.class); + }