--- jsr166/src/jsr166y/LinkedTransferQueue.java 2007/08/27 19:48:36 1.3
+++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/10/22 08:19:44 1.46
@@ -5,21 +5,25 @@
*/
package jsr166y;
+
import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.atomic.*;
-import java.util.*;
-import java.io.*;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.locks.LockSupport;
/**
- * An unbounded {@linkplain TransferQueue} based on linked nodes.
+ * An unbounded {@link TransferQueue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect
* to any given producer. The head of the queue is that
* element that has been on the queue the longest time for some
* producer. The tail of the queue is that element that has
* been on the queue the shortest time for some producer.
*
- * <p>Beware that, unlike in most collections, the size
+ * <p>Beware that, unlike in most collections, the {@code size}
* method is NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
@@ -42,410 +46,972 @@ import java.io.*;
* @since 1.7
* @author Doug Lea
* @param <E> the type of elements held in this collection
- *
*/
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
/*
- * This is still a work in prgress...
+ * *** Overview of Dual Queues with Slack ***
+ *
+ * Dual Queues, introduced by Scherer and Scott
+ * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
+ * (linked) queues in which nodes may represent either data or
+ * requests. When a thread tries to enqueue a data node, but
+ * encounters a request node, it instead "matches" and removes it;
+ * and vice versa for enqueuing requests. Blocking Dual Queues
+ * arrange that threads enqueuing unmatched requests block until
+ * other threads provide the match. Dual Synchronous Queues (see
+ * Scherer, Lea, & Scott
+ * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
+ * additionally arrange that threads enqueuing unmatched data also
+ * block. Dual Transfer Queues support all of these modes, as
+ * dictated by callers.
+ *
+ * A FIFO dual queue may be implemented using a variation of the
+ * Michael & Scott (M&S) lock-free queue algorithm
+ * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
+ * It maintains two pointer fields, "head", pointing to a
+ * (matched) node that in turn points to the first actual
+ * (unmatched) queue node (or null if empty); and "tail" that
+ * points to the last node on the queue (or again null if
+ * empty). For example, here is a possible queue with four data
+ * elements:
+ *
+ * head tail
+ * | |
+ * v v
+ * M -> U -> U -> U -> U
+ *
+ * The M&S queue algorithm is known to be prone to scalability and
+ * overhead limitations when maintaining (via CAS) these head and
+ * tail pointers. This has led to the development of
+ * contention-reducing variants such as elimination arrays (see
+ * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
+ * optimistic back pointers (see Ladan-Mozes & Shavit
+ * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
+ * However, the nature of dual queues enables a simpler tactic for
+ * improving M&S-style implementations when dual-ness is needed.
+ *
+ * In a dual queue, each node must atomically maintain its match
+ * status. While there are other possible variants, we implement
+ * this here as: for a data-mode node, matching entails CASing an
+ * "item" field from a non-null data value to null upon match, and
+ * vice-versa for request nodes, CASing from null to a data
+ * value. (Note that the linearization properties of this style of
+ * queue are easy to verify -- elements are made available by
+ * linking, and unavailable by matching.) Compared to plain M&S
+ * queues, this property of dual queues requires one additional
+ * successful atomic operation per enq/deq pair. But it also
+ * enables lower cost variants of queue maintenance mechanics. (A
+ * variation of this idea applies even for non-dual queues that
+ * support deletion of embedded elements, such as
+ * j.u.c.ConcurrentLinkedQueue.)
+ *
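+ * As a rough illustration of this matching step (a hedged sketch
+ * only, not the Node class defined later in this file, which relies
+ * on Unsafe mechanics rather than AtomicReference), a node can carry
+ * its mode in a final flag and its item in an AtomicReference, with
+ * matching expressed as a single CAS on that item:
+ *
+ *   import java.util.concurrent.atomic.AtomicReference;
+ *   import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+ *
+ *   final class SketchNode {
+ *     final boolean isData;               // true: data, false: request
+ *     final AtomicReference<Object> item; // value for data, else null
+ *     volatile SketchNode next;
+ *
+ *     static final AtomicReferenceFieldUpdater<SketchNode, SketchNode>
+ *       NEXT = AtomicReferenceFieldUpdater.newUpdater(
+ *         SketchNode.class, SketchNode.class, "next");
+ *
+ *     SketchNode(Object x, boolean isData) {
+ *       this.isData = isData;
+ *       this.item = new AtomicReference<Object>(x);
+ *     }
+ *
+ *     boolean casNext(SketchNode cmp, SketchNode val) {
+ *       return NEXT.compareAndSet(this, cmp, val);
+ *     }
+ *
+ *     // matched once the item no longer agrees with the mode
+ *     boolean isMatched() { return (item.get() != null) != isData; }
+ *
+ *     // data nodes match by CASing value -> null; request nodes
+ *     // match by CASing null -> the offered value
+ *     boolean tryMatch(Object x) {
+ *       Object cur = item.get();
+ *       return isData ? (cur != null && item.compareAndSet(cur, null))
+ *                     : (cur == null && item.compareAndSet(null, x));
+ *     }
+ *   }
+ *
+ * The later sketches in this comment reuse SketchNode purely for
+ * illustration; in the real code the linearization point is the
+ * same: one successful CAS on the item field.
+ *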
+ * Once a node is matched, its item can never again change. We
+ * may thus arrange that the linked list of them contains a prefix
+ * of zero or more matched nodes, followed by a suffix of zero or
+ * more unmatched nodes. (Note that we allow both the prefix and
+ * suffix to be zero length, which in turn means that we do not
+ * use a dummy header.) If we were not concerned with either time
+ * or space efficiency, we could correctly perform enqueue and
+ * dequeue operations by traversing from a pointer to the initial
+ * node; CASing the item of the first unmatched node on match and
+ * CASing the next field of the trailing node on appends. While
+ * this would be a terrible idea in itself, it does have the
+ * benefit of not requiring ANY atomic updates on head/tail
+ * fields.
+ *
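+ * In code form, that zero-maintenance extreme might look roughly
+ * like this (a sketch using the SketchNode illustration above and a
+ * single fixed origin node; only the data-taking and data-appending
+ * cases are shown):
+ *
+ *   // dequeue: walk from the fixed origin, match the first live data node
+ *   static Object naivePoll(SketchNode origin) {
+ *     for (SketchNode p = origin; p != null; p = p.next) {
+ *       Object x = p.item.get();
+ *       if (p.isData && x != null && p.tryMatch(null))
+ *         return x;                 // the item CAS is the only atomic update
+ *     }
+ *     return null;                  // no unmatched data node present
+ *   }
+ *
+ *   // enqueue: walk to the trailing node and CAS its next field
+ *   static void naiveOffer(SketchNode origin, Object x) {
+ *     SketchNode n = new SketchNode(x, true);
+ *     for (SketchNode p = origin;;) {
+ *       SketchNode q = p.next;
+ *       if (q != null) p = q;                 // not yet at the tail
+ *       else if (p.casNext(null, n)) return;  // appended; done
+ *     }
+ *   }
+ *
+ * Correct, but every operation re-traverses the entire history of
+ * the queue, which is exactly the time and space cost the slack
+ * scheme described next avoids.
+ *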
+ * We introduce here an approach that lies between the extremes of
+ * never versus always updating queue (head and tail) pointers
+ * that reflects the tradeoff of sometimes requiring extra traversal
+ * steps to locate the first and/or last unmatched nodes, versus
+ * the reduced overhead and contention of fewer updates to queue
+ * pointers. For example, a possible snapshot of a queue is:
+ *
+ * head tail
+ * | |
+ * v v
+ * M -> M -> U -> U -> U -> U
+ *
+ * The best value for this "slack" (the targeted maximum distance
+ * between the value of "head" and the first unmatched node, and
+ * similarly for "tail") is an empirical matter. We have found
+ * that using very small constants in the range of 1-3 works best
+ * over a range of platforms. Larger values introduce increasing
+ * costs of cache misses and risks of long traversal chains.
+ *
+ * Dual queues with slack differ from plain M&S dual queues by
+ * virtue of only sometimes updating head or tail pointers when
+ * matching, appending, or even traversing nodes, in order to
+ * maintain a targeted slack. The idea of "sometimes" may be
+ * operationalized in several ways. The simplest is to use a
+ * per-operation counter incremented on each traversal step, and
+ * to try (via CAS) to update the associated queue pointer
+ * whenever the count exceeds a threshold. Another, that requires
+ * more overhead, is to use random number generators to update
+ * with a given probability per traversal step.
+ *
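+ * For instance, the counter-based variant could be sketched as
+ * follows (illustration only, again using SketchNode and an
+ * AtomicReference standing in for the head field; the implementation
+ * below instead hard-wires a slack of two through pointer equality
+ * checks, as described further down):
+ *
+ *   static final int SLACK = 2;
+ *
+ *   static Object pollWithSlack(AtomicReference<SketchNode> head) {
+ *     SketchNode h = head.get();
+ *     int steps = 0;
+ *     // everything before the item CAS below is read-only
+ *     for (SketchNode p = h; p != null; p = p.next, ++steps) {
+ *       Object x = p.item.get();
+ *       if (p.isData && x != null && p.tryMatch(null)) {
+ *         if (steps >= SLACK)           // only now pay for a head CAS;
+ *           head.compareAndSet(h, p);   // a miss just leaves extra slack
+ *         return x;
+ *       }
+ *     }
+ *     return null;
+ *   }
+ *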
+ * In any strategy along these lines, because CASes updating
+ * fields may fail, the actual slack may exceed targeted
+ * slack. However, they may be retried at any time to maintain
+ * targets. Even when using very small slack values, this
+ * approach works well for dual queues because it allows all
+ * operations up to the point of matching or appending an item
+ * (hence potentially releasing another thread) to be read-only,
+ * thus not introducing any further contention. As described
+ * below, we implement this by performing slack maintenance
+ * retries only after these points.
+ *
+ * As an accompaniment to such techniques, traversal overhead can
+ * be further reduced without increasing contention of head
+ * pointer updates. During traversals, threads may sometimes
+ * shortcut the "next" link path from the current "head" node to
+ * be closer to the currently known first unmatched node. Again,
+ * this may be triggered using thresholds or randomization.
+ *
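+ * For example (sketch only), a traversal that started at a node h it
+ * believed to be the current head, observed h's successor hNext, and
+ * then skipped a run of matched nodes ending just before the first
+ * unmatched node p, can relink h past the dead run:
+ *
+ *   // best-effort: a CAS miss, or h no longer being the head, is
+ *   // harmless -- the dead nodes are merely retained a little longer
+ *   static void shortcutSketch(SketchNode h, SketchNode hNext,
+ *                              SketchNode p) {
+ *     if (h != p && hNext != p)
+ *       h.casNext(hNext, p);      // h -> ... -> p becomes h -> p
+ *   }
+ *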
+ * These ideas must be further extended to avoid unbounded amounts
+ * of costly-to-reclaim garbage caused by the sequential "next"
+ * links of nodes starting at old forgotten head nodes: As first
+ * described in detail by Boehm
+ * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
+ * delays noticing that any arbitrarily old node has become
+ * garbage, all newer dead nodes will also be unreclaimed.
+ * (Similar issues arise in non-GC environments.) To cope with
+ * this in our implementation, upon CASing to advance the head
+ * pointer, we set the "next" link of the previous head to point
+ * only to itself; thus limiting the length of connected dead lists.
+ * (We also take similar care to wipe out possibly garbage
+ * retaining values held in other Node fields.) However, doing so
+ * adds some further complexity to traversal: If any "next"
+ * pointer links to itself, it indicates that the current thread
+ * has lagged behind a head-update, and so the traversal must
+ * continue from the "head". Traversals trying to find the
+ * current tail starting from "tail" may also encounter
+ * self-links, in which case they also continue at "head".
+ *
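+ * A hedged sketch of that head-advance step (an AtomicReference again
+ * stands in for the Unsafe-based head CAS used by the real code,
+ * which also uses relaxed/lazy writes for cleanups, as noted at the
+ * Node class below):
+ *
+ *   // advance head from h to p, then cut h loose by linking it to
+ *   // itself, so the chain of dead nodes reachable from h stays short
+ *   static void advanceHeadSketch(AtomicReference<SketchNode> head,
+ *                                 SketchNode h, SketchNode p) {
+ *     if (h != p && head.compareAndSet(h, p))
+ *       h.next = h;               // self-link marks h as off-list
+ *   }
+ *
+ *   // traversals must then treat a self-link as "this thread fell
+ *   // behind a head update":
+ *   //   if (p.next == p) p = head.get();  // restart at the current head
+ *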
+ * It is tempting in a slack-based scheme to not even use CAS for
+ * updates (similarly to Ladan-Mozes & Shavit). However, this
+ * cannot be done for head updates under the above link-forgetting
+ * mechanics because an update may leave head at a detached node.
+ * And while direct writes are possible for tail updates, they
+ * increase the risk of long retraversals, and hence long garbage
+ * chains which can be much more costly than is worthwhile
+ * considering that the cost difference of performing a CAS vs
+ * write is smaller when they are not triggered on each operation
+ * (especially considering that writes and CASes equally require
+ * additional GC bookkeeping ("write barriers") that are sometimes
+ * more costly than the writes themselves because of contention).
+ *
+ * Removal of internal nodes (due to timed out or interrupted
+ * waits, or calls to remove or Iterator.remove) uses a scheme
+ * roughly similar to that in Scherer, Lea, and Scott
+ * SynchronousQueue. Given a predecessor, we can unsplice any node
+ * except the (actual) tail of the queue. To avoid build-up of
+ * cancelled trailing nodes, upon a request to remove a trailing
+ * node, it is placed in field "cleanMe" to be unspliced later.
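+ *
+ * In sketch form (illustration only; the real unsplice logic below
+ * must additionally handle off-list predecessors and the "cleanMe"
+ * bookkeeping mentioned above):
+ *
+ *   // unsplice a cancelled interior node s, given its predecessor;
+ *   // a trailing node cannot be removed this way, because a
+ *   // concurrent append may be CASing s.next at the same moment
+ *   static void unspliceSketch(SketchNode pred, SketchNode s) {
+ *     SketchNode next = s.next;
+ *     if (next != null && next != s)  // neither trailing nor off-list
+ *       pred.casNext(s, next);        // best-effort; a miss is harmless
+ *   }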
*
- * This class extends the approach used in FIFO-mode
- * SynchronousQueues. See the internal documentation, as well as
- * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
- * Lea & Scott
- * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
+ * *** Overview of implementation ***
*
- * The main extension is to provide different Wait modes
- * for the main "xfer" method that puts or takes items.
- * These don't impact the basic dual-queue logic, but instead
- * control whether or how threads block upon insertion
- * of request or data nodes into the dual queue.
+ * We use a threshold-based approach to updates, with a target
+ * slack of two. The slack value is hard-wired: a path greater
+ * than one is naturally implemented by checking equality of
+ * traversal pointers except when the list has only one element,
+ * in which case we keep max slack at one. Avoiding tracking
+ * explicit counts across situations slightly simplifies an
+ * already-messy implementation. Using randomization would
+ * probably work better if there were a low-quality dirt-cheap
+ * per-thread one available, but even ThreadLocalRandom is too
+ * heavy for these purposes.
+ *
+ * With such a small slack value, path short-circuiting is rarely
+ * worthwhile. However, it is used (in awaitMatch) immediately
+ * before a waiting thread starts to block, as a final bit of
+ * helping at a point when contention with others is extremely
+ * unlikely (since if other threads that could release it are
+ * operating, then the current thread wouldn't be blocking).
+ *
+ * All enqueue/dequeue operations are handled by the single method
+ * "xfer" with parameters indicating whether to act as some form
+ * of offer, put, poll, take, or transfer (each possibly with
+ * timeout). The relative complexity of using one monolithic
+ * method is outweighed by the code bulk and maintenance problems of
+ * using nine separate methods.
+ *
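+ * For example, the public methods reduce to thin wrappers along
+ * these lines (a simplified sketch; the authoritative signatures and
+ * mode constants accompany the xfer method below, and only NOW and
+ * ASYNC are referred to in the phase descriptions that follow):
+ *
+ *   public E poll()           { return xfer(null, false, NOW, 0); }
+ *   public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }
+ *   public void put(E e)      { xfer(e, true, ASYNC, 0); }
+ *   public E take() throws InterruptedException {
+ *     E e = xfer(null, false, SYNC, 0);  // SYNC: block until matched
+ *     if (e != null)
+ *       return e;
+ *     Thread.interrupted();              // clear and report interrupt
+ *     throw new InterruptedException();
+ *   }
+ *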
+ * Operation consists of up to three phases. The first is
+ * implemented within method xfer, the second in tryAppend, and
+ * the third in method awaitMatch.
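+ * (A condensed code sketch of the combined flow follows the three
+ * phase descriptions.)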
+ *
+ * 1. Try to match an existing node
+ *
+ * Starting at head, skip already-matched nodes until finding
+ * an unmatched node of opposite mode, if one exists, in which
+ * case matching it and returning, also if necessary updating
+ * head to one past the matched node (or the node itself if the
+ * list has no other unmatched nodes). If the CAS misses, then
+ * a retry loops until the slack is at most two. Traversals
+ * also check if the initial head is now off-list, in which
+ * case they start at the new head.
+ *
+ * If no candidates are found and the call was untimed
+ * poll/offer, (argument "how" is NOW) return.
+ *
+ * 2. Try to append a new node (method tryAppend)
+ *
+ * Starting at current tail pointer, try to append a new node
+ * to the list (or if head was null, establish the first
+ * node). Nodes can be appended only if their predecessors are
+ * either already matched or are of the same mode. If we detect
+ * otherwise, then a new node with opposite mode must have been
+ * appended during traversal, so must restart at phase 1. The
+ * traversal and update steps are otherwise similar to phase 1:
+ * Retrying upon CAS misses and checking for staleness. In
+ * particular, if a self-link is encountered, then we can
+ * safely jump to a node on the list by continuing the
+ * traversal at current head.
+ *
+ * On successful append, if the call was ASYNC, return.
+ *
+ * 3. Await match or cancellation (method awaitMatch)
+ *
+ * Wait for another thread to match the node, instead cancelling if
+ * the current thread was interrupted or the wait timed out. On
+ * multiprocessors, we use front-of-queue spinning: If a node
+ * appears to be the first unmatched node in the queue, it
+ * spins a bit before blocking. In either case, before blocking
+ * it tries to unsplice any nodes between the current "head"
+ * and the first unmatched node.
+ *
+ * Front-of-queue spinning vastly improves performance of
+ * heavily contended queues. And so long as it is relatively
+ * brief and "quiet", spinning does not much impact performance
+ * of less-contended queues. During spins threads check their
+ * interrupt status and generate a thread-local random number
+ * to decide to occasionally perform a Thread.yield. While
+ * yield has underdefined specs, we assume that it might help,
+ * and will not hurt in limiting impact of spinning on busy
+ * systems. We also use much smaller (1/4) spins for nodes
+ * that are not known to be front but whose predecessors have
+ * not blocked -- these "chained" spins avoid artifacts of
+ * front-of-queue rules which otherwise lead to alternating
+ * nodes spinning vs blocking. Further, front threads that
+ * represent phase changes (from data to request node or vice
+ * versa) compared to their predecessors receive additional
+ * spins, reflecting the longer code path lengths necessary to
+ * release them under contention.
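+ *
+ * Pulling the three phases together, the overall flow can be
+ * condensed roughly as follows (a sketch built on the SketchNode
+ * illustration above, with head/tail slack maintenance, timeouts,
+ * interrupts, chained spins, and unsplicing all elided; the real
+ * logic lives in xfer, tryAppend, and awaitMatch below):
+ *
+ *   // returns false if an unmatched node of the opposite mode was
+ *   // appended first, in which case the caller retries phase 1
+ *   static boolean appendSketch(AtomicReference<SketchNode> head,
+ *                               SketchNode s) {
+ *     for (SketchNode p = head.get();;) {
+ *       if (p == null) {
+ *         if (head.compareAndSet(null, s)) return true;
+ *         p = head.get();
+ *       } else if (!p.isMatched() && p.isData != s.isData) {
+ *         return false;                  // opposite mode is waiting
+ *       } else if (p.next != null) {
+ *         p = p.next;                    // keep walking toward the tail
+ *       } else if (p.casNext(null, s)) {
+ *         return true;                   // appended as the new tail
+ *       }
+ *     }
+ *   }
+ *
+ *   static Object xferSketch(Object e, boolean haveData, boolean async,
+ *                            AtomicReference<SketchNode> head) {
+ *     for (;;) {
+ *       // phase 1: try to match an existing node of the opposite mode
+ *       for (SketchNode p = head.get(); p != null; p = p.next) {
+ *         Object x = p.item.get();
+ *         if ((x != null) == p.isData          // p is still unmatched
+ *             && p.isData != haveData
+ *             && p.tryMatch(e))
+ *           return p.isData ? x : e;           // hand over the item
+ *       }
+ *       // (an untimed poll/offer, mode NOW, would return here)
+ *
+ *       // phase 2: append a node of our own mode
+ *       SketchNode s = new SketchNode(e, haveData);
+ *       if (!appendSketch(head, s))
+ *         continue;                            // raced; retry phase 1
+ *       if (async)
+ *         return e;                            // ASYNC calls return here
+ *
+ *       // phase 3: spin a bit, then park until matched; the real code
+ *       // records the waiting thread in the node so the matching
+ *       // thread can unpark it, and randomizes occasional yields
+ *       int spins = FRONT_SPINS;               // constant defined below
+ *       while (!s.isMatched()) {
+ *         if (spins > 0) --spins;
+ *         else LockSupport.parkNanos(1000L);   // sketch: timed re-check
+ *       }
+ *       return haveData ? e : s.item.get();
+ *     }
+ *   }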
*/
- // Wait modes for xfer method
- static final int NOWAIT = 0;
- static final int TIMEOUT = 1;
- static final int WAIT = 2;
-
- /** The number of CPUs, for spin control */
- static final int NCPUS = Runtime.getRuntime().availableProcessors();
+ /** True if on multiprocessor */
+ private static final boolean MP =
+ Runtime.getRuntime().availableProcessors() > 1;
/**
- * The number of times to spin before blocking in timed waits.
- * The value is empirically derived -- it works well across a
- * variety of processors and OSes. Empirically, the best value
- * seems not to vary with number of CPUs (beyond 2) so is just
- * a constant.
+ * The number of times to spin (with on average one randomly
+ * interspersed call to Thread.yield) on multiprocessor before
+ * blocking when a node is apparently the first waiter in the
+ * queue. See above for explanation. Must be a power of two. The
+ * value is empirically derived -- it works pretty well across a
+ * variety of processors, numbers of CPUs, and OSes.
*/
- static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+ private static final int FRONT_SPINS = 1 << 7;
/**
- * The number of times to spin before blocking in untimed waits.
- * This is greater than timed value because untimed waits spin
- * faster since they don't need to check times on each spin.
+ * The number of times to spin before blocking when a node is
+ * preceded by another node that is apparently spinning.
*/
- static final int maxUntimedSpins = maxTimedSpins * 16;
+ private static final int CHAINED_SPINS = FRONT_SPINS >>> 2;
/**
- * The number of nanoseconds for which it is faster to spin
- * rather than to use timed park. A rough estimate suffices.
+ * Queue nodes. Uses Object, not E, for items to allow forgetting
+ * them after use. Relies heavily on Unsafe mechanics to minimize
+ * unnecessary ordering constraints: Writes that intrinsically
+ * precede or follow CASes use simple relaxed forms. Other
+ * cleanups use releasing/lazy writes.
*/
- static final long spinForTimeoutThreshold = 1000L;
-
- /**
- * Node class for LinkedTransferQueue. Opportunistically subclasses from
- * AtomicReference to represent item. Uses Object, not E, to allow
- * setting item to "this" after use, to avoid garbage
- * retention. Similarly, setting the next field to this is used as
- * sentinel that node is off list.
- */
- static final class QNode extends AtomicReference<Object>