--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/11/15 02:29:12 1.70 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2011/03/15 19:47:02 1.83 @@ -1,20 +1,20 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; -import java.util.concurrent.*; - import java.util.AbstractQueue; import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; + /** * An unbounded {@link TransferQueue} based on linked nodes. * This queue orders elements FIFO (first-in-first-out) with respect @@ -321,7 +321,7 @@ public class LinkedTransferQueue exte * situations in which we cannot guarantee to make node s * unreachable in this way: (1) If s is the trailing node of list * (i.e., with null next), then it is pinned as the target node - * for appends, so can only be removed later when other nodes are + * for appends, so can only be removed later after other nodes are * appended. (2) We cannot necessarily unlink s given a * predecessor node that is matched (including the case of being * cancelled): the predecessor may already be unspliced, in which @@ -343,18 +343,18 @@ public class LinkedTransferQueue exte * When these cases arise, rather than always retraversing the * entire list to find an actual predecessor to unlink (which * won't help for case (1) anyway), we record a conservative - * estimate of possible unsplice failures (in "sweepVotes"). We - * trigger a full sweep when the estimate exceeds a threshold - * indicating the maximum number of estimated removal failures to - * tolerate before sweeping through, unlinking cancelled nodes - * that were not unlinked upon initial removal. We perform sweeps - * by the thread hitting threshold (rather than background threads - * or by spreading work to other threads) because in the main - * contexts in which removal occurs, the caller is already - * timed-out, cancelled, or performing a potentially O(n) - * operation (i.e., remove(x)), none of which are time-critical - * enough to warrant the overhead that alternatives would impose - * on other threads. + * estimate of possible unsplice failures (in "sweepVotes"). + * We trigger a full sweep when the estimate exceeds a threshold + * ("SWEEP_THRESHOLD") indicating the maximum number of estimated + * removal failures to tolerate before sweeping through, unlinking + * cancelled nodes that were not unlinked upon initial removal. + * We perform sweeps by the thread hitting threshold (rather than + * background threads or by spreading work to other threads) + * because in the main contexts in which removal occurs, the + * caller is already timed-out, cancelled, or performing a + * potentially O(n) operation (e.g. remove(x)), none of which are + * time-critical enough to warrant the overhead that alternatives + * would impose on other threads. * * Because the sweepVotes estimate is conservative, and because * nodes become unlinked "naturally" as they fall off the head of @@ -422,13 +422,13 @@ public class LinkedTransferQueue exte } final boolean casItem(Object cmp, Object val) { - assert cmp == null || cmp.getClass() != Node.class; + // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** - * Creates a new node. Uses relaxed write because item can only - * be seen if followed by CAS. + * Constructs a new node. Uses relaxed write because item can + * only be seen after publication via casNext. */ Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write @@ -446,7 +446,7 @@ public class LinkedTransferQueue exte /** * Sets item to self and waiter to null, to avoid garbage * retention after matching or cancelling. Uses relaxed writes - * bacause order is already constrained in the only calling + * because order is already constrained in the only calling * contexts: item is forgotten only after volatile/atomic * mechanics that extract items. Similarly, clearing waiter * follows either CAS or return from park (if ever parked; @@ -488,7 +488,7 @@ public class LinkedTransferQueue exte * Tries to artificially match a data node -- used by remove. */ final boolean tryMatchData() { - assert isData; + // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); @@ -541,7 +541,7 @@ public class LinkedTransferQueue exte @SuppressWarnings("unchecked") static E cast(Object item) { - assert item == null || item.getClass() != Node.class; + // assert item == null || item.getClass() != Node.class; return (E) item; } @@ -560,7 +560,8 @@ public class LinkedTransferQueue exte throw new NullPointerException(); Node s = null; // the node to append, if needed - retry: for (;;) { // restart on append race + retry: + for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; @@ -571,7 +572,7 @@ public class LinkedTransferQueue exte if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null? q : n)) { + if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry @@ -656,7 +657,7 @@ public class LinkedTransferQueue exte for (;;) { Object item = s.item; if (item != e) { // matched - assert item != s; + // assert item != s; s.forgetContents(); // avoid garbage return this.cast(item); } @@ -781,22 +782,60 @@ public class LinkedTransferQueue exte * Moves to next node after prev, or first node if prev null. */ private void advance(Node prev) { - lastPred = lastRet; - lastRet = prev; - for (Node p = (prev == null) ? head : succ(prev); - p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { - nextItem = LinkedTransferQueue.this.cast(item); - nextNode = p; + /* + * To track and avoid buildup of deleted nodes in the face + * of calls to both Queue.remove and Itr.remove, we must + * include variants of unsplice and sweep upon each + * advance: Upon Itr.remove, we may need to catch up links + * from lastPred, and upon other removes, we might need to + * skip ahead from stale nodes and unsplice deleted ones + * found while advancing. + */ + + Node r, b; // reset lastPred upon possible deletion of lastRet + if ((r = lastRet) != null && !r.isMatched()) + lastPred = r; // next lastPred is old lastRet + else if ((b = lastPred) == null || b.isMatched()) + lastPred = null; // at start of list + else { + Node s, n; // help with removal of lastPred.next + while ((s = b.next) != null && + s != b && s.isMatched() && + (n = s.next) != null && n != s) + b.casNext(s, n); + } + + this.lastRet = prev; + for (Node p = prev, s, n;;) { + s = (p == null) ? head : p.next; + if (s == null) + break; + else if (s == p) { + p = null; + continue; + } + Object item = s.item; + if (s.isData) { + if (item != null && item != s) { + nextItem = LinkedTransferQueue.cast(item); + nextNode = s; return; } } else if (item == null) break; + // assert s.isMatched(); + if (p == null) + p = s; + else if ((n = s.next) == null) + break; + else if (s == n) + p = null; + else + p.casNext(s, n); } nextNode = null; + nextItem = null; } Itr() { @@ -816,10 +855,12 @@ public class LinkedTransferQueue exte } public final void remove() { - Node p = lastRet; - if (p == null) throw new IllegalStateException(); - if (p.tryMatchData()) - unsplice(lastPred, p); + final Node lastRet = this.lastRet; + if (lastRet == null) + throw new IllegalStateException(); + this.lastRet = null; + if (lastRet.tryMatchData()) + unsplice(lastPred, lastRet); } } @@ -876,17 +917,21 @@ public class LinkedTransferQueue exte } /** - * Unlinks matched nodes encountered in a traversal from head. + * Unlinks matched (typically cancelled) nodes encountered in a + * traversal from head. */ private void sweep() { - Node p = head, s, n; - while (p != null && (s = p.next) != null && (n = s.next) != null) { - if (p == s || s == n) - p = head; // stale - else if (s.isMatched()) - p.casNext(s, n); - else + for (Node p = head, s, n; p != null && (s = p.next) != null; ) { + if (!s.isMatched()) + // Unmatched nodes are never self-linked p = s; + else if ((n = s.next) == null) // trailing node is pinned + break; + else if (s == n) // stale + // No need to also check for p == s, since that implies s == n + p = head; + else + p.casNext(s, n); } } @@ -965,8 +1010,7 @@ public class LinkedTransferQueue exte * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * - * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { @@ -1124,7 +1168,11 @@ public class LinkedTransferQueue exte * @return {@code true} if this queue contains no elements */ public boolean isEmpty() { - return firstOfMode(true) == null; + for (Node p = head; p != null; p = succ(p)) { + if (!p.isMatched()) + return !p.isData; + } + return true; } public boolean hasWaitingConsumer() { @@ -1167,6 +1215,28 @@ public class LinkedTransferQueue exte } /** + * Returns {@code true} if this queue contains the specified element. + * More formally, returns {@code true} if and only if this queue contains + * at least one element {@code e} such that {@code o.equals(e)}. + * + * @param o object to be checked for containment in this queue + * @return {@code true} if this queue contains the specified element + */ + public boolean contains(Object o) { + if (o == null) return false; + for (Node p = head; p != null; p = succ(p)) { + Object item = p.item; + if (p.isData) { + if (item != null && item != p && o.equals(item)) + return true; + } + else if (item == null) + break; + } + return false; + } + + /** * Always returns {@code Integer.MAX_VALUE} because a * {@code LinkedTransferQueue} is not capacity constrained. *