--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/07/25 00:34:00 1.26 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/08/01 20:44:05 1.41 @@ -10,11 +10,12 @@ import java.util.concurrent.*; import java.util.AbstractQueue; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * An unbounded {@linkplain TransferQueue} based on linked nodes. @@ -115,17 +116,46 @@ public class LinkedTransferQueue exte this.isData = isData; } - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (Node.class, Node.class, "next"); + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); final boolean casNext(Node cmp, Node val) { - return nextUpdater.compareAndSet(this, cmp, val); + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } final void clearNext() { - nextUpdater.lazySet(this, this); + UNSAFE.putOrderedObject(this, nextOffset, this); + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } private static final long serialVersionUID = -3375979862319811754L; @@ -264,7 +294,7 @@ public class LinkedTransferQueue exte * @param e the comparison value for checking match * @param mode mode * @param nanos timeout value - * @return matched item, or s if cancelled + * @return matched item, or null if cancelled */ private E awaitFulfill(Node pred, Node s, E e, int mode, long nanos) { @@ -330,7 +360,7 @@ public class LinkedTransferQueue exte for (;;) { Node h = head.get(); Node first = h.next; - if (first != null && first.next == first) { // help advance + if (first != null && first.get() == first) { // help advance advanceHead(h, first); continue; } @@ -442,32 +472,81 @@ public class LinkedTransferQueue exte addAll(c); } - public void put(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block. + * + * @throws NullPointerException if the specified element is null + */ + public void put(E e) { + offer(e); } - public boolean offer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (Thread.interrupted()) throw new InterruptedException(); - xfer(e, NOWAIT, 0); - return true; + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block or + * return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never return {@code false}. + * + * @return {@code true} (as specified by + * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); xfer(e, NOWAIT, 0); return true; } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never throw + * {@link IllegalStateException} or return {@code false}. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ public boolean add(E e) { + return offer(e); + } + + /** + * Transfers the element to a waiting consumer immediately, if possible. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * otherwise returning {@code false} without enqueuing the element. + * + * @throws NullPointerException if the specified element is null + */ + public boolean tryTransfer(E e) { if (e == null) throw new NullPointerException(); - xfer(e, NOWAIT, 0); - return true; + return fulfill(e) != null; } + /** + * Transfers the element to a consumer, waiting if necessary to do so. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer. + * + * @throws NullPointerException if the specified element is null + */ public void transfer(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (xfer(e, WAIT, 0) == null) { @@ -476,6 +555,20 @@ public class LinkedTransferQueue exte } } + /** + * Transfers the element to a consumer if it is possible to do so + * before the timeout elapses. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer, + * returning {@code false} if the specified wait time elapses + * before the element can be transferred. + * + * @throws NullPointerException if the specified element is null + */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); @@ -486,23 +579,18 @@ public class LinkedTransferQueue exte throw new InterruptedException(); } - public boolean tryTransfer(E e) { - if (e == null) throw new NullPointerException(); - return fulfill(e) != null; - } - public E take() throws InterruptedException { - Object e = xfer(null, WAIT, 0); + E e = xfer(null, WAIT, 0); if (e != null) - return (E) e; + return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); + E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E) e; + return e; throw new InterruptedException(); } @@ -510,6 +598,10 @@ public class LinkedTransferQueue exte return fulfill(null); } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); @@ -524,6 +616,10 @@ public class LinkedTransferQueue exte return n; } + /** + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); @@ -568,7 +664,19 @@ public class LinkedTransferQueue exte } } - + /** + * Returns an iterator over the elements in this queue in proper + * sequence, from head to tail. + * + *

The returned iterator is a "weakly consistent" iterator that + * will never throw + * {@link ConcurrentModificationException ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed + * to) reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence + */ public Iterator iterator() { return new Itr(); } @@ -583,42 +691,39 @@ public class LinkedTransferQueue exte class Itr implements Iterator { Node next; // node to return next Node pnext; // predecessor of next - Node snext; // successor of next Node curr; // last returned node, for remove() Node pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once committed to in next + E nextItem; // Cache of next item, once committed to in next Itr() { - findNext(); + advance(); } /** - * Ensures next points to next valid node, or null if none. + * Moves to next valid node and returns item to return for + * next(), or null if no such. */ - void findNext() { + private E advance() { + pcurr = pnext; + curr = next; + E item = nextItem; + for (;;) { - Node pred = pnext; - Node q = next; - if (pred == null || pred == q) { - pred = traversalHead(); - q = pred.next; - } - if (q == null || !q.isData) { + pnext = (next == null) ? traversalHead() : next; + next = pnext.next; + if (next == pnext) { next = null; - return; + continue; // restart } - Object x = q.get(); - Node s = q.next; - if (x != null && q != x && q != s) { + if (next == null) + break; + Object x = next.get(); + if (x != null && x != next) { nextItem = (E) x; - snext = s; - pnext = pred; - next = q; - return; + break; } - pnext = q; - next = s; } + return item; } public boolean hasNext() { @@ -626,14 +731,9 @@ public class LinkedTransferQueue exte } public E next() { - if (next == null) throw new NoSuchElementException(); - pcurr = pnext; - curr = next; - pnext = next; - next = snext; - E x = nextItem; - findNext(); - return x; + if (next == null) + throw new NoSuchElementException(); + return advance(); } public void remove() { @@ -662,6 +762,11 @@ public class LinkedTransferQueue exte } } + /** + * Returns {@code true} if this queue contains no elements. + * + * @return {@code true} if this queue contains no elements + */ public boolean isEmpty() { for (;;) { Node h = traversalHead(); @@ -703,32 +808,44 @@ public class LinkedTransferQueue exte * @return the number of elements in this queue */ public int size() { - int count = 0; - Node h = traversalHead(); - for (Node p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) // saturated + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || !q.isData) + return count; + Object x = q.get(); + if (x != null && x != q) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; } public int getWaitingConsumerCount() { - int count = 0; - Node h = traversalHead(); - for (Node p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) + // converse of size -- count valid non-data nodes + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) // restart break; + if (q == null || q.isData) + return count; + Object x = q.get(); + if (x == null) { + if (++count == Integer.MAX_VALUE) // saturated + return count; + } + pred = q; } } - return count; - } - - public int remainingCapacity() { - return Integer.MAX_VALUE; } public boolean remove(Object o) { @@ -738,10 +855,10 @@ public class LinkedTransferQueue exte Node pred = traversalHead(); for (;;) { Node q = pred.next; - if (q == null || !q.isData) - return false; if (q == pred) // restart break; + if (q == null || !q.isData) + return false; Object x = q.get(); if (x != null && x != q && o.equals(x) && q.compareAndSet(x, q)) { @@ -754,6 +871,17 @@ public class LinkedTransferQueue exte } /** + * Always returns {@code Integer.MAX_VALUE} because a + * {@code LinkedTransferQueue} is not capacity constrained. + * + * @return {@code Integer.MAX_VALUE} (as specified by + * {@link BlockingQueue#remainingCapacity()}) + */ + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + /** * Save the state to a stream (that is, serialize it). * * @serialData All of the elements (each an {@code E}) in @@ -799,16 +927,49 @@ public class LinkedTransferQueue exte new PaddedAtomicReference>(null)); } - // Unsafe mechanics for jsr166y 3rd party package. + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); + private static final long cleanMeOffset = + objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); + + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { + (new java.security + .PrivilegedExceptionAction() { public sun.misc.Unsafe run() throws Exception { - return getUnsafeByReflection(); + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { throw new RuntimeException("Could not initialize intrinsics", @@ -816,32 +977,4 @@ public class LinkedTransferQueue exte } } } - - private static sun.misc.Unsafe getUnsafeByReflection() - throws NoSuchFieldException, IllegalAccessException { - java.lang.reflect.Field f = - sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName, Class klazz) { - try { - return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName)); - } catch (NoSuchFieldException e) { - // Convert Exception to Error - NoSuchFieldError error = new NoSuchFieldError(fieldName); - error.initCause(e); - throw error; - } - } - - private static final sun.misc.Unsafe UNSAFE = getUnsafe(); - static final long headOffset = - fieldOffset("head", LinkedTransferQueue.class); - static final long tailOffset = - fieldOffset("tail", LinkedTransferQueue.class); - static final long cleanMeOffset = - fieldOffset("cleanMe", LinkedTransferQueue.class); - }