--- jsr166/src/jsr166y/LinkedTransferQueue.java 2008/10/03 00:39:48 1.8 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/30 22:45:39 1.35 @@ -5,13 +5,17 @@ */ package jsr166y; + import java.util.concurrent.*; -import java.util.concurrent.locks.*; -import java.util.concurrent.atomic.*; -import java.util.*; -import java.io.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.atomic.AtomicReference; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -21,7 +25,7 @@ import java.lang.reflect.*; * producer. The tail of the queue is that element that has * been on the queue the shortest time for some producer. * - *

Beware that, unlike in most collections, the size + *

Beware that, unlike in most collections, the {@code size} * method is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements. @@ -44,26 +48,25 @@ import java.lang.reflect.*; * @since 1.7 * @author Doug Lea * @param the type of elements held in this collection - * */ public class LinkedTransferQueue extends AbstractQueue implements TransferQueue, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; /* - * This is still a work in progress... - * * This class extends the approach used in FIFO-mode * SynchronousQueues. See the internal documentation, as well as * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, * Lea & Scott * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) * - * The main extension is to provide different Wait modes - * for the main "xfer" method that puts or takes items. - * These don't impact the basic dual-queue logic, but instead - * control whether or how threads block upon insertion - * of request or data nodes into the dual queue. + * The main extension is to provide different Wait modes for the + * main "xfer" method that puts or takes items. These don't + * impact the basic dual-queue logic, but instead control whether + * or how threads block upon insertion of request or data nodes + * into the dual queue. It also uses slightly different + * conventions for tracking whether nodes are off-list or + * cancelled. */ // Wait modes for xfer method @@ -81,7 +84,7 @@ public class LinkedTransferQueue exte * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; + static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * The number of times to spin before blocking in untimed waits. @@ -97,28 +100,65 @@ public class LinkedTransferQueue exte static final long spinForTimeoutThreshold = 1000L; /** - * Node class for LinkedTransferQueue. Opportunistically subclasses from - * AtomicReference to represent item. Uses Object, not E, to allow - * setting item to "this" after use, to avoid garbage - * retention. Similarly, setting the next field to this is used as - * sentinel that node is off list. + * Node class for LinkedTransferQueue. Opportunistically + * subclasses from AtomicReference to represent item. Uses Object, + * not E, to allow setting item to "this" after use, to avoid + * garbage retention. Similarly, setting the next field to this is + * used as sentinel that node is off list. */ - static final class QNode extends AtomicReference { - volatile QNode next; + static final class Node extends AtomicReference { + volatile Node next; volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + + Node(E item, boolean isData) { super(item); this.isData = isData; } - static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QNode.class, QNode.class, "next"); + // Unsafe mechanics - boolean casNext(QNode cmp, QNode val) { - return nextUpdater.compareAndSet(this, cmp, val); + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + + final boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + final void clearNext() { + UNSAFE.putOrderedObject(this, nextOffset, this); + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } + + private static final long serialVersionUID = -3375979862319811754L; } /** @@ -130,28 +170,30 @@ public class LinkedTransferQueue exte // enough padding for 64bytes with 4byte refs Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; PaddedAtomicReference(T r) { super(r); } + private static final long serialVersionUID = 8170090609809740854L; } /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final PaddedAtomicReference> head; + /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { - h.next = h; // forget old next + h.clearNext(); // forget old next return true; } return false; @@ -159,26 +201,28 @@ public class LinkedTransferQueue exte /** * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()) + * poll() and tryTransfer()). See the similar code in + * SynchronousQueue for detailed explanation. + * * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = (e != null); - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { if (s == null) - s = new QNode(e, isData); - QNode last = t.next; + s = new Node(e, isData); + Node last = t.next; if (last != null) { if (t == tail.get()) tail.compareAndSet(t, last); @@ -190,13 +234,13 @@ public class LinkedTransferQueue exte } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -206,19 +250,19 @@ public class LinkedTransferQueue exte /** * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer + * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = (e != null); - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + Node last = t.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -227,14 +271,14 @@ public class LinkedTransferQueue exte } } else if (h != null) { - QNode first = h.next; + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData ? e : (E) x; } } } @@ -252,12 +296,12 @@ public class LinkedTransferQueue exte * @param nanos timeout value * @return matched item, or s if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { + private E awaitFulfill(Node pred, Node s, E e, + int mode, long nanos) { if (mode == NOWAIT) return null; - long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0; + long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = -1; // set to desired spin count below for (;;) { @@ -266,16 +310,17 @@ public class LinkedTransferQueue exte Object x = s.get(); if (x != e) { // Node was matched or cancelled advanceHead(pred, s); // unlink if head - if (x == s) // was cancelled - return clean(pred, s); + if (x == s) { // was cancelled + clean(pred, s); + return null; + } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return (E) x; } else return e; } - if (mode == TIMEOUT) { long now = System.nanoTime(); nanos -= now - lastTime; @@ -286,9 +331,9 @@ public class LinkedTransferQueue exte } } if (spins < 0) { - QNode h = head.get(); // only spin if at head + Node h = head.get(); // only spin if at head spins = ((h != null && h.next == s) ? - (mode == TIMEOUT? + ((mode == TIMEOUT) ? maxTimedSpins : maxUntimedSpins) : 0); } if (spins > 0) @@ -296,14 +341,12 @@ public class LinkedTransferQueue exte else if (s.waiter == null) s.waiter = w; else if (mode != TIMEOUT) { - // LockSupport.park(this); - LockSupport.park(); // allows run on java5 + LockSupport.park(this); s.waiter = null; spins = -1; } else if (nanos > spinForTimeoutThreshold) { - // LockSupport.parkNanos(this, nanos); - LockSupport.parkNanos(nanos); + LockSupport.parkNanos(this, nanos); s.waiter = null; spins = -1; } @@ -311,10 +354,34 @@ public class LinkedTransferQueue exte } /** + * Returns validated tail for use in cleaning methods. + */ + private Node getValidatedTail() { + for (;;) { + Node h = head.get(); + Node first = h.next; + if (first != null && first.next == first) { // help advance + advanceHead(h, first); + continue; + } + Node t = tail.get(); + Node last = t.next; + if (t == tail.get()) { + if (last != null) + tail.compareAndSet(t, last); // help advance + else + return t; + } + } + } + + /** * Gets rid of cancelled node s with original predecessor pred. - * @return null (to simplify use by callers) + * + * @param pred predecessor of cancelled node + * @param s the cancelled node */ - private Object clean(QNode pred, QNode s) { + private void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -322,64 +389,80 @@ public class LinkedTransferQueue exte LockSupport.unpark(w); } - for (;;) { - if (pred.next != s) // already cleaned - return null; - QNode h = head.get(); - QNode hn = h.next; // Absorb cancelled first node as head - if (hn != null && hn.next == hn) { - advanceHead(h, hn); - continue; - } - QNode t = tail.get(); // Ensure consistent read for tail - if (t == h) - return null; - QNode tn = t.next; - if (t != tail.get()) - continue; - if (tn != null) { // Help advance tail - tail.compareAndSet(t, tn); - continue; - } - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; + if (pred == null) + return; + + /* + * At any given time, exactly one node on list cannot be + * deleted -- the last inserted node. To accommodate this, if + * we cannot delete s, we save its predecessor as "cleanMe", + * processing the previously saved version first. At least one + * of node s or the node previously saved can always be + * processed, so this always terminates. + */ + while (pred.next == s) { + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); + if (s != t) { // If not tail, try to unsplice + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) - return null; + break; } - QNode dp = cleanMe.get(); - if (dp != null) { // Try unlinking previous cancelled node - QNode d = dp.next; - QNode dn; - if (d == null || // d is gone or - d == dp || // d is off list or - d.get() != d || // d not cancelled or - (d != t && // d not tail and - (dn = d.next) != null && // has successor - dn != d && // that is on list - dp.casNext(d, dn))) // d unspliced - cleanMe.compareAndSet(dp, null); - if (dp == pred) - return null; // s is already saved node + else if (oldpred == pred || // Already saved + (oldpred == null && cleanMe.compareAndSet(null, pred))) + break; // Postpone cleaning + } + } + + /** + * Tries to unsplice the cancelled node held in cleanMe that was + * previously uncleanable because it was at tail. + * + * @return current cleanMe node (or null) + */ + private Node reclean() { + /* + * cleanMe is, or at one time was, predecessor of cancelled + * node s that was the tail so could not be unspliced. If s + * is no longer the tail, try to unsplice if necessary and + * make cleanMe slot available. This differs from similar + * code in clean() because we must check that pred still + * points to a cancelled node that must be unspliced -- if + * not, we can (must) clear cleanMe without unsplicing. + * This can loop only due to contention on casNext or + * clearing cleanMe. + */ + Node pred; + while ((pred = cleanMe.get()) != null) { + Node t = getValidatedTail(); + Node s = pred.next; + if (s != t) { + Node sn; + if (s == null || s == pred || s.get() != s || + (sn = s.next) == s || pred.casNext(s, sn)) + cleanMe.compareAndSet(pred, null); } - else if (cleanMe.compareAndSet(null, pred)) - return null; // Postpone cleaning s + else // s is still tail; cannot clean + break; } + return pred; } /** - * Creates an initially empty LinkedTransferQueue. + * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** - * Creates a LinkedTransferQueue + * Creates a {@code LinkedTransferQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. + * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null @@ -389,26 +472,76 @@ public class LinkedTransferQueue exte addAll(c); } - public void put(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block. + * + * @throws NullPointerException if the specified element is null + */ + public void put(E e) { + offer(e); } - public boolean offer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); - return true; + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block or + * return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded this method will never throw + * {@link IllegalStateException} or return {@code false}. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ + public boolean add(E e) { + return offer(e); + } + + /** + * Transfers the specified element immediately if there exists a + * consumer already waiting to receive it (in {@link #take} or + * timed {@link #poll(Object,long,TimeUnit) poll}), otherwise + * returning {@code false} without enqueuing the element. + * + * @throws NullPointerException if the specified element is null + */ + public boolean tryTransfer(E e) { + if (e == null) throw new NullPointerException(); + return fulfill(e) != null; + } + + /** + * Inserts the specified element at the tail of this queue, + * waiting if necessary for the element to be received by a + * consumer invoking {@code take} or {@code poll}. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException if the specified element is null + */ public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { @@ -417,6 +550,14 @@ public class LinkedTransferQueue exte } } + /** + * Inserts the specified element at the tail of this queue, + * waiting up to the specified wait time for the element to be + * received by a consumer invoking {@code take} or {@code poll}. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException if the specified element is null + */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -427,30 +568,32 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } - public boolean tryTransfer(E e) { - if (e == null) throw new NullPointerException(); - return fulfill(e) != null; - } - public E take() throws InterruptedException { - Object e = xfer(null, WAIT, 0); + E e = xfer(null, WAIT, 0); if (e != null) - return (E)e; + return e; Thread.interrupted(); throw new InterruptedException(); } + /** + * @throws InterruptedException {@inheritDoc} + */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); + E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E)e; + return e; throw new InterruptedException(); } public E poll() { - return (E)fulfill(null); + return fulfill(null); } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); @@ -465,6 +608,10 @@ public class LinkedTransferQueue exte return n; } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); @@ -482,15 +629,15 @@ public class LinkedTransferQueue exte // Traversal-based methods /** - * Return head after performing any outstanding helping steps + * Returns head after performing any outstanding helping steps. */ - private QNode traversalHead() { + private Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; + Node last = t.next; + Node first = h.next; if (t == tail.get()) { if (last != null) tail.compareAndSet(t, last); @@ -505,10 +652,23 @@ public class LinkedTransferQueue exte return h; } } + reclean(); } } - + /** + * Returns an iterator over the elements in this queue in proper + * sequence, from head to tail. + * + *

