--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/10/28 10:23:38 1.58
+++ jsr166/src/jsr166y/LinkedTransferQueue.java 2014/10/29 20:23:14 1.93
@@ -1,20 +1,19 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/licenses/publicdomain
+ * http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
-import java.util.concurrent.*;
-
import java.util.AbstractQueue;
import java.util.Collection;
-import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
+
/**
* An unbounded {@link TransferQueue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect
@@ -23,10 +22,17 @@ import java.util.concurrent.locks.LockSu
* producer. The tail of the queue is that element that has
* been on the queue the shortest time for some producer.
*
- *
Beware that, unlike in most collections, the {@code size}
- * method is NOT a constant-time operation. Because of the
+ *
Beware that, unlike in most collections, the {@code size} method
+ * is NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
- * of elements requires a traversal of the elements.
+ * of elements requires a traversal of the elements, and so may report
+ * inaccurate results if this collection is modified during traversal.
+ * Additionally, the bulk operations {@code addAll},
+ * {@code removeAll}, {@code retainAll}, {@code containsAll},
+ * {@code equals}, and {@code toArray} are not guaranteed
+ * to be performed atomically. For example, an iterator operating
+ * concurrently with an {@code addAll} operation might view only some
+ * of the added elements.
*
*
This class and its iterator implement all of the
* optional methods of the {@link Collection} and {@link
@@ -70,7 +76,7 @@ public class LinkedTransferQueue exte
*
* A FIFO dual queue may be implemented using a variation of the
* Michael & Scott (M&S) lock-free queue algorithm
- * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
+ * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
* It maintains two pointer fields, "head", pointing to a
* (matched) node that in turn points to the first actual
* (unmatched) queue node (or null if empty); and "tail" that
@@ -206,24 +212,6 @@ public class LinkedTransferQueue exte
* additional GC bookkeeping ("write barriers") that are sometimes
* more costly than the writes themselves because of contention).
*
- * Removal of interior nodes (due to timed out or interrupted
- * waits, or calls to remove(x) or Iterator.remove) can use a
- * scheme roughly similar to that described in Scherer, Lea, and
- * Scott's SynchronousQueue. Given a predecessor, we can unsplice
- * any node except the (actual) tail of the queue. To avoid
- * build-up of cancelled trailing nodes, upon a request to remove
- * a trailing node, it is placed in field "cleanMe" to be
- * unspliced upon the next call to unsplice any other node.
- * Situations needing such mechanics are not common but do occur
- * in practice; for example when an unbounded series of short
- * timed calls to poll repeatedly time out but never otherwise
- * fall off the list because of an untimed call to take at the
- * front of the queue. Note that maintaining field cleanMe does
- * not otherwise much impact garbage retention even if never
- * cleared by some other call because the held node will
- * eventually either directly or indirectly lead to a self-link
- * once off the list.
- *
* *** Overview of implementation ***
*
* We use a threshold-based approach to updates, with a slack
@@ -239,15 +227,10 @@ public class LinkedTransferQueue exte
* per-thread one available, but even ThreadLocalRandom is too
* heavy for these purposes.
*
- * With such a small slack threshold value, it is rarely
- * worthwhile to augment this with path short-circuiting; i.e.,
- * unsplicing nodes between head and the first unmatched node, or
- * similarly for tail, rather than advancing head or tail
- * proper. However, it is used (in awaitMatch) immediately before
- * a waiting thread starts to block, as a final bit of helping at
- * a point when contention with others is extremely unlikely
- * (since if other threads that could release it are operating,
- * then the current thread wouldn't be blocking).
+ * With such a small slack threshold value, it is not worthwhile
+ * to augment this with path short-circuiting (i.e., unsplicing
+ * interior nodes) except in the case of cancellation/removal (see
+ * below).
*
* We allow both the head and tail fields to be null before any
* nodes are enqueued; initializing upon first append. This
@@ -318,8 +301,8 @@ public class LinkedTransferQueue exte
* of less-contended queues. During spins threads check their
* interrupt status and generate a thread-local random number
* to decide to occasionally perform a Thread.yield. While
- * yield has underdefined specs, we assume that might it help,
- * and will not hurt in limiting impact of spinning on busy
+ * yield has underdefined specs, we assume that it might help,
+ * and will not hurt, in limiting impact of spinning on busy
* systems. We also use smaller (1/2) spins for nodes that are
* not known to be front but whose predecessors have not
* blocked -- these "chained" spins avoid artifacts of
@@ -329,6 +312,70 @@ public class LinkedTransferQueue exte
* versa) compared to their predecessors receive additional
* chained spins, reflecting longer paths typically required to
* unblock threads during phase changes.
+ *
+ *
+ * ** Unlinking removed interior nodes **
+ *
+ * In addition to minimizing garbage retention via self-linking
+ * described above, we also unlink removed interior nodes. These
+ * may arise due to timed out or interrupted waits, or calls to
+ * remove(x) or Iterator.remove. Normally, given a node that was
+ * at one time known to be the predecessor of some node s that is
+ * to be removed, we can unsplice s by CASing the next field of
+ * its predecessor if it still points to s (otherwise s must
+ * already have been removed or is now offlist). But there are two
+ * situations in which we cannot guarantee to make node s
+ * unreachable in this way: (1) If s is the trailing node of list
+ * (i.e., with null next), then it is pinned as the target node
+ * for appends, so can only be removed later after other nodes are
+ * appended. (2) We cannot necessarily unlink s given a
+ * predecessor node that is matched (including the case of being
+ * cancelled): the predecessor may already be unspliced, in which
+ * case some previous reachable node may still point to s.
+ * (For further explanation see Herlihy & Shavit "The Art of
+ * Multiprocessor Programming" chapter 9). Although, in both
+ * cases, we can rule out the need for further action if either s
+ * or its predecessor are (or can be made to be) at, or fall off
+ * from, the head of list.
+ *
+ * Without taking these into account, it would be possible for an
+ * unbounded number of supposedly removed nodes to remain
+ * reachable. Situations leading to such buildup are uncommon but
+ * can occur in practice; for example when a series of short timed
+ * calls to poll repeatedly time out but never otherwise fall off
+ * the list because of an untimed call to take at the front of the
+ * queue.
+ *
+ * When these cases arise, rather than always retraversing the
+ * entire list to find an actual predecessor to unlink (which
+ * won't help for case (1) anyway), we record a conservative
+ * estimate of possible unsplice failures (in "sweepVotes").
+ * We trigger a full sweep when the estimate exceeds a threshold
+ * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
+ * removal failures to tolerate before sweeping through, unlinking
+ * cancelled nodes that were not unlinked upon initial removal.
+ * We perform sweeps by the thread hitting threshold (rather than
+ * background threads or by spreading work to other threads)
+ * because in the main contexts in which removal occurs, the
+ * caller is already timed-out, cancelled, or performing a
+ * potentially O(n) operation (e.g. remove(x)), none of which are
+ * time-critical enough to warrant the overhead that alternatives
+ * would impose on other threads.
+ *
+ * Because the sweepVotes estimate is conservative, and because
+ * nodes become unlinked "naturally" as they fall off the head of
+ * the queue, and because we allow votes to accumulate even while
+ * sweeps are in progress, there are typically significantly fewer
+ * such nodes than estimated. Choice of a threshold value
+ * balances the likelihood of wasted effort and contention, versus
+ * providing a worst-case bound on retention of interior nodes in
+ * quiescent queues. The value defined below was chosen
+ * empirically to balance these under various timeout scenarios.
+ *
+ * Note that we cannot self-link unlinked interior nodes during
+ * sweeps. However, the associated garbage chains terminate when
+ * some successor ultimately falls off the head of the list and is
+ * self-linked.
*/
/** True if on multiprocessor */
@@ -355,33 +402,41 @@ public class LinkedTransferQueue exte
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
+ * The maximum number of estimated removal failures (sweepVotes)
+ * to tolerate before sweeping through the queue unlinking
+ * cancelled nodes that were not unlinked upon initial
+ * removal. See above for explanation. The value must be at least
+ * two to avoid useless sweeps when removing trailing nodes.
+ */
+ static final int SWEEP_THRESHOLD = 32;
+
+ /**
* Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Relies heavily on Unsafe mechanics to minimize
- * unnecessary ordering constraints: Writes that intrinsically
- * precede or follow CASes use simple relaxed forms. Other
- * cleanups use releasing/lazy writes.
+ * unnecessary ordering constraints: Writes that are intrinsically
+ * ordered wrt other accesses or CASes use simple relaxed forms.
*/
- static final class Node {
+ static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
- volatile Node next;
+ volatile Node next;
volatile Thread waiter; // null until waiting
// CAS methods for fields
- final boolean casNext(Node cmp, Node val) {
+ final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
final boolean casItem(Object cmp, Object val) {
- assert cmp == null || cmp.getClass() != Node.class;
+ // assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
- * Creates a new node. Uses relaxed write because item can only
- * be seen if followed by CAS.
+ * Constructs a new node. Uses relaxed write because item can
+ * only be seen after publication via casNext.
*/
- Node(E item, boolean isData) {
+ Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
@@ -395,13 +450,17 @@ public class LinkedTransferQueue exte
}
/**
- * Sets item to self (using a releasing/lazy write) and waiter
- * to null, to avoid garbage retention after extracting or
- * cancelling.
+ * Sets item to self and waiter to null, to avoid garbage
+ * retention after matching or cancelling. Uses relaxed writes
+ * because order is already constrained in the only calling
+ * contexts: item is forgotten only after volatile/atomic
+ * mechanics that extract items. Similarly, clearing waiter
+ * follows either CAS or return from park (if ever parked;
+ * else we don't care).
*/
final void forgetContents() {
- UNSAFE.putOrderedObject(this, itemOffset, this);
- UNSAFE.putOrderedObject(this, waiterOffset, null);
+ UNSAFE.putObject(this, itemOffset, this);
+ UNSAFE.putObject(this, waiterOffset, null);
}
/**
@@ -435,7 +494,7 @@ public class LinkedTransferQueue exte
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
- assert isData;
+ // assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
@@ -444,52 +503,62 @@ public class LinkedTransferQueue exte
return false;
}
- // Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE = getUnsafe();
- private static final long nextOffset =
- objectFieldOffset(UNSAFE, "next", Node.class);
- private static final long itemOffset =
- objectFieldOffset(UNSAFE, "item", Node.class);
- private static final long waiterOffset =
- objectFieldOffset(UNSAFE, "waiter", Node.class);
-
private static final long serialVersionUID = -3375979862319811754L;
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long itemOffset;
+ private static final long nextOffset;
+ private static final long waiterOffset;
+ static {
+ try {
+ UNSAFE = getUnsafe();
+ Class> k = Node.class;
+ itemOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("item"));
+ nextOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("next"));
+ waiterOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("waiter"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
}
/** head of the queue; null until first enqueue */
- transient volatile Node head;
-
- /** predecessor of dangling unspliceable node */
- private transient volatile Node cleanMe; // decl here reduces contention
+ transient volatile Node head;
/** tail of the queue; null until first append */
- private transient volatile Node tail;
+ private transient volatile Node tail;
+
+ /** The number of apparent failures to unsplice removed nodes */
+ private transient volatile int sweepVotes;
// CAS methods for fields
- private boolean casTail(Node cmp, Node val) {
+ private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
- private boolean casHead(Node cmp, Node val) {
+ private boolean casHead(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
- private boolean casCleanMe(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
+ private boolean casSweepVotes(int cmp, int val) {
+ return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
}
/*
- * Possible values for "how" argument in xfer method. Beware that
- * the order of assigned numerical values matters.
+ * Possible values for "how" argument in xfer method.
*/
- private static final int NOW = 0; // for untimed poll, tryTransfer
- private static final int ASYNC = 1; // for offer, put, add
- private static final int SYNC = 2; // for transfer, take
- private static final int TIMEOUT = 3; // for timed poll, tryTransfer
+ private static final int NOW = 0; // for untimed poll, tryTransfer
+ private static final int ASYNC = 1; // for offer, put, add
+ private static final int SYNC = 2; // for transfer, take
+ private static final int TIMED = 3; // for timed poll, tryTransfer
@SuppressWarnings("unchecked")
static E cast(Object item) {
- assert item == null || item.getClass() != Node.class;
+ // assert item == null || item.getClass() != Node.class;
return (E) item;
}
@@ -498,31 +567,29 @@ public class LinkedTransferQueue exte
*
* @param e the item or null for take
* @param haveData true if this is a put, else a take
- * @param how NOW, ASYNC, SYNC, or TIMEOUT
- * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
+ * @param how NOW, ASYNC, SYNC, or TIMED
+ * @param nanos timeout in nanosecs, used only if mode is TIMED
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
- Node s = null; // the node to append, if needed
+ Node s = null; // the node to append, if needed
- retry: for (;;) { // restart on append race
+ retry:
+ for (;;) { // restart on append race
- for (Node h = head, p = h; p != null;) {
- // find & match first node
+ for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
if (p.casItem(item, e)) { // match
- for (Node q = p; q != h;) {
- Node n = q.next; // update head by 2
- if (n != null) // unless singleton
- q = n;
- if (head == h && casHead(h, q)) {
+ for (Node q = p; q != h;) {
+ Node n = q.next; // update by 2 unless singleton
+ if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
@@ -531,21 +598,21 @@ public class LinkedTransferQueue exte
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
- return this.cast(item);
+ return LinkedTransferQueue.cast(item);
}
}
- Node n = p.next;
+ Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
- if (how >= ASYNC) { // No matches available
+ if (how != NOW) { // No matches available
if (s == null)
- s = new Node(e, haveData);
- Node pred = tryAppend(s, haveData);
+ s = new Node(e, haveData);
+ Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
- if (how >= SYNC)
- return awaitMatch(s, pred, e, how, nanos);
+ if (how != ASYNC)
+ return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
@@ -560,9 +627,9 @@ public class LinkedTransferQueue exte
* different mode, else s's predecessor, or s itself if no
* predecessor
*/
- private Node tryAppend(Node s, boolean haveData) {
- for (Node t = tail, p = t;;) { // move p to last node and append
- Node n, u; // temps for reads of next & tail
+ private Node tryAppend(Node s, boolean haveData) {
+ for (Node t = tail, p = t;;) { // move p to last node and append
+ Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
@@ -594,12 +661,12 @@ public class LinkedTransferQueue exte
* predecessor, or null if unknown (the null case does not occur
* in any current calls but may in possible future extensions)
* @param e the comparison value for checking match
- * @param how either SYNC or TIMEOUT
- * @param nanos timeout value
+ * @param timed if true, wait only until timeout elapses
+ * @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
- private E awaitMatch(Node s, Node pred, E e, int how, long nanos) {
- long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
+ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+ long lastTime = timed ? System.nanoTime() : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
@@ -607,12 +674,12 @@ public class LinkedTransferQueue exte
for (;;) {
Object item = s.item;
if (item != e) { // matched
- assert item != s;
+ // assert item != s;
s.forgetContents(); // avoid garbage
- return this.cast(item);
+ return LinkedTransferQueue.cast(item);
}
- if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
- s.casItem(e, s)) { // cancel
+ if ((w.isInterrupted() || (timed && nanos <= 0)) &&
+ s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
@@ -622,15 +689,14 @@ public class LinkedTransferQueue exte
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
- if (--spins == 0)
- shortenHeadPath(); // reduce slack before blocking
- else if (randomYields.nextInt(CHAINED_SPINS) == 0)
+ --spins;
+ if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
- else if (how == TIMEOUT) {
+ else if (timed) {
long now = System.nanoTime();
if ((nanos -= now - lastTime) > 0)
LockSupport.parkNanos(this, nanos);
@@ -638,8 +704,6 @@ public class LinkedTransferQueue exte
}
else {
LockSupport.park(this);
- s.waiter = null;
- spins = -1; // spin if front upon wakeup
}
}
}
@@ -648,7 +712,7 @@ public class LinkedTransferQueue exte
* Returns spin/yield value for a node with given predecessor and
* data mode. See above for explanation.
*/
- private static int spinsFor(Node> pred, boolean haveData) {
+ private static int spinsFor(Node pred, boolean haveData) {
if (MP && pred != null) {
if (pred.isData != haveData) // phase change
return FRONT_SPINS + CHAINED_SPINS;
@@ -660,39 +724,26 @@ public class LinkedTransferQueue exte
return 0;
}
+ /* -------------- Traversal methods -------------- */
+
/**
- * Tries (once) to unsplice nodes between head and first unmatched
- * or trailing node; failing on contention.
- */
- private void shortenHeadPath() {
- Node h, hn, p, q;
- if ((p = h = head) != null && h.isMatched() &&
- (q = hn = h.next) != null) {
- Node n;
- while ((n = q.next) != q) {
- if (n == null || !q.isMatched()) {
- if (hn != q && h.next == hn)
- h.casNext(hn, q);
- break;
- }
- p = q;
- q = n;
- }
- }
+ * Returns the successor of p, or the head node if p.next has been
+ * linked to self, which will only be true if traversing with a
+ * stale pointer that is now off the list.
+ */
+ final Node succ(Node p) {
+ Node next = p.next;
+ return (p == next) ? head : next;
}
- /* -------------- Traversal methods -------------- */
-
/**
* Returns the first unmatched node of the given mode, or null if
* none. Used by methods isEmpty, hasWaitingConsumer.
*/
- private Node firstOfMode(boolean data) {
- for (Node p = head; p != null; ) {
+ private Node firstOfMode(boolean isData) {
+ for (Node p = head; p != null; p = succ(p)) {
if (!p.isMatched())
- return (p.isData == data) ? p : null;
- Node n = p.next;
- p = (n != p) ? n : head;
+ return (p.isData == isData) ? p : null;
}
return null;
}
@@ -702,13 +753,14 @@ public class LinkedTransferQueue exte
* null if none. Used by peek.
*/
private E firstDataItem() {
- for (Node p = head; p != null; ) {
- boolean isData = p.isData;
+ for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
- if (item != p && (item != null) == isData)
- return isData ? this.cast(item) : null;
- Node n = p.next;
- p = (n != p) ? n : head;
+ if (p.isData) {
+ if (item != null && item != p)
+ return LinkedTransferQueue.cast(item);
+ }
+ else if (item == null)
+ return null;
}
return null;
}
@@ -719,14 +771,14 @@ public class LinkedTransferQueue exte
*/
private int countOfMode(boolean data) {
int count = 0;
- for (Node p = head; p != null; ) {
+ for (Node p = head; p != null; ) {
if (!p.isMatched()) {
if (p.isData != data)
return 0;
if (++count == Integer.MAX_VALUE) // saturated
break;
}
- Node n = p.next;
+ Node n = p.next;
if (n != p)
p = n;
else {
@@ -738,33 +790,70 @@ public class LinkedTransferQueue exte
}
final class Itr implements Iterator {
- private Node nextNode; // next node to return item for
- private E nextItem; // the corresponding item
- private Node lastRet; // last returned node, to support remove
+ private Node nextNode; // next node to return item for
+ private E nextItem; // the corresponding item
+ private Node lastRet; // last returned node, to support remove
+ private Node lastPred; // predecessor to unlink lastRet
/**
* Moves to next node after prev, or first node if prev null.
*/
- private void advance(Node prev) {
- lastRet = prev;
- Node p;
- if (prev == null || (p = prev.next) == prev)
- p = head;
- while (p != null) {
- Object item = p.item;
- if (p.isData) {
- if (item != null && item != p) {
- nextItem = LinkedTransferQueue.this.cast(item);
- nextNode = p;
+ private void advance(Node prev) {
+ /*
+ * To track and avoid buildup of deleted nodes in the face
+ * of calls to both Queue.remove and Itr.remove, we must
+ * include variants of unsplice and sweep upon each
+ * advance: Upon Itr.remove, we may need to catch up links
+ * from lastPred, and upon other removes, we might need to
+ * skip ahead from stale nodes and unsplice deleted ones
+ * found while advancing.
+ */
+
+ Node r, b; // reset lastPred upon possible deletion of lastRet
+ if ((r = lastRet) != null && !r.isMatched())
+ lastPred = r; // next lastPred is old lastRet
+ else if ((b = lastPred) == null || b.isMatched())
+ lastPred = null; // at start of list
+ else {
+ Node s, n; // help with removal of lastPred.next
+ while ((s = b.next) != null &&
+ s != b && s.isMatched() &&
+ (n = s.next) != null && n != s)
+ b.casNext(s, n);
+ }
+
+ this.lastRet = prev;
+
+ for (Node p = prev, s, n;;) {
+ s = (p == null) ? head : p.next;
+ if (s == null)
+ break;
+ else if (s == p) {
+ p = null;
+ continue;
+ }
+ Object item = s.item;
+ if (s.isData) {
+ if (item != null && item != s) {
+ nextItem = LinkedTransferQueue.cast(item);
+ nextNode = s;
return;
}
}
else if (item == null)
break;
- Node n = p.next;
- p = (n != p) ? n : head;
+ // assert s.isMatched();
+ if (p == null)
+ p = s;
+ else if ((n = s.next) == null)
+ break;
+ else if (s == n)
+ p = null;
+ else
+ p.casNext(s, n);
}
nextNode = null;
+ nextItem = null;
}
Itr() {
@@ -776,7 +865,7 @@ public class LinkedTransferQueue exte
}
public final E next() {
- Node p = nextNode;
+ Node p = nextNode;
if (p == null) throw new NoSuchElementException();
E e = nextItem;
advance(p);
@@ -784,10 +873,12 @@ public class LinkedTransferQueue exte
}
public final void remove() {
- Node p = lastRet;
- if (p == null) throw new IllegalStateException();
- lastRet = null;
- findAndRemoveDataNode(p);
+ final Node lastRet = this.lastRet;
+ if (lastRet == null)
+ throw new IllegalStateException();
+ this.lastRet = null;
+ if (lastRet.tryMatchData())
+ unsplice(lastPred, lastRet);
}
}
@@ -797,90 +888,68 @@ public class LinkedTransferQueue exte
* Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor.
*
- * @param pred predecessor of node to be unspliced
+ * @param pred a node that was at one time known to be the
+ * predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced
*/
- private void unsplice(Node pred, Node s) {
- s.forgetContents(); // clear unneeded fields
+ final void unsplice(Node pred, Node s) {
+ s.forgetContents(); // forget unneeded fields
/*
- * At any given time, exactly one node on list cannot be
- * unlinked -- the last inserted node. To accommodate this, if
- * we cannot unlink s, we save its predecessor as "cleanMe",
- * processing the previously saved version first. Because only
- * one node in the list can have a null next, at least one of
- * node s or the node previously saved can always be
- * processed, so this always terminates.
+ * See above for rationale. Briefly: if pred still points to
+ * s, try to unlink s. If s cannot be unlinked, because it is
+ * trailing node or pred might be unlinked, and neither pred
+ * nor s are head or offlist, add to sweepVotes, and if enough
+ * votes have accumulated, sweep.
*/
- if (pred != null && pred != s) {
- while (pred.next == s) {
- Node oldpred = (cleanMe == null) ? null : reclean();
- Node n = s.next;
- if (n != null) {
- if (n != s)
- pred.casNext(s, n);
- break;
+ if (pred != null && pred != s && pred.next == s) {
+ Node n = s.next;
+ if (n == null ||
+ (n != s && pred.casNext(s, n) && pred.isMatched())) {
+ for (;;) { // check if at, or could be, head
+ Node h = head;
+ if (h == pred || h == s || h == null)
+ return; // at head or list empty
+ if (!h.isMatched())
+ break;
+ Node hn = h.next;
+ if (hn == null)
+ return; // now empty
+ if (hn != h && casHead(h, hn))
+ h.forgetNext(); // advance head
+ }
+ if (pred.next != pred && s.next != s) { // recheck if offlist
+ for (;;) { // sweep now if enough votes
+ int v = sweepVotes;
+ if (v < SWEEP_THRESHOLD) {
+ if (casSweepVotes(v, v + 1))
+ break;
+ }
+ else if (casSweepVotes(v, 0)) {
+ sweep();
+ break;
+ }
+ }
}
- if (oldpred == pred || // Already saved
- (oldpred == null && casCleanMe(null, pred)))
- break; // Postpone cleaning
}
}
}
/**
- * Tries to unsplice the deleted/cancelled node held in cleanMe
- * that was previously uncleanable because it was at tail.
- *
- * @return current cleanMe node (or null)
+ * Unlinks matched (typically cancelled) nodes encountered in a
+ * traversal from head.
*/
- private Node reclean() {
- /*
- * cleanMe is, or at one time was, predecessor of a cancelled
- * node s that was the tail so could not be unspliced. If it
- * is no longer the tail, try to unsplice if necessary and
- * make cleanMe slot available. This differs from similar
- * code in unsplice() because we must check that pred still
- * points to a matched node that can be unspliced -- if not,
- * we can (must) clear cleanMe without unsplicing. This can
- * loop only due to contention.
- */
- Node pred;
- while ((pred = cleanMe) != null) {
- Node s = pred.next;
- Node n;
- if (s == null || s == pred || !s.isMatched())
- casCleanMe(pred, null); // already gone
- else if ((n = s.next) != null) {
- if (n != s)
- pred.casNext(s, n);
- casCleanMe(pred, null);
- }
- else
+ private void sweep() {
+ for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
+ if (!s.isMatched())
+ // Unmatched nodes are never self-linked
+ p = s;
+ else if ((n = s.next) == null) // trailing node is pinned
break;
- }
- return pred;
- }
-
- /**
- * Main implementation of Iterator.remove(). Find
- * and unsplice the given data node.
- */
- final void findAndRemoveDataNode(Node s) {
- assert s.isData;
- if (s.tryMatchData()) {
- for (Node pred = null, p = head; p != null; ) {
- if (p == s) {
- unsplice(pred, p);
- break;
- }
- if (p.isUnmatchedRequest())
- break;
- pred = p;
- if ((p = p.next) == pred) { // stale
- pred = null;
- p = head;
- }
- }
+ else if (s == n) // stale
+ // No need to also check for p == s, since that implies s == n
+ p = head;
+ else
+ p.casNext(s, n);
}
}
@@ -889,7 +958,7 @@ public class LinkedTransferQueue exte
*/
private boolean findAndRemove(Object e) {
if (e != null) {
- for (Node pred = null, p = head; p != null; ) {
+ for (Node pred = null, p = head; p != null; ) {
Object item = p.item;
if (p.isData) {
if (item != null && item != p && e.equals(item) &&
@@ -947,7 +1016,8 @@ public class LinkedTransferQueue exte
* return {@code false}.
*
* @return {@code true} (as specified by
- * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
+ * {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
+ * BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
@@ -959,8 +1029,7 @@ public class LinkedTransferQueue exte
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
- * @return {@code true} (as specified by
- * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
+ * @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
@@ -1029,7 +1098,7 @@ public class LinkedTransferQueue exte
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
+ if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
@@ -1045,7 +1114,7 @@ public class LinkedTransferQueue exte
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
+ E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
@@ -1065,8 +1134,7 @@ public class LinkedTransferQueue exte
if (c == this)
throw new IllegalArgumentException();
int n = 0;
- E e;
- while ( (e = poll()) != null) {
+ for (E e; (e = poll()) != null;) {
c.add(e);
++n;
}
@@ -1083,8 +1151,7 @@ public class LinkedTransferQueue exte
if (c == this)
throw new IllegalArgumentException();
int n = 0;
- E e;
- while (n < maxElements && (e = poll()) != null) {
+ for (E e; n < maxElements && (e = poll()) != null;) {
c.add(e);
++n;
}
@@ -1092,15 +1159,15 @@ public class LinkedTransferQueue exte
}
/**
- * Returns an iterator over the elements in this queue in proper
- * sequence, from head to tail.
+ * Returns an iterator over the elements in this queue in proper sequence.
+ * The elements will be returned in order from first (head) to last (tail).
*
* The returned iterator is a "weakly consistent" iterator that
- * will never throw
- * {@link ConcurrentModificationException ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed
- * to) reflect any modifications subsequent to construction.
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException}, and guarantees to traverse
+ * elements as they existed upon construction of the iterator, and
+ * may (but is not guaranteed to) reflect any modifications
+ * subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
@@ -1118,7 +1185,11 @@ public class LinkedTransferQueue exte
* @return {@code true} if this queue contains no elements
*/
public boolean isEmpty() {
- return firstOfMode(true) == null;
+ for (Node p = head; p != null; p = succ(p)) {
+ if (!p.isMatched())
+ return !p.isData;
+ }
+ return true;
}
public boolean hasWaitingConsumer() {
@@ -1161,11 +1232,34 @@ public class LinkedTransferQueue exte
}
/**
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this queue
+ * @return {@code true} if this queue contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ for (Node p = head; p != null; p = succ(p)) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p && o.equals(item))
+ return true;
+ }
+ else if (item == null)
+ break;
+ }
+ return false;
+ }
+
+ /**
* Always returns {@code Integer.MAX_VALUE} because a
* {@code LinkedTransferQueue} is not capacity constrained.
*
* @return {@code Integer.MAX_VALUE} (as specified by
- * {@link BlockingQueue#remainingCapacity()})
+ * {@link java.util.concurrent.BlockingQueue#remainingCapacity()
+ * BlockingQueue.remainingCapacity})
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
@@ -1197,7 +1291,8 @@ public class LinkedTransferQueue exte
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
for (;;) {
- @SuppressWarnings("unchecked") E item = (E) s.readObject();
+ @SuppressWarnings("unchecked")
+ E item = (E) s.readObject();
if (item == null)
break;
else
@@ -1207,23 +1302,22 @@ public class LinkedTransferQueue exte
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE = getUnsafe();
- private static final long headOffset =
- objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
- private static final long tailOffset =
- objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
- private static final long cleanMeOffset =
- objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
-
- static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
- String field, Class> klazz) {
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long headOffset;
+ private static final long tailOffset;
+ private static final long sweepVotesOffset;
+ static {
try {
- return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
- } catch (NoSuchFieldException e) {
- // Convert Exception to corresponding Error
- NoSuchFieldError error = new NoSuchFieldError(field);
- error.initCause(e);
- throw error;
+ UNSAFE = getUnsafe();
+ Class> k = LinkedTransferQueue.class;
+ headOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("head"));
+ tailOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("tail"));
+ sweepVotesOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("sweepVotes"));
+ } catch (Exception e) {
+ throw new Error(e);
}
}
@@ -1237,22 +1331,23 @@ public class LinkedTransferQueue exte
static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
- } catch (SecurityException se) {
- try {
- return java.security.AccessController.doPrivileged
- (new java.security
- .PrivilegedExceptionAction() {
- public sun.misc.Unsafe run() throws Exception {
- java.lang.reflect.Field f = sun.misc
- .Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- return (sun.misc.Unsafe) f.get(null);
- }});
- } catch (java.security.PrivilegedActionException e) {
- throw new RuntimeException("Could not initialize intrinsics",
- e.getCause());
- }
+ } catch (SecurityException tryReflectionInstead) {}
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security.PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ Class k = sun.misc.Unsafe.class;
+ for (java.lang.reflect.Field f : k.getDeclaredFields()) {
+ f.setAccessible(true);
+ Object x = f.get(null);
+ if (k.isInstance(x))
+ return k.cast(x);
+ }
+ throw new NoSuchFieldError("the Unsafe");
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
}
}
-
}