--- jsr166/src/jsr166y/LinkedTransferQueue.java 2009/08/04 20:32:16 1.44 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2009/10/21 16:30:40 1.45 @@ -15,8 +15,6 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.atomic.AtomicReference; - /** * An unbounded {@link TransferQueue} based on linked nodes. * This queue orders elements FIFO (first-in-first-out) with respect @@ -54,405 +52,821 @@ public class LinkedTransferQueue exte private static final long serialVersionUID = -3223113410248163686L; /* - * This class extends the approach used in FIFO-mode - * SynchronousQueues. See the internal documentation, as well as - * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, - * Lea & Scott - * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) + * *** Overview of Dual Queues with Slack *** * - * The main extension is to provide different Wait modes for the - * main "xfer" method that puts or takes items. These don't - * impact the basic dual-queue logic, but instead control whether - * or how threads block upon insertion of request or data nodes - * into the dual queue. It also uses slightly different - * conventions for tracking whether nodes are off-list or - * cancelled. - */ - - // Wait modes for xfer method - static final int NOWAIT = 0; - static final int TIMEOUT = 1; - static final int WAIT = 2; - - /** The number of CPUs, for spin control */ - static final int NCPUS = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - static final int maxTimedSpins = (NCPUS < 2) ? 
0 : 32; + * Dual Queues, introduced by Scherer and Scott + * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are + * (linked) queues in which nodes may represent either data or + * requests. When a thread tries to enqueue a data node, but + * encounters a request node, it instead "matches" and removes it; + * and vice versa for enqueuing requests. Blocking Dual Queues + * arrange that threads enqueuing unmatched requests block until + * other threads provide the match. Dual Synchronous Queues (see + * Scherer, Lea, & Scott + * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) + * additionally arrange that threads enqueuing unmatched data also + * block. Dual Transfer Queues support all of these modes, as + * dictated by callers. + * + * A FIFO dual queue may be implemented using a variation of the + * Michael & Scott (M&S) lock-free queue algorithm + * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). + * It maintains two pointer fields, "head", pointing to a + * (matched) node that in turn points to the first actual + * (unmatched) queue node (or null if empty); and "tail" that + * points to the last node on the queue (or again null if + * empty). For example, here is a possible queue with four data + * elements: + * + * head tail + * | | + * v v + * M -> U -> U -> U -> U + * + * The M&S queue algorithm is known to be prone to scalability and + * overhead limitations when maintaining (via CAS) these head and + * tail pointers. This has led to the development of + * contention-reducing variants such as elimination arrays (see + * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and + * optimistic back pointers (see Ladan-Mozes & Shavit + * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). + * However, the nature of dual queues enables a simpler tactic for + * improving M&S-style implementations when dual-ness is needed. 
+ * + * In a dual queue, each node must atomically maintain its match + * status. While there are other possible variants, we implement + * this here as: for a data-mode node, matching entails CASing an + * "item" field from a non-null data value to null upon match, and + * vice-versa for request nodes, CASing from null to a data + * value. (Note that the linearization properties of this style of + * queue are easy to verify -- elements are made available by + * linking, and unavailable by matching.) Compared to plain M&S + * queues, this property of dual queues requires one additional + * successful atomic operation per enq/deq pair. But it also + * enables lower cost variants of queue maintenance mechanics. (A + * variation of this idea applies even for non-dual queues that + * support deletion of embedded elements, such as + * j.u.c.ConcurrentLinkedQueue.) + * + * Once a node is matched, its item can never again change. We + * may thus arrange that the linked list of them contains a prefix + * of zero or more matched nodes, followed by a suffix of zero or + * more unmatched nodes. (Note that we allow both the prefix and + * suffix to be zero length, which in turn means that we do not + * use a dummy header.) If we were not concerned with either time + * or space efficiency, we could correctly perform enqueue and + * dequeue operations by traversing from a pointer to the initial + * node; CASing the item of the first unmatched node on match and + * CASing the next field of the trailing node on appends. While + * this would be a terrible idea in itself, it does have the + * benefit of not requiring ANY atomic updates on head/tail + * fields. 
+ * + * We introduce here an approach that lies between the extremes of + * never versus always updating queue (head and tail) pointers + * that reflects the tradeoff of sometimes requiring extra traversal + * steps to locate the first and/or last unmatched nodes, versus + * the reduced overhead and contention of fewer updates to queue + * pointers. For example, a possible snapshot of a queue is: + * + * head tail + * | | + * v v + * M -> M -> U -> U -> U -> U + * + * The best value for this "slack" (the targeted maximum distance + * between the value of "head" and the first unmatched node, and + * similarly for "tail") is an empirical matter. We have found + * that using very small constants in the range of 1-3 works best + * over a range of platforms. Larger values introduce increasing + * costs of cache misses and risks of long traversal chains. + * + * Dual queues with slack differ from plain M&S dual queues by + * virtue of only sometimes updating head or tail pointers when + * matching, appending, or even traversing nodes; in order to + * maintain a targeted slack. The idea of "sometimes" may be + * operationalized in several ways. The simplest is to use a + * per-operation counter incremented on each traversal step, and + * to try (via CAS) to update the associated queue pointer + * whenever the count exceeds a threshold. Another, that requires + * more overhead, is to use random number generators to update + * with a given probability per traversal step. + * + * In any strategy along these lines, because CASes updating + * fields may fail, the actual slack may exceed targeted + * slack. However, they may be retried at any time to maintain + * targets. Even when using very small slack values, this + * approach works well for dual queues because it allows all + * operations up to the point of matching or appending an item + * (hence potentially releasing another thread) to be read-only, + * thus not introducing any further contention. 
As described + * below, we implement this by performing slack maintenance + * retries only after these points. + * + * As an accompaniment to such techniques, traversal overhead can + * be further reduced without increasing contention of head + * pointer updates. During traversals, threads may sometimes + * shortcut the "next" link path from the current "head" node to + * be closer to the currently known first unmatched node. Again, + * this may be triggered using thresholds or randomization. + * + * These ideas must be further extended to avoid unbounded amounts + * of costly-to-reclaim garbage caused by the sequential "next" + * links of nodes starting at old forgotten head nodes: As first + * described in detail by Boehm + * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC + * delays noticing that any arbitrarily old node has become + * garbage, all newer dead nodes will also be unreclaimed. + * (Similar issues arise in non-GC environments.) To cope with + * this in our implementation, upon CASing to advance the head + * pointer, we set the "next" link of the previous head to point + * only to itself; thus limiting the length of connected dead lists. + * (We also take similar care to wipe out possibly garbage + * retaining values held in other Node fields.) However, doing so + * adds some further complexity to traversal: If any "next" + * pointer links to itself, it indicates that the current thread + * has lagged behind a head-update, and so the traversal must + * continue from the "head". Traversals trying to find the + * current tail starting from "tail" may also encounter + * self-links, in which case they also continue at "head". + * + * It is tempting in a slack-based scheme to not even use CAS for + * updates (similarly to Ladan-Mozes & Shavit). However, this + * cannot be done for head updates under the above link-forgetting + * mechanics because an update may leave head at a detached node. 
+ * And while direct writes are possible for tail updates, they + * increase the risk of long retraversals, and hence long garbage + * chains which can be much more costly than is worthwhile + * considering that the cost difference of performing a CAS vs + * write is smaller when they are not triggered on each operation + * (especially considering that writes and CASes equally require + * additional GC bookkeeping ("write barriers") that are sometimes + * more costly than the writes themselves because of contention). + * + * Removal of internal nodes (due to timed out or interrupted + * waits, or calls to remove or Iterator.remove) uses a scheme + * roughly similar to that in Scherer, Lea, and Scott + * SynchronousQueue. Given a predecessor, we can unsplice any node + * except the (actual) tail of the queue. To avoid build-up of + * cancelled trailing nodes, upon a request to remove a trailing + * node, it is placed in field "cleanMe" to be unspliced later. + * + * *** Overview of implementation *** + * + * We use a threshold-based approach to updates, with a target + * slack of two. The slack value is hard-wired: a path greater + * than one is naturally implemented by checking equality of + * traversal pointers except when the list has only one element, + * in which case we keep max slack at one. Avoiding tracking + * explicit counts across situations slightly simplifies an + * already-messy implementation. Using randomization would + * probably work better if there were a low-quality dirt-cheap + * per-thread one available, but even ThreadLocalRandom is too + * heavy for these purposes. + * + * With such a small slack value, path short-circuiting is rarely + * worthwhile. 
However, it is used (in awaitMatch) immediately + * before a waiting thread starts to block, as a final bit of + * helping at a point when contention with others is extremely + * unlikely (since if other threads that could release it are + * operating, then the current thread wouldn't be blocking). + * + * All enqueue/dequeue operations are handled by the single method + * "xfer" with parameters indicating whether to act as some form + * of offer, put, poll, take, or transfer (each possibly with + * timeout). The relative complexity of using one monolithic + * method outweighs the code bulk and maintenance problems of + * using nine separate methods. + * + * Operation consists of up to three phases. The first is + * implemented within method xfer, the second in tryAppend, and + * the third in method awaitMatch. + * + * 1. Try to match an existing node + * + * Starting at head, skip already-matched nodes until finding + * an unmatched node of opposite mode, if one exists, in which + * case matching it and returning, also if necessary updating + * head to one past the matched node (or the node itself if the + * list has no other unmatched nodes). If the CAS misses, then + * a retry loops until the slack is at most two. Traversals + * also check if the initial head is now off-list, in which + * case they start at the new head. + * + * If no candidates are found and the call was untimed + * poll/offer (argument "how" is NOW), return. + * + * 2. Try to append a new node (method tryAppend) + * + * Starting at current tail pointer, try to append a new node + * to the list (or if head was null, establish the first + * node). Nodes can be appended only if their predecessors are + * either already matched or are of the same mode. If we detect + * otherwise, then a new node with opposite mode must have been + * appended during traversal, so must restart at phase 1. 
The + * traversal and update steps are otherwise similar to phase 1: + * Retrying upon CAS misses and checking for staleness. In + * particular, if a self-link is encountered, then we can + * safely jump to a node on the list by continuing the + * traversal at current head. + * + * On successful append, if the call was ASYNC, return. + * + * 3. Await match or cancellation (method awaitMatch) + * + * Wait for another thread to match node; instead cancelling if + * current thread was interrupted or the wait timed out. On + * multiprocessors, we use front-of-queue spinning: If a node + * appears to be the first unmatched node in the queue, it + * spins a bit before blocking. In either case, before blocking + * it tries to unsplice any nodes between the current "head" + * and the first unmatched node. + * + * Front-of-queue spinning vastly improves performance of + * heavily contended queues. And so long as it is relatively + * brief and "quiet", spinning does not much impact performance + * of less-contended queues. During spins threads check their + * interrupt status and generate a thread-local random number + * to decide to occasionally perform a Thread.yield. While + * yield has underdefined specs, we assume that it might help, + * and will not hurt in limiting impact of spinning on busy + * systems. We also use much smaller (1/4) spins for nodes + * that are not known to be front but whose predecessors have + * not blocked -- these "chained" spins avoid artifacts of + * front-of-queue rules which otherwise lead to alternating + * nodes spinning vs blocking. Further, front threads that + * represent phase changes (from data to request node or vice + * versa) compared to their predecessors receive additional + * spins, reflecting the longer code path lengths necessary to + * release them under contention. 
+ */ + + /** True if on multiprocessor */ + private static final boolean MP = + Runtime.getRuntime().availableProcessors() > 1; + + /** + * The number of times to spin (with on average one randomly + * interspersed call to Thread.yield) on multiprocessor before + * blocking when a node is apparently the first waiter in the + * queue. See above for explanation. Must be a power of two. The + * value is empirically derived -- it works pretty well across a + * variety of processors, numbers of CPUs, and OSes. + */ + private static final int FRONT_SPINS = 1 << 7; + + /** + * The number of times to spin before blocking when a node is + * preceded by another node that is apparently spinning. + */ + private static final int CHAINED_SPINS = FRONT_SPINS >>> 2; + + /** + * Queue nodes. Uses Object, not E for items to allow forgetting + * them after use. Relies heavily on Unsafe mechanics to minimize + * unecessary ordering constraints: Writes that intrinsically + * precede or follow CASes use simple relaxed forms. Other + * cleanups use releasing/lazy writes. + */ + static final class Node { + final boolean isData; // false if this is a request node + volatile Object item; // initially nonnull if isData; CASed to match + volatile Node next; + volatile Thread waiter; // null until waiting - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - static final int maxUntimedSpins = maxTimedSpins * 16; - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - static final long spinForTimeoutThreshold = 1000L; + // CAS methods for fields + final boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } - /** - * Node class for LinkedTransferQueue. 
Opportunistically - * subclasses from AtomicReference to represent item. Uses Object, - * not E, to allow setting item to "this" after use, to avoid - * garbage retention. Similarly, setting the next field to this is - * used as sentinel that node is off list. - */ - static final class Node extends AtomicReference { - volatile Node next; - volatile Thread waiter; // to control park/unpark - final boolean isData; + final boolean casItem(Object cmp, Object val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } - Node(E item, boolean isData) { - super(item); + /** + * Create a new node. Uses relaxed write because item can only + * be seen if followed by CAS + */ + Node(Object item, boolean isData) { + UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } - // Unsafe mechanics + /** + * Links node to itself to avoid garbage retention. Called + * only after CASing head field, so uses relaxed write. + */ + final void forgetNext() { + UNSAFE.putObject(this, nextOffset, this); + } - private static final sun.misc.Unsafe UNSAFE = getUnsafe(); - private static final long nextOffset = - objectFieldOffset(UNSAFE, "next", Node.class); + /** + * Sets item to self (using a releasing/lazy write) and waiter + * to null, to avoid garbage retention after extracting or + * cancelling. + */ + final void forgetContents() { + UNSAFE.putOrderedObject(this, itemOffset, this); + UNSAFE.putOrderedObject(this, waiterOffset, null); + } - final boolean casNext(Node cmp, Node val) { - return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + /** + * Returns true if this node has been matched, including the + * case of artificial matches due to cancellation. 
+ */ + final boolean isMatched() { + Object x = item; + return x == this || (x != null) != isData; } - final void clearNext() { - UNSAFE.putOrderedObject(this, nextOffset, this); + /** + * Returns true if a node with the given mode cannot be + * appended to this node because this node is unmatched and + * has opposite data mode. + */ + final boolean cannotPrecede(boolean haveData) { + boolean d = isData; + Object x; + return d != haveData && (x = item) != this && (x != null) == d; } /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe + * Tries to artifically match a data node -- used by remove. */ - private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + final boolean tryMatchData() { + Object x = item; + if (x != null && x != this && casItem(x, null)) { + LockSupport.unpark(waiter); + return true; } + return false; } + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + private static final long itemOffset = + objectFieldOffset(UNSAFE, "item", Node.class); + private static final long waiterOffset = + objectFieldOffset(UNSAFE, "waiter", Node.class); + private static final long serialVersionUID = -3375979862319811754L; } - /** - * Padded version of AtomicReference used for head, 
tail and - * cleanMe, to alleviate contention across threads CASing one vs - * the other. - */ - static final class PaddedAtomicReference extends AtomicReference { - // enough padding for 64bytes with 4byte refs - Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - PaddedAtomicReference(T r) { super(r); } - private static final long serialVersionUID = 8170090609809740854L; - } + /** head of the queue; null until first enqueue */ + private transient volatile Node head; + /** predecessor of dangling unspliceable node */ + private transient volatile Node cleanMe; // decl here to reduce contention - /** head of the queue */ - private transient final PaddedAtomicReference> head; + /** tail of the queue; null until first append */ + private transient volatile Node tail; - /** tail of the queue */ - private transient final PaddedAtomicReference> tail; + // CAS methods for fields + private boolean casTail(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); + } - /** - * Reference to a cancelled node that might not yet have been - * unlinked from queue because it was the last inserted node - * when it cancelled. - */ - private transient final PaddedAtomicReference> cleanMe; + private boolean casHead(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); + } - /** - * Tries to cas nh as new head; if successful, unlink - * old head's next node to avoid garbage retention. - */ - private boolean advanceHead(Node h, Node nh) { - if (h == head.get() && head.compareAndSet(h, nh)) { - h.clearNext(); // forget old next - return true; - } - return false; + private boolean casCleanMe(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); } + /* + * Possible values for "how" argument in xfer method. Beware that + * the order of assigned numerical values matters. 
+ */ + private static final int NOW = 0; // for untimed poll, tryTransfer + private static final int ASYNC = 1; // for offer, put, add + private static final int SYNC = 2; // for transfer, take + private static final int TIMEOUT = 3; // for timed poll, tryTransfer + /** - * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()). See the similar code in - * SynchronousQueue for detailed explanation. + * Implements all queuing methods. See above for explanation. * - * @param e the item or if null, signifies that this is a take - * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT + * @param e the item or null for take + * @param haveData true if this is a put else a take + * @param how NOW, ASYNC, SYNC, or TIMEOUT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT - * @return an item, or null on failure + * @return an item if matched, else e; + * @throws NullPointerException if haveData mode but e is null */ - private E xfer(E e, int mode, long nanos) { - boolean isData = (e != null); - Node s = null; - final PaddedAtomicReference> head = this.head; - final PaddedAtomicReference> tail = this.tail; + private Object xfer(Object e, boolean haveData, int how, long nanos) { + if (haveData && (e == null)) + throw new NullPointerException(); + Node s = null; // the node to append, if needed - for (;;) { - Node t = tail.get(); - Node h = head.get(); + retry: for (;;) { // restart on append race - if (t == h || t.isData == isData) { - if (s == null) - s = new Node(e, isData); - Node last = t.next; - if (last != null) { - if (t == tail.get()) - tail.compareAndSet(t, last); - } - else if (t.casNext(null, s)) { - tail.compareAndSet(t, s); - return awaitFulfill(t, s, e, mode, nanos); - } - } else { - Node first = h.next; - if (t == tail.get() && first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData ? 
e : (E) x; + for (Node h = head, p = h; p != null;) { // find & match first node + boolean isData = p.isData; + Object item = p.item; + if (item != p && (item != null) == isData) { // unmatched + if (isData == haveData) // can't match + break; + if (p.casItem(item, e)) { // match + Thread w = p.waiter; + while (p != h) { // update head + Node n = p.next; // by 2 unless singleton + if (n != null) + p = n; + if (head == h && casHead(h, p)) { + h.forgetNext(); + break; + } // advance and retry + if ((h = head) == null || + (p = h.next) == null || !p.isMatched()) + break; // unless slack < 2 + } + LockSupport.unpark(w); + return item; } } + Node n = p.next; + p = p != n ? n : (h = head); // Use head if p offlist + } + + if (how >= ASYNC) { // No matches available + if (s == null) + s = new Node(e, haveData); + Node pred = tryAppend(s, haveData); + if (pred == null) + continue retry; // lost race vs opposite mode + if (how >= SYNC) + return awaitMatch(pred, s, e, how, nanos); } + return e; // not waiting } } - /** - * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer. 
+ * Tries to append node s as tail + * @param haveData true if appending in data mode + * @param s the node to append + * @return null on failure due to losing race with append in + * different mode, else s's predecessor, or s itself if no + * predecessor */ - private E fulfill(E e) { - boolean isData = (e != null); - final PaddedAtomicReference> head = this.head; - final PaddedAtomicReference> tail = this.tail; - - for (;;) { - Node t = tail.get(); - Node h = head.get(); - - if (t == h || t.isData == isData) { - Node last = t.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); - else - return null; - } - } else { - Node first = h.next; - if (t == tail.get() && - first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData ? e : (E) x; - } + private Node tryAppend(Node s, boolean haveData) { + for (Node t = tail, p = t;;) { // move p to actual tail and append + Node n, u; // temps for reads of next & tail + if (p == null && (p = head) == null) { + if (casHead(null, s)) + return s; // initialize + } + else if (p.cannotPrecede(haveData)) + return null; // lost race vs opposite mode + else if ((n = p.next) != null) // Not tail; keep traversing + p = p != t && t != (u = tail) ? (t = u) : // stale tail + p != n ? n : null; // restart if off list + else if (!p.casNext(null, s)) + p = p.next; // re-read on CAS failure + else { + if (p != t) { // Update if slack now >= 2 + while ((tail != t || !casTail(t, s)) && + (t = tail) != null && + (s = t.next) != null && // advance and retry + (s = s.next) != null && s != t); } + return p; } } } /** - * Spins/blocks until node s is fulfilled or caller gives up, - * depending on wait mode. + * Spins/yields/blocks until node s is matched or caller gives up. 
* - * @param pred the predecessor of waiting node + * @param pred the predecessor of s or s or null if none * @param s the waiting node * @param e the comparison value for checking match - * @param mode mode + * @param how either SYNC or TIMEOUT * @param nanos timeout value - * @return matched item, or null if cancelled + * @return matched item, or e if unmatched on interrupt or timeout */ - private E awaitFulfill(Node pred, Node s, E e, - int mode, long nanos) { - if (mode == NOWAIT) - return null; - - long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0; + private Object awaitMatch(Node pred, Node s, Object e, + int how, long nanos) { + long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L; Thread w = Thread.currentThread(); - int spins = -1; // set to desired spin count below + int spins = -1; // initialized after first item and cancel checks + ThreadLocalRandom randomYields = null; // bound if needed + for (;;) { - if (w.isInterrupted()) - s.compareAndSet(e, s); - Object x = s.get(); - if (x != e) { // Node was matched or cancelled - advanceHead(pred, s); // unlink if head - if (x == s) { // was cancelled - clean(pred, s); - return null; - } - else if (x != null) { - s.set(s); // avoid garbage retention - return (E) x; - } - else - return e; + Object item = s.item; + if (item != e) { // matched + s.forgetContents(); // avoid garbage + return item; } - if (mode == TIMEOUT) { - long now = System.nanoTime(); - nanos -= now - lastTime; - lastTime = now; - if (nanos <= 0) { - s.compareAndSet(e, s); // try to cancel - continue; - } + if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) && + s.casItem(e, s)) { // cancel + unsplice(pred, s); + return e; } - if (spins < 0) { - Node h = head.get(); // only spin if at head - spins = ((h.next != s) ? 0 : - (mode == TIMEOUT) ? 
maxTimedSpins : - maxUntimedSpins); + + if (spins < 0) { // establish spins at/near front + if ((spins = spinsFor(pred, s.isData)) > 0) + randomYields = ThreadLocalRandom.current(); } - if (spins > 0) + else if (spins > 0) { // spin, occasionally yield + if (randomYields.nextInt(FRONT_SPINS) == 0) + Thread.yield(); --spins; - else if (s.waiter == null) - s.waiter = w; - else if (mode != TIMEOUT) { - LockSupport.park(this); - s.waiter = null; - spins = -1; } - else if (nanos > spinForTimeoutThreshold) { - LockSupport.parkNanos(this, nanos); - s.waiter = null; - spins = -1; + else if (s.waiter == null) { + shortenHeadPath(); // reduce slack before blocking + s.waiter = w; // request unpark + } + else if (how == TIMEOUT) { + long now = System.nanoTime(); + if ((nanos -= now - lastTime) > 0) + LockSupport.parkNanos(this, nanos); + lastTime = now; + } + else { + LockSupport.park(this); + spins = -1; // spin if front upon wakeup } } } /** - * Returns validated tail for use in cleaning methods. + * Return spin/yield value for a node with given predecessor and + * data mode. See above for explanation. */ - private Node getValidatedTail() { - for (;;) { - Node h = head.get(); - Node first = h.next; - if (first != null && first.get() == first) { // help advance - advanceHead(h, first); - continue; - } - Node t = tail.get(); - Node last = t.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); // help advance - else - return t; + private static int spinsFor(Node pred, boolean haveData) { + if (MP && pred != null) { + boolean predData = pred.isData; + if (predData != haveData) // front and phase change + return FRONT_SPINS + (FRONT_SPINS >>> 1); + if (predData != (pred.item != null)) // probably at front + return FRONT_SPINS; + if (pred.waiter == null) // pred apparently spinning + return CHAINED_SPINS; + } + return 0; + } + + /** + * Tries (once) to unsplice nodes between head and first unmatched + * or trailing node; failing on contention. 
+ */ + private void shortenHeadPath() { + Node h, hn, p, q; + if ((p = h = head) != null && h.isMatched() && + (q = hn = h.next) != null) { + Node n; + while ((n = q.next) != q) { + if (n == null || !q.isMatched()) { + if (hn != q && h.next == hn) + h.casNext(hn, q); + break; + } + p = q; + q = n; } } } + /* -------------- Traversal methods -------------- */ + /** - * Gets rid of cancelled node s with original predecessor pred. - * - * @param pred predecessor of cancelled node - * @param s the cancelled node + * Return the first unmatched node of the given mode, or null if + * none. Used by methods isEmpty, hasWaitingConsumer. */ - private void clean(Node pred, Node s) { - Thread w = s.waiter; - if (w != null) { // Wake up thread - s.waiter = null; - if (w != Thread.currentThread()) - LockSupport.unpark(w); + private Node firstOfMode(boolean data) { + for (Node p = head; p != null; ) { + if (!p.isMatched()) + return p.isData == data? p : null; + Node n = p.next; + p = n != p ? n : head; } + return null; + } + + /** + * Returns the item in the first unmatched node with isData; or + * null if none. Used by peek. + */ + private Object firstDataItem() { + for (Node p = head; p != null; ) { + boolean isData = p.isData; + Object item = p.item; + if (item != p && (item != null) == isData) + return isData ? item : null; + Node n = p.next; + p = n != p ? n : head; + } + return null; + } + + /** + * Traverse and count nodes of the given mode. + * Used by methds size and getWaitingConsumerCount. 
+ */ + private int countOfMode(boolean data) { + int count = 0; + for (Node p = head; p != null; ) { + if (!p.isMatched()) { + if (p.isData != data) + return 0; + if (++count == Integer.MAX_VALUE) // saturated + break; + } + Node n = p.next; + if (n != p) + p = n; + else { + count = 0; + p = head; + } + } + return count; + } - if (pred == null) - return; + final class Itr implements Iterator { + private Node nextNode; // next node to return item for + private Object nextItem; // the corresponding item + private Node lastRet; // last returned node, to support remove + /** + * Moves to next node after prev, or first node if prev null. + */ + private void advance(Node prev) { + lastRet = prev; + Node p; + if (prev == null || (p = prev.next) == prev) + p = head; + while (p != null) { + Object item = p.item; + if (p.isData) { + if (item != null && item != p) { + nextItem = item; + nextNode = p; + return; + } + } + else if (item == null) + break; + Node n = p.next; + p = n != p ? n : head; + } + nextNode = null; + } + + Itr() { + advance(null); + } + + public final boolean hasNext() { + return nextNode != null; + } + + public final E next() { + Node p = nextNode; + if (p == null) throw new NoSuchElementException(); + Object e = nextItem; + advance(p); + return (E) e; + } + + public final void remove() { + Node p = lastRet; + if (p == null) throw new IllegalStateException(); + lastRet = null; + findAndRemoveNode(p); + } + } + + /* -------------- Removal methods -------------- */ + + /** + * Unsplices (now or later) the given deleted/cancelled node with + * the given predecessor. + * + * @param pred predecessor of node to be unspliced + * @param s the node to be unspliced + */ + private void unsplice(Node pred, Node s) { + s.forgetContents(); // clear unneeded fields /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. 
To accommodate this, if * we cannot delete s, we save its predecessor as "cleanMe", - * processing the previously saved version first. At least one - * of node s or the node previously saved can always be + * processing the previously saved version first. Because only + * one node in the list can have a null next, at least one of + * node s or the node previously saved can always be * processed, so this always terminates. */ - while (pred.next == s) { - Node oldpred = reclean(); // First, help get rid of cleanMe - Node t = getValidatedTail(); - if (s != t) { // If not tail, try to unsplice - Node sn = s.next; // s.next == s means s already off list - if (sn == s || pred.casNext(s, sn)) + if (pred != null && pred != s) { + while (pred.next == s) { + Node oldpred = cleanMe == null? null : reclean(); + Node n = s.next; + if (n != null) { + if (n != s) + pred.casNext(s, n); break; + } + if (oldpred == pred || // Already saved + (oldpred == null && casCleanMe(null, pred))) + break; // Postpone cleaning } - else if (oldpred == pred || // Already saved - (oldpred == null && cleanMe.compareAndSet(null, pred))) - break; // Postpone cleaning } } /** - * Tries to unsplice the cancelled node held in cleanMe that was - * previously uncleanable because it was at tail. + * Tries to unsplice the deleted/cancelled node held in cleanMe + * that was previously uncleanable because it was at tail. * * @return current cleanMe node (or null) */ - private Node reclean() { + private Node reclean() { /* - * cleanMe is, or at one time was, predecessor of cancelled - * node s that was the tail so could not be unspliced. If s + * cleanMe is, or at one time was, predecessor of a cancelled + * node s that was the tail so could not be unspliced. If it * is no longer the tail, try to unsplice if necessary and * make cleanMe slot available. 
This differs from similar - * code in clean() because we must check that pred still - * points to a cancelled node that must be unspliced -- if - * not, we can (must) clear cleanMe without unsplicing. - * This can loop only due to contention on casNext or - * clearing cleanMe. + * code in unsplice() because we must check that pred still + * points to a matched node that can be unspliced -- if not, + * we can (must) clear cleanMe without unsplicing. This can + * loop only due to contention. */ - Node pred; - while ((pred = cleanMe.get()) != null) { - Node t = getValidatedTail(); - Node s = pred.next; - if (s != t) { - Node sn; - if (s == null || s == pred || s.get() != s || - (sn = s.next) == s || pred.casNext(s, sn)) - cleanMe.compareAndSet(pred, null); + Node pred; + while ((pred = cleanMe) != null) { + Node s = pred.next; + Node n; + if (s == null || s == pred || !s.isMatched()) + casCleanMe(pred, null); // already gone + else if ((n = s.next) != null) { + if (n != s) + pred.casNext(s, n); + casCleanMe(pred, null); } - else // s is still tail; cannot clean + else break; } return pred; } /** + * Main implementation of Iterator.remove(). Find + * and unsplice the given node. 
+ */ + final void findAndRemoveNode(Node s) { + if (s.tryMatchData()) { + Node pred = null; + Node p = head; + while (p != null) { + if (p == s) { + unsplice(pred, p); + break; + } + if (!p.isData && !p.isMatched()) + break; + pred = p; + if ((p = p.next) == pred) { // stale + pred = null; + p = head; + } + } + } + } + + /** + * Main implementation of remove(Object) + */ + private boolean findAndRemove(Object e) { + if (e != null) { + Node pred = null; + Node p = head; + while (p != null) { + Object item = p.item; + if (p.isData) { + if (item != null && item != p && e.equals(item) && + p.tryMatchData()) { + unsplice(pred, p); + return true; + } + } + else if (item == null) + break; + pred = p; + if ((p = p.next) == pred) { + pred = null; + p = head; + } + } + } + return false; + } + + + /** * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - Node dummy = new Node(null, false); - head = new PaddedAtomicReference>(dummy); - tail = new PaddedAtomicReference>(dummy); - cleanMe = new PaddedAtomicReference>(null); } /** @@ -476,7 +890,7 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public void put(E e) { - offer(e); + xfer(e, true, ASYNC, 0); } /** @@ -489,7 +903,8 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public boolean offer(E e, long timeout, TimeUnit unit) { - return offer(e); + xfer(e, true, ASYNC, 0); + return true; } /** @@ -501,8 +916,7 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { - if (e == null) throw new NullPointerException(); - xfer(e, NOWAIT, 0); + xfer(e, true, ASYNC, 0); return true; } @@ -515,7 +929,8 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public boolean add(E e) { - return offer(e); + xfer(e, true, ASYNC, 0); + return true; } /** 
@@ -529,8 +944,7 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public boolean tryTransfer(E e) { - if (e == null) throw new NullPointerException(); - return fulfill(e) != null; + return xfer(e, true, NOW, 0) == null; } /** @@ -545,9 +959,8 @@ public class LinkedTransferQueue exte * @throws NullPointerException if the specified element is null */ public void transfer(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (xfer(e, WAIT, 0) == null) { - Thread.interrupted(); + if (xfer(e, true, SYNC, 0) != null) { + Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } @@ -568,8 +981,7 @@ public class LinkedTransferQueue exte */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) + if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; @@ -577,22 +989,22 @@ public class LinkedTransferQueue exte } public E take() throws InterruptedException { - E e = xfer(null, WAIT, 0); + Object e = xfer(null, false, SYNC, 0); if (e != null) - return e; + return (E)e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { - E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); + Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) - return e; + return (E)e; throw new InterruptedException(); } public E poll() { - return fulfill(null); + return (E)xfer(null, false, NOW, 0); } /** @@ -631,34 +1043,6 @@ public class LinkedTransferQueue exte return n; } - // Traversal-based methods - - /** - * Returns head after performing any outstanding helping steps. 
- */ - private Node traversalHead() { - for (;;) { - Node t = tail.get(); - Node h = head.get(); - Node last = t.next; - Node first = h.next; - if (t == tail.get()) { - if (last != null) - tail.compareAndSet(t, last); - else if (first != null) { - Object x = first.get(); - if (x == first) - advanceHead(h, first); - else - return h; - } - else - return h; - } - reclean(); - } - } - /** * Returns an iterator over the elements in this queue in proper * sequence, from head to tail. @@ -676,85 +1060,8 @@ public class LinkedTransferQueue exte return new Itr(); } - /** - * Iterators. Basic strategy is to traverse list, treating - * non-data (i.e., request) nodes as terminating list. - * Once a valid data node is found, the item is cached - * so that the next call to next() will return it even - * if subsequently removed. - */ - class Itr implements Iterator { - Node next; // node to return next - Node pnext; // predecessor of next - Node curr; // last returned node, for remove() - Node pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once committed to in next - - Itr() { - advance(); - } - - /** - * Moves to next valid node and returns item to return for - * next(), or null if no such. - */ - private E advance() { - pcurr = pnext; - curr = next; - E item = nextItem; - - for (;;) { - pnext = (next == null) ? 
traversalHead() : next; - next = pnext.next; - if (next == pnext) { - next = null; - continue; // restart - } - if (next == null) - break; - Object x = next.get(); - if (x != null && x != next) { - nextItem = (E) x; - break; - } - } - return item; - } - - public boolean hasNext() { - return next != null; - } - - public E next() { - if (next == null) - throw new NoSuchElementException(); - return advance(); - } - - public void remove() { - Node p = curr; - if (p == null) - throw new IllegalStateException(); - Object x = p.get(); - if (x != null && x != p && p.compareAndSet(x, p)) - clean(pcurr, p); - } - } - public E peek() { - for (;;) { - Node h = traversalHead(); - Node p = h.next; - if (p == null) - return null; - Object x = p.get(); - if (p != x) { - if (!p.isData) - return null; - if (x != null) - return (E) x; - } - } + return (E) firstDataItem(); } /** @@ -763,31 +1070,11 @@ public class LinkedTransferQueue exte * @return {@code true} if this queue contains no elements */ public boolean isEmpty() { - for (;;) { - Node h = traversalHead(); - Node p = h.next; - if (p == null) - return true; - Object x = p.get(); - if (p != x) { - if (!p.isData) - return true; - if (x != null) - return false; - } - } + return firstOfMode(true) == null; } public boolean hasWaitingConsumer() { - for (;;) { - Node h = traversalHead(); - Node p = h.next; - if (p == null) - return false; - Object x = p.get(); - if (p != x) - return !p.isData; - } + return firstOfMode(false) != null; } /** @@ -803,44 +1090,11 @@ public class LinkedTransferQueue exte * @return the number of elements in this queue */ public int size() { - for (;;) { - int count = 0; - Node pred = traversalHead(); - for (;;) { - Node q = pred.next; - if (q == pred) // restart - break; - if (q == null || !q.isData) - return count; - Object x = q.get(); - if (x != null && x != q) { - if (++count == Integer.MAX_VALUE) // saturated - return count; - } - pred = q; - } - } + return countOfMode(true); } public int 
getWaitingConsumerCount() { - // converse of size -- count valid non-data nodes - for (;;) { - int count = 0; - Node pred = traversalHead(); - for (;;) { - Node q = pred.next; - if (q == pred) // restart - break; - if (q == null || q.isData) - return count; - Object x = q.get(); - if (x == null) { - if (++count == Integer.MAX_VALUE) // saturated - return count; - } - pred = q; - } - } + return countOfMode(false); } /** @@ -855,25 +1109,7 @@ public class LinkedTransferQueue exte * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { - if (o == null) - return false; - for (;;) { - Node pred = traversalHead(); - for (;;) { - Node q = pred.next; - if (q == pred) // restart - break; - if (q == null || !q.isData) - return false; - Object x = q.get(); - if (x != null && x != q && o.equals(x) && - q.compareAndSet(x, q)) { - clean(pred, q); - return true; - } - pred = q; - } - } + return findAndRemove(o); } /** @@ -912,7 +1148,6 @@ public class LinkedTransferQueue exte private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); - resetHeadAndTail(); for (;;) { @SuppressWarnings("unchecked") E item = (E) s.readObject(); if (item == null) @@ -922,16 +1157,6 @@ public class LinkedTransferQueue exte } } - // Support for resetting head/tail while deserializing - private void resetHeadAndTail() { - Node dummy = new Node(null, false); - UNSAFE.putObjectVolatile(this, headOffset, - new PaddedAtomicReference>(dummy)); - UNSAFE.putObjectVolatile(this, tailOffset, - new PaddedAtomicReference>(dummy)); - UNSAFE.putObjectVolatile(this, cleanMeOffset, - new PaddedAtomicReference>(null)); - } // Unsafe mechanics @@ -943,7 +1168,6 @@ public class LinkedTransferQueue exte private static final long cleanMeOffset = objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); - static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class klazz) { try { @@ 
-956,13 +1180,6 @@ public class LinkedTransferQueue exte } } - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); @@ -983,4 +1200,5 @@ public class LinkedTransferQueue exte } } } + }