--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/10/24 12:29:57 1.50 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/11/02 18:38:37 1.66 @@ -373,6 +373,7 @@ public class LinkedTransferQueue exte } final boolean casItem(Object cmp, Object val) { + assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } @@ -409,7 +410,14 @@ public class LinkedTransferQueue exte */ final boolean isMatched() { Object x = item; - return x == this || (x != null) != isData; + return (x == this) || ((x == null) == isData); + } + + /** + * Returns true if this is an unmatched request node. + */ + final boolean isUnmatchedRequest() { + return !isData && item == null; } /** @@ -427,6 +435,7 @@ public class LinkedTransferQueue exte * Tries to artificially match a data node -- used by remove. */ final boolean tryMatchData() { + assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); @@ -448,10 +457,10 @@ public class LinkedTransferQueue exte } /** head of the queue; null until first enqueue */ - private transient volatile Node head; + transient volatile Node head; /** predecessor of dangling unspliceable node */ - private transient volatile Node cleanMe; // decl here to reduce contention + private transient volatile Node cleanMe; // decl here reduces contention /** tail of the queue; null until first append */ private transient volatile Node tail; @@ -470,25 +479,30 @@ public class LinkedTransferQueue exte } /* - * Possible values for "how" argument in xfer method. Beware that - * the order of assigned numerical values matters. + * Possible values for "how" argument in xfer method. */ - private static final int NOW = 0; // for untimed poll, tryTransfer - private static final int ASYNC = 1; // for offer, put, add - private static final int SYNC = 2; // for transfer, take - private static final int TIMEOUT = 3; // for timed poll, tryTransfer + private static final int NOW = 0; // for untimed poll, tryTransfer + private static final int ASYNC = 1; // for offer, put, add + private static final int SYNC = 2; // for transfer, take + private static final int TIMED = 3; // for timed poll, tryTransfer + + @SuppressWarnings("unchecked") + static E cast(Object item) { + assert item == null || item.getClass() != Node.class; + return (E) item; + } /** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take - * @param how NOW, ASYNC, SYNC, or TIMEOUT - * @param nanos timeout in nanosecs, used only if mode is TIMEOUT + * @param how NOW, ASYNC, SYNC, or TIMED + * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ - private Object xfer(Object e, boolean haveData, int how, long nanos) { + private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed @@ -502,35 +516,34 @@ public class LinkedTransferQueue exte if (isData == haveData) // can't match break; if (p.casItem(item, e)) { // match - Thread w = p.waiter; - while (p != h) { // update head - Node n = p.next; // by 2 unless singleton - if (n != null) - p = n; - if (head == h && casHead(h, p)) { + for (Node q = p; q != h;) { + Node n = q.next; // update head by 2 + if (n != null) // unless singleton + q = n; + if (head == h && casHead(h, q)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || - (p = h.next) == null || !p.isMatched()) + (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } - LockSupport.unpark(w); - return item; + LockSupport.unpark(p.waiter); + return this.cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } - if (how >= ASYNC) { // No matches available + if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode - if (how >= SYNC) - return awaitMatch(s, pred, e, how, nanos); + if (how != ASYNC) + return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } @@ -546,7 +559,7 @@ public class LinkedTransferQueue exte * predecessor */ private Node tryAppend(Node s, boolean haveData) { - for (Node t = tail, p = t;;) { // move p to last node and append + for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { if (casHead(null, s)) @@ -579,13 +592,12 @@ public class LinkedTransferQueue exte * predecessor, or null if unknown (the null case does not occur * in any current calls but may in possible future extensions) * @param e the comparison value for checking match - * @param how either SYNC or TIMEOUT - * @param nanos timeout value + * @param timed if true, wait only until timeout elapses + * @param nanos timeout in nanosecs, used only if timed is true * @return matched item, or e if unmatched on interrupt or timeout */ - private Object awaitMatch(Node s, Node pred, Object e, - int how, long nanos) { - long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L; + private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { + long lastTime = timed ? System.nanoTime() : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed @@ -593,11 +605,12 @@ public class LinkedTransferQueue exte for (;;) { Object item = s.item; if (item != e) { // matched + assert item != s; s.forgetContents(); // avoid garbage - return item; + return this.cast(item); } - if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) && - s.casItem(e, s)) { // cancel + if ((w.isInterrupted() || (timed && nanos <= 0)) && + s.casItem(e, s)) { // cancel unsplice(pred, s); return e; } @@ -613,9 +626,9 @@ public class LinkedTransferQueue exte Thread.yield(); // occasionally yield } else if (s.waiter == null) { - s.waiter = w; // request unpark + s.waiter = w; // request unpark then recheck } - else if (how == TIMEOUT) { + else if (timed) { long now = System.nanoTime(); if ((nanos -= now - lastTime) > 0) LockSupport.parkNanos(this, nanos); @@ -623,6 +636,7 @@ public class LinkedTransferQueue exte } else { LockSupport.park(this); + s.waiter = null; spins = -1; // spin if front upon wakeup } } @@ -668,31 +682,40 @@ public class LinkedTransferQueue exte /* -------------- Traversal methods -------------- */ /** + * Returns the successor of p, or the head node if p.next has been + * linked to self, which will only be true if traversing with a + * stale pointer that is now off the list. + */ + final Node succ(Node p) { + Node next = p.next; + return (p == next) ? head : next; + } + + /** * Returns the first unmatched node of the given mode, or null if * none. Used by methods isEmpty, hasWaitingConsumer. */ - private Node firstOfMode(boolean data) { - for (Node p = head; p != null; ) { + private Node firstOfMode(boolean isData) { + for (Node p = head; p != null; p = succ(p)) { if (!p.isMatched()) - return (p.isData == data) ? p : null; - Node n = p.next; - p = (n != p) ? n : head; + return (p.isData == isData) ? p : null; } return null; } /** * Returns the item in the first unmatched node with isData; or - * null if none. Used by peek. + * null if none. Used by peek. */ - private Object firstDataItem() { - for (Node p = head; p != null; ) { - boolean isData = p.isData; + private E firstDataItem() { + for (Node p = head; p != null; p = succ(p)) { Object item = p.item; - if (item != p && (item != null) == isData) - return isData ? item : null; - Node n = p.next; - p = (n != p) ? n : head; + if (p.isData) { + if (item != null && item != p) + return this.cast(item); + } + else if (item == null) + return null; } return null; } @@ -723,30 +746,28 @@ public class LinkedTransferQueue exte final class Itr implements Iterator { private Node nextNode; // next node to return item for - private Object nextItem; // the corresponding item + private E nextItem; // the corresponding item private Node lastRet; // last returned node, to support remove + private Node lastPred; // predecessor to unlink lastRet /** * Moves to next node after prev, or first node if prev null. */ private void advance(Node prev) { + lastPred = lastRet; lastRet = prev; - Node p; - if (prev == null || (p = prev.next) == prev) - p = head; - while (p != null) { + for (Node p = (prev == null) ? head : succ(prev); + p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p) { - nextItem = item; + nextItem = LinkedTransferQueue.this.cast(item); nextNode = p; return; } } else if (item == null) break; - Node n = p.next; - p = (n != p) ? n : head; } nextNode = null; } @@ -762,16 +783,15 @@ public class LinkedTransferQueue exte public final E next() { Node p = nextNode; if (p == null) throw new NoSuchElementException(); - Object e = nextItem; + E e = nextItem; advance(p); - return (E) e; + return e; } public final void remove() { Node p = lastRet; if (p == null) throw new IllegalStateException(); - lastRet = null; - findAndRemoveNode(p); + findAndRemoveDataNode(lastPred, p); } } @@ -805,8 +825,10 @@ public class LinkedTransferQueue exte break; } if (oldpred == pred || // Already saved - (oldpred == null && casCleanMe(null, pred))) - break; // Postpone cleaning + ((oldpred == null || oldpred.next == s) && + casCleanMe(oldpred, pred))) { + break; + } } } } @@ -846,24 +868,30 @@ public class LinkedTransferQueue exte } /** - * Main implementation of Iterator.remove(). Find - * and unsplice the given node. + * Main implementation of Iterator.remove(). Finds + * and unsplices the given data node. + * + * @param possiblePred possible predecessor of s + * @param s the node to remove */ - final void findAndRemoveNode(Node s) { + final void findAndRemoveDataNode(Node possiblePred, Node s) { + assert s.isData; if (s.tryMatchData()) { - Node pred = null; - Node p = head; - while (p != null) { - if (p == s) { - unsplice(pred, p); - break; - } - if (!p.isData && !p.isMatched()) - break; - pred = p; - if ((p = p.next) == pred) { // stale - pred = null; - p = head; + if (possiblePred != null && possiblePred.next == s) + unsplice(possiblePred, s); // was actual predecessor + else { + for (Node pred = null, p = head; p != null; ) { + if (p == s) { + unsplice(pred, p); + break; + } + if (p.isUnmatchedRequest()) + break; + pred = p; + if ((p = p.next) == pred) { // stale + pred = null; + p = head; + } } } } @@ -874,9 +902,7 @@ public class LinkedTransferQueue exte */ private boolean findAndRemove(Object e) { if (e != null) { - Node pred = null; - Node p = head; - while (p != null) { + for (Node pred = null, p = head; p != null; ) { Object item = p.item; if (p.isData) { if (item != null && item != p && e.equals(item) && @@ -888,7 +914,7 @@ public class LinkedTransferQueue exte else if (item == null) break; pred = p; - if ((p = p.next) == pred) { + if ((p = p.next) == pred) { // stale pred = null; p = head; } @@ -1016,7 +1042,7 @@ public class LinkedTransferQueue exte */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null) + if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; @@ -1024,22 +1050,22 @@ public class LinkedTransferQueue exte } public E take() throws InterruptedException { - Object e = xfer(null, false, SYNC, 0); + E e = xfer(null, false, SYNC, 0); if (e != null) - return (E)e; + return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout)); + E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return (E)e; + return e; throw new InterruptedException(); } public E poll() { - return (E)xfer(null, false, NOW, 0); + return xfer(null, false, NOW, 0); } /** @@ -1096,7 +1122,7 @@ public class LinkedTransferQueue exte } public E peek() { - return (E) firstDataItem(); + return firstDataItem(); } /** @@ -1192,7 +1218,6 @@ public class LinkedTransferQueue exte } } - // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE = getUnsafe(); @@ -1215,7 +1240,14 @@ public class LinkedTransferQueue exte } } - private static sun.misc.Unsafe getUnsafe() { + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) {