The returned iterator is a "weakly consistent" iterator that + * will never throw + * {@link ConcurrentModificationException ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed + * to) reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence + */ public Iterator iterator() { return new Itr(); } @@ -521,63 +681,67 @@ public class LinkedTransferQueue exte * if subsequently removed. */ class Itr implements Iterator { - QNode nextNode; // Next node to return next - QNode currentNode; // last returned node, for remove() - QNode prevNode; // predecessor of last returned node - E nextItem; // Cache of next item, once commited to in next + Node next; // node to return next + Node pnext; // predecessor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once committed to in next Itr() { - nextNode = traversalHead(); advance(); } - E advance() { - prevNode = currentNode; - currentNode = nextNode; - E x = nextItem; + /** + * Moves to next valid node and returns item to return for + * next(), or null if no such. + */ + private E advance() { + pcurr = pnext; + curr = next; + E item = nextItem; - QNode p = nextNode.next; for (;;) { - if (p == null || !p.isData) { - nextNode = null; - nextItem = null; - return x; - } - Object item = p.get(); - if (item != p && item != null) { - nextNode = p; - nextItem = (E)item; - return x; + pnext = (next == null) ? traversalHead() : next; + next = pnext.next; + if (next == pnext) { + next = null; + continue; // restart + } + if (next == null) + break; + Object x = next.get(); + if (x != null && x != next) { + nextItem = (E) x; + break; } - prevNode = p; - p = p.next; } + return item; } public boolean hasNext() { - return nextNode != null; + return next != null; } public E next() { - if (nextNode == null) throw new NoSuchElementException(); + if (next == null) + throw new NoSuchElementException(); return advance(); } public void remove() { - QNode p = currentNode; - QNode prev = prevNode; - if (prev == null || p == null) + Node p = curr; + if (p == null) throw new IllegalStateException(); Object x = p.get(); if (x != null && x != p && p.compareAndSet(x, p)) - clean(prev, p); + clean(pcurr, p); } } public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return null; Object x = p.get(); @@ -585,15 +749,15 @@ public class LinkedTransferQueue exte if (!p.isData) return null; if (x != null) - return (E)x; + return (E) x; } } } public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return true; Object x = p.get(); @@ -608,8 +772,8 @@ public class LinkedTransferQueue exte public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) return false; Object x = p.get(); @@ -620,8 +784,8 @@ public class LinkedTransferQueue exte /** * Returns the number of elements in this queue. If this queue - * contains more than Integer.MAX_VALUE elements, returns - * Integer.MAX_VALUE. + * contains more than {@code Integer.MAX_VALUE} elements, returns + * {@code Integer.MAX_VALUE}. * *

Beware that, unlike in most collections, this method is * NOT a constant-time operation. Because of the @@ -631,30 +795,75 @@ public class LinkedTransferQueue exte * @return the number of elements in this queue */ public int size() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) // saturated + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || !q.isData) + return count; + Object x = q.get(); + if (x != null && x != q) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; } public int getWaitingConsumerCount() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) + // converse of size -- count valid non-data nodes + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || q.isData) + return count; + Object x = q.get(); + if (x == null) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; } + public boolean remove(Object o) { + if (o == null) + return false; + for (;;) { + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart + break; + if (q == null || !q.isData) + return false; + Object x = q.get(); + if (x != null && x != q && o.equals(x) && + q.compareAndSet(x, q)) { + clean(pred, q); + return true; + } + pred = q; + } + } + } + + /** + * Always returns {@code Integer.MAX_VALUE} because a + * {@code LinkedTransferQueue} is not capacity constrained. + * + * @return {@code Integer.MAX_VALUE} (as specified by + * {@link BlockingQueue#remainingCapacity()}) + */ public int remainingCapacity() { return Integer.MAX_VALUE; } @@ -662,15 +871,15 @@ public class LinkedTransferQueue exte /** * Save the state to a stream (that is, serialize it). * - * @serialData All of the elements (each an E) in + * @serialData All of the elements (each an {@code E}) in * the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { s.defaultWriteObject(); - for (Iterator it = iterator(); it.hasNext(); ) - s.writeObject(it.next()); + for (E e : this) + s.writeObject(e); // Use trailing null as sentinel s.writeObject(null); } @@ -678,6 +887,7 @@ public class LinkedTransferQueue exte /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -685,7 +895,7 @@ public class LinkedTransferQueue exte s.defaultReadObject(); resetHeadAndTail(); for (;;) { - E item = (E)s.readObject(); + @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) break; else @@ -693,43 +903,65 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing + private void resetHeadAndTail() { + Node dummy = new Node(null, false); + UNSAFE.putObjectVolatile(this, headOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, tailOffset, + new PaddedAtomicReference>(dummy)); + UNSAFE.putObjectVolatile(this, cleanMeOffset, + new PaddedAtomicReference>(null)); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); + private static final long cleanMeOffset = + objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); - // Temporary Unsafe mechanics for preliminary release - private static final Unsafe _unsafe; - private static final long headOffset; - private static final long tailOffset; - private static final long cleanMeOffset; - static { + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { try { - if (LinkedTransferQueue.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - headOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("head")); - tailOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("tail")); - cleanMeOffset = _unsafe.objectFieldOffset - (LinkedTransferQueue.class.getDeclaredField("cleanMe")); - } catch (Exception e) { - throw new RuntimeException("Could not initialize intrinsics", e); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; } } - private void resetHeadAndTail() { - QNode dummy = new QNode(null, false); - _unsafe.putObjectVolatile(this, headOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference(dummy)); - _unsafe.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference(null)); - + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } - }