root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.46 by jsr166, Thu Oct 22 08:19:44 2009 UTC vs.
Revision 1.70 by jsr166, Sun Nov 15 02:29:12 2009 UTC

# Line 105 | Line 105 | public class LinkedTransferQueue<E> exte
105       * successful atomic operation per enq/deq pair. But it also
106       * enables lower cost variants of queue maintenance mechanics. (A
107       * variation of this idea applies even for non-dual queues that
108 <     * support deletion of embedded elements, such as
108 >     * support deletion of interior elements, such as
109       * j.u.c.ConcurrentLinkedQueue.)
110       *
111 <     * Once a node is matched, its item can never again change.  We
112 <     * may thus arrange that the linked list of them contains a prefix
113 <     * of zero or more matched nodes, followed by a suffix of zero or
114 <     * more unmatched nodes. (Note that we allow both the prefix and
115 <     * suffix to be zero length, which in turn means that we do not
116 <     * use a dummy header.)  If we were not concerned with either time
117 <     * or space efficiency, we could correctly perform enqueue and
118 <     * dequeue operations by traversing from a pointer to the initial
119 <     * node; CASing the item of the first unmatched node on match and
120 <     * CASing the next field of the trailing node on appends.  While
121 <     * this would be a terrible idea in itself, it does have the
122 <     * benefit of not requiring ANY atomic updates on head/tail
123 <     * fields.
111 >     * Once a node is matched, its match status can never again
112 >     * change.  We may thus arrange that the linked list of them
113 >     * contain a prefix of zero or more matched nodes, followed by a
114 >     * suffix of zero or more unmatched nodes. (Note that we allow
115 >     * both the prefix and suffix to be zero length, which in turn
116 >     * means that we do not use a dummy header.)  If we were not
117 >     * concerned with either time or space efficiency, we could
118 >     * correctly perform enqueue and dequeue operations by traversing
119 >     * from a pointer to the initial node; CASing the item of the
120 >     * first unmatched node on match and CASing the next field of the
121 >     * trailing node on appends. (Plus some special-casing when
122 >     * initially empty).  While this would be a terrible idea in
123 >     * itself, it does have the benefit of not requiring ANY atomic
124 >     * updates on head/tail fields.
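
The no-head/no-tail extreme described above can be sketched standalone. The `NaiveCasQueue` class below is a hypothetical non-dual illustration (it uses an origin sentinel for brevity, unlike the dummy-free real implementation): every operation traverses from a fixed origin, dequeue CASes the item of the first unmatched node, and enqueue CASes the next field of the trailing node, with no head/tail fields at all.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "never update head/tail" extreme: correct,
// and needing no atomic updates on head/tail fields (there are none),
// but O(n) per operation as the list of consumed nodes grows.
class NaiveCasQueue<E> {
    static final class Node<E> {
        final AtomicReference<E> item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E e) { item = new AtomicReference<>(e); }
    }

    private final Node<E> origin = new Node<>(null); // sentinel, never matched

    public void enqueue(E e) {
        Node<E> n = new Node<>(e);
        for (Node<E> p = origin;;) {
            Node<E> q = p.next.get();
            if (q != null)
                p = q;                               // keep walking to the trailing node
            else if (p.next.compareAndSet(null, n))
                return;                              // appended
        }
    }

    public E dequeue() {
        for (Node<E> p = origin.next.get(); p != null; p = p.next.get()) {
            E x = p.item.get();
            if (x != null && p.item.compareAndSet(x, null))
                return x;                            // "matched" this node
        }
        return null;                                 // no unmatched node found
    }
}
```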
125       *
126       * We introduce here an approach that lies between the extremes of
127 <     * never versus always updating queue (head and tail) pointers
128 <     * that reflects the tradeoff of sometimes requiring extra traversal
129 <     * steps to locate the first and/or last unmatched nodes, versus
130 <     * the reduced overhead and contention of fewer updates to queue
131 <     * pointers. For example, a possible snapshot of a queue is:
127 >     * never versus always updating queue (head and tail) pointers.
128 >     * This offers a tradeoff between sometimes requiring extra
129 >     * traversal steps to locate the first and/or last unmatched
130 >     * nodes, versus the reduced overhead and contention of fewer
131 >     * updates to queue pointers. For example, a possible snapshot of
132 >     * a queue is:
133       *
134       *  head           tail
135       *    |              |
# Line 139 | Line 141 | public class LinkedTransferQueue<E> exte
141       * similarly for "tail") is an empirical matter. We have found
142       * that using very small constants in the range of 1-3 work best
143       * over a range of platforms. Larger values introduce increasing
144 <     * costs of cache misses and risks of long traversal chains.
144 >     * costs of cache misses and risks of long traversal chains, while
145 >     * smaller values increase CAS contention and overhead.
146       *
147       * Dual queues with slack differ from plain M&S dual queues by
148       * virtue of only sometimes updating head or tail pointers when
# Line 158 | Line 161 | public class LinkedTransferQueue<E> exte
161       * targets.  Even when using very small slack values, this
162       * approach works well for dual queues because it allows all
163       * operations up to the point of matching or appending an item
164 <     * (hence potentially releasing another thread) to be read-only,
165 <     * thus not introducing any further contention. As described
166 <     * below, we implement this by performing slack maintenance
167 <     * retries only after these points.
164 >     * (hence potentially allowing progress by another thread) to be
165 >     * read-only, thus not introducing any further contention. As
166 >     * described below, we implement this by performing slack
167 >     * maintenance retries only after these points.
168       *
169       * As an accompaniment to such techniques, traversal overhead can
170       * be further reduced without increasing contention of head
171 <     * pointer updates.  During traversals, threads may sometimes
172 <     * shortcut the "next" link path from the current "head" node to
173 <     * be closer to the currently known first unmatched node. Again,
174 <     * this may be triggered with using thresholds or randomization.
171 >     * pointer updates: Threads may sometimes shortcut the "next" link
172 >     * path from the current "head" node to be closer to the currently
173 >     * known first unmatched node, and similarly for tail. Again, this
174 >     * may be triggered using thresholds or randomization.
175       *
176       * These ideas must be further extended to avoid unbounded amounts
177       * of costly-to-reclaim garbage caused by the sequential "next"
# Line 196 | Line 199 | public class LinkedTransferQueue<E> exte
199       * mechanics because an update may leave head at a detached node.
200       * And while direct writes are possible for tail updates, they
201       * increase the risk of long retraversals, and hence long garbage
202 <     * chains which can be much more costly than is worthwhile
202 >     * chains, which can be much more costly than is worthwhile
203       * considering that the cost difference of performing a CAS vs
204       * write is smaller when they are not triggered on each operation
205       * (especially considering that writes and CASes equally require
206       * additional GC bookkeeping ("write barriers") that are sometimes
207       * more costly than the writes themselves because of contention).
208       *
206     * Removal of internal nodes (due to timed out or interrupted
207     * waits, or calls to remove or Iterator.remove) uses a scheme
208     * roughly similar to that in Scherer, Lea, and Scott
209     * SynchronousQueue. Given a predecessor, we can unsplice any node
210     * except the (actual) tail of the queue. To avoid build-up of
211     * cancelled trailing nodes, upon a request to remove a trailing
212     * node, it is placed in field "cleanMe" to be unspliced later.
213     *
209       * *** Overview of implementation ***
210       *
211 <     * We use a threshold-based approach to updates, with a target
212 <     * slack of two.  The slack value is hard-wired: a path greater
211 >     * We use a threshold-based approach to updates, with a slack
212 >     * threshold of two -- that is, we update head/tail when the
213 >     * current pointer appears to be two or more steps away from the
214 >     * first/last node. The slack value is hard-wired: a path greater
215       * than one is naturally implemented by checking equality of
216       * traversal pointers except when the list has only one element,
217 <     * in which case we keep max slack at one. Avoiding tracking
218 <     * explicit counts across situations slightly simplifies an
217 >     * in which case we keep slack threshold at one. Avoiding tracking
218 >     * explicit counts across method calls slightly simplifies an
219       * already-messy implementation. Using randomization would
220       * probably work better if there were a low-quality dirt-cheap
221       * per-thread one available, but even ThreadLocalRandom is too
222       * heavy for these purposes.
223       *
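
The pointer-equality trick for the hard-wired slack threshold can be shown in isolation. `SlackDemo` below is a hypothetical helper, not code from this file: with `h` the current head and `p` the first unmatched node found by traversal, slack is at least two exactly when `p` is neither `h` nor `h`'s immediate successor, so no explicit count need be tracked across calls.

```java
// Hypothetical illustration of the slack-threshold-of-two check via
// pointer equality, as described in the comment above.
class SlackDemo {
    static final class Node { Node next; }

    static boolean slackAtLeastTwo(Node h, Node p) {
        return h != null && p != h && p != h.next;
    }
}
```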
224 <     * With such a small slack value, path short-circuiting is rarely
225 <     * worthwhile. However, it is used (in awaitMatch) immediately
226 <     * before a waiting thread starts to block, as a final bit of
227 <     * helping at a point when contention with others is extremely
228 <     * unlikely (since if other threads that could release it are
229 <     * operating, then the current thread wouldn't be blocking).
224 >     * With such a small slack threshold value, it is not worthwhile
225 >     * to augment this with path short-circuiting (i.e., unsplicing
226 >     * interior nodes) except in the case of cancellation/removal (see
227 >     * below).
228 >     *
229 >     * We allow both the head and tail fields to be null before any
230 >     * nodes are enqueued; initializing upon first append.  This
231 >     * simplifies some other logic, as well as providing more
232 >     * efficient explicit control paths instead of letting JVMs insert
233 >     * implicit NullPointerExceptions when they are null.  While not
234 >     * currently fully implemented, we also leave open the possibility
235 >     * of re-nulling these fields when empty (which is complicated to
236 >     * arrange, for little benefit.)
237       *
238       * All enqueue/dequeue operations are handled by the single method
239       * "xfer" with parameters indicating whether to act as some form
240       * of offer, put, poll, take, or transfer (each possibly with
241       * timeout). The relative complexity of using one monolithic
242       * method outweighs the code bulk and maintenance problems of
243 <     * using nine separate methods.
243 >     * using separate methods for each case.
244       *
245       * Operation consists of up to three phases. The first is
246       * implemented within method xfer, the second in tryAppend, and
# Line 249 | Line 253 | public class LinkedTransferQueue<E> exte
253       *    case matching it and returning, also if necessary updating
254       *    head to one past the matched node (or the node itself if the
255       *    list has no other unmatched nodes). If the CAS misses, then
256 <     *    a retry loops until the slack is at most two. Traversals
257 <     *    also check if the initial head is now off-list, in which
258 <     *    case they start at the new head.
256 >     *    a loop retries advancing head by two steps until either
257 >     *    success or the slack is at most two. By requiring that each
258 >     *    attempt advances head by two (if applicable), we ensure that
259 >     *    the slack does not grow without bound. Traversals also check
260 >     *    if the initial head is now off-list, in which case they
261 >     *    start at the new head.
262       *
263       *    If no candidates are found and the call was untimed
264       *    poll/offer, (argument "how" is NOW) return.
265       *
266       * 2. Try to append a new node (method tryAppend)
267       *
268 <     *    Starting at current tail pointer, try to append a new node
269 <     *    to the list (or if head was null, establish the first
270 <     *    node). Nodes can be appended only if their predecessors are
271 <     *    either already matched or are of the same mode. If we detect
272 <     *    otherwise, then a new node with opposite mode must have been
273 <     *    appended during traversal, so must restart at phase 1. The
274 <     *    traversal and update steps are otherwise similar to phase 1:
275 <     *    Retrying upon CAS misses and checking for staleness.  In
276 <     *    particular, if a self-link is encountered, then we can
277 <     *    safely jump to a node on the list by continuing the
278 <     *    traversal at current head.
268 >     *    Starting at current tail pointer, find the actual last node
269 >     *    and try to append a new node (or if head was null, establish
270 >     *    the first node). Nodes can be appended only if their
271 >     *    predecessors are either already matched or are of the same
272 >     *    mode. If we detect otherwise, then a new node with opposite
273 >     *    mode must have been appended during traversal, so we must
274 >     *    restart at phase 1. The traversal and update steps are
275 >     *    otherwise similar to phase 1: Retrying upon CAS misses and
276 >     *    checking for staleness.  In particular, if a self-link is
277 >     *    encountered, then we can safely jump to a node on the list
278 >     *    by continuing the traversal at current head.
279       *
280       *    On successful append, if the call was ASYNC, return.
281       *
282       * 3. Await match or cancellation (method awaitMatch)
283       *
284       *    Wait for another thread to match node; instead cancelling if
285 <     *    current thread was interrupted or the wait timed out. On
285 >     *    the current thread was interrupted or the wait timed out. On
286       *    multiprocessors, we use front-of-queue spinning: If a node
287       *    appears to be the first unmatched node in the queue, it
288       *    spins a bit before blocking. In either case, before blocking
# Line 290 | Line 297 | public class LinkedTransferQueue<E> exte
297       *    to decide to occasionally perform a Thread.yield. While
298       * yield has underdefined specs, we assume that it might help,
299       *    and will not hurt in limiting impact of spinning on busy
300 <     *    systems.  We also use much smaller (1/4) spins for nodes
301 <     *    that are not known to be front but whose predecessors have
302 <     *    not blocked -- these "chained" spins avoid artifacts of
300 >     *    systems.  We also use smaller (1/2) spins for nodes that are
301 >     *    not known to be front but whose predecessors have not
302 >     *    blocked -- these "chained" spins avoid artifacts of
303       *    front-of-queue rules which otherwise lead to alternating
304       *    nodes spinning vs blocking. Further, front threads that
305       *    represent phase changes (from data to request node or vice
306       *    versa) compared to their predecessors receive additional
307 <     *    spins, reflecting the longer code path lengths necessary to
308 <     *    release them under contention.
307 >     *    chained spins, reflecting longer paths typically required to
308 >     *    unblock threads during phase changes.
309 >     *
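311 >     * The phase-1 head update (advance by two unless singleton, retry
312 >     * while the slack is still >= 2) can be sketched standalone; the
313 >     * Node and queue shapes below are hypothetical simplifications of
314 >     * the real implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the step-1 head update: after matching node p,
// swing head two nodes past the old head h (or to p itself if p has no
// successor), retrying only while the observed slack remains >= 2.
class HeadAdvanceDemo {
    static final class Node {
        volatile Node next;
        volatile boolean matched;
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    void updateHeadAfterMatch(Node h, Node p) {  // h = head as seen, p = matched node
        for (Node q = p; q != h; ) {
            Node n = q.next;                     // advance by 2 unless singleton
            if (head.get() == h && head.compareAndSet(h, n == null ? q : n))
                break;                           // old head h is now off-list
            if ((h = head.get()) == null ||
                (q = h.next) == null || !q.matched)
                break;                           // slack < 2; nothing to do
        }
    }
}
```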
310 >     *
311 >     * ** Unlinking removed interior nodes **
312 >     *
313 >     * In addition to minimizing garbage retention via self-linking
314 >     * described above, we also unlink removed interior nodes. These
315 >     * may arise due to timed out or interrupted waits, or calls to
316 >     * remove(x) or Iterator.remove.  Normally, given a node that was
317 >     * at one time known to be the predecessor of some node s that is
318 >     * to be removed, we can unsplice s by CASing the next field of
319 >     * its predecessor if it still points to s (otherwise s must
320 >     * already have been removed or is now offlist). But there are two
321 >     * situations in which we cannot guarantee to make node s
322 >     * unreachable in this way: (1) If s is the trailing node of list
323 >     * (i.e., with null next), then it is pinned as the target node
324 >     * for appends, so can only be removed later when other nodes are
325 >     * appended. (2) We cannot necessarily unlink s given a
326 >     * predecessor node that is matched (including the case of being
327 >     * cancelled): the predecessor may already be unspliced, in which
328 >     * case some previous reachable node may still point to s.
329 >     * (For further explanation see Herlihy & Shavit "The Art of
330 >     * Multiprocessor Programming" chapter 9).  Although, in both
331 >     * cases, we can rule out the need for further action if either s
332 >     * or its predecessor are (or can be made to be) at, or fall off
333 >     * from, the head of list.
334 >     *
335 >     * Without taking these into account, it would be possible for an
336 >     * unbounded number of supposedly removed nodes to remain
337 >     * reachable.  Situations leading to such buildup are uncommon but
338 >     * can occur in practice; for example when a series of short timed
339 >     * calls to poll repeatedly time out but never otherwise fall off
340 >     * the list because of an untimed call to take at the front of the
341 >     * queue.
342 >     *
343 >     * When these cases arise, rather than always retraversing the
344 >     * entire list to find an actual predecessor to unlink (which
345 >     * won't help for case (1) anyway), we record a conservative
346 >     * estimate of possible unsplice failures (in "sweepVotes").  We
347 >     * trigger a full sweep when the estimate exceeds a threshold
348 >     * indicating the maximum number of estimated removal failures to
349 >     * tolerate before sweeping through, unlinking cancelled nodes
350 >     * that were not unlinked upon initial removal. We perform sweeps
351 >     * by the thread hitting threshold (rather than background threads
352 >     * or by spreading work to other threads) because in the main
353 >     * contexts in which removal occurs, the caller is already
354 >     * timed-out, cancelled, or performing a potentially O(n)
355 >     * operation (i.e., remove(x)), none of which are time-critical
356 >     * enough to warrant the overhead that alternatives would impose
357 >     * on other threads.
358 >     *
359 >     * Because the sweepVotes estimate is conservative, and because
360 >     * nodes become unlinked "naturally" as they fall off the head of
361 >     * the queue, and because we allow votes to accumulate even while
362 >     * sweeps are in progress, there are typically significantly fewer
363 >     * such nodes than estimated.  Choice of a threshold value
364 >     * balances the likelihood of wasted effort and contention, versus
365 >     * providing a worst-case bound on retention of interior nodes in
366 >     * quiescent queues. The value defined below was chosen
367 >     * empirically to balance these under various timeout scenarios.
368 >     *
369 >     * Note that we cannot self-link unlinked interior nodes during
370 >     * sweeps. However, the associated garbage chains terminate when
371 >     * some successor ultimately falls off the head of the list and is
372 >     * self-linked.
373       */
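
The vote-then-sweep gating described above can be sketched minimally; `SweepGate` and `vote` are illustrative names (not from this file), using the SWEEP_THRESHOLD value of 32 defined later: each unsplice failure adds a vote, and the thread whose vote reaches the threshold resets the counter and performs the sweep itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical accumulation of unsplice-failure votes: vote() returns
// true for exactly the caller that should perform a full sweep, which
// also resets the counter (votes may keep accumulating concurrently).
class SweepGate {
    static final int SWEEP_THRESHOLD = 32;       // value used in this file

    final AtomicInteger sweepVotes = new AtomicInteger();

    boolean vote() {
        for (;;) {
            int v = sweepVotes.get();
            if (v < SWEEP_THRESHOLD) {
                if (sweepVotes.compareAndSet(v, v + 1))
                    return false;                // recorded a vote; no sweep yet
            } else if (sweepVotes.compareAndSet(v, 0))
                return true;                     // caller sweeps
            // else lost a race; reread and retry
        }
    }
}
```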
374  
375      /** True if on multiprocessor */
# Line 306 | Line 377 | public class LinkedTransferQueue<E> exte
377          Runtime.getRuntime().availableProcessors() > 1;
378  
379      /**
380 <     * The number of times to spin (with on average one randomly
381 <     * interspersed call to Thread.yield) on multiprocessor before
382 <     * blocking when a node is apparently the first waiter in the
383 <     * queue.  See above for explanation. Must be a power of two. The
384 <     * value is empirically derived -- it works pretty well across a
385 <     * variety of processors, numbers of CPUs, and OSes.
380 >     * The number of times to spin (with randomly interspersed calls
381 >     * to Thread.yield) on multiprocessor before blocking when a node
382 >     * is apparently the first waiter in the queue.  See above for
383 >     * explanation. Must be a power of two. The value is empirically
384 >     * derived -- it works pretty well across a variety of processors,
385 >     * numbers of CPUs, and OSes.
386       */
387      private static final int FRONT_SPINS   = 1 << 7;
388  
389      /**
390       * The number of times to spin before blocking when a node is
391 <     * preceded by another node that is apparently spinning.
391 >     * preceded by another node that is apparently spinning.  Also
392 >     * serves as an increment to FRONT_SPINS on phase changes, and as
393 >     * base average frequency for yielding during spins. Must be a
394 >     * power of two.
395       */
396 <    private static final int CHAINED_SPINS = FRONT_SPINS >>> 2;
396 >    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
397 >
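
The spin-then-block discipline these constants drive can be sketched as a standalone wait loop; `SpinDemo.awaitFlag` is hypothetical, showing only the pattern described above (bounded spinning with on average one randomly interspersed Thread.yield, then timed parking), not the actual awaitMatch logic.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical spin-then-block loop: spin up to FRONT_SPINS times,
// yielding on average once per FRONT_SPINS iterations, then fall back
// to timed parking until the flag is observed set.
class SpinDemo {
    static final int FRONT_SPINS = 1 << 7;

    static void awaitFlag(AtomicBoolean flag) {
        int spins = FRONT_SPINS;
        ThreadLocalRandom random = ThreadLocalRandom.current();
        while (!flag.get()) {
            if (spins > 0) {
                --spins;
                if (random.nextInt(FRONT_SPINS) == 0)
                    Thread.yield();              // occasional yield while spinning
            } else
                LockSupport.parkNanos(1_000L);   // timed park; loop rechecks flag
        }
    }
}
```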
398 >    /**
399 >     * The maximum number of estimated removal failures (sweepVotes)
400 >     * to tolerate before sweeping through the queue unlinking
401 >     * cancelled nodes that were not unlinked upon initial
402 >     * removal. See above for explanation. The value must be at least
403 >     * two to avoid useless sweeps when removing trailing nodes.
404 >     */
405 >    static final int SWEEP_THRESHOLD = 32;
406  
407      /**
408       * Queue nodes. Uses Object, not E, for items to allow forgetting
409       * them after use.  Relies heavily on Unsafe mechanics to minimize
410 <     * unnecessary ordering constraints: Writes that intrinsically
411 <     * precede or follow CASes use simple relaxed forms.  Other
329 <     * cleanups use releasing/lazy writes.
410 >     * unnecessary ordering constraints: Writes that are intrinsically
411 >     * ordered wrt other accesses or CASes use simple relaxed forms.
412       */
413      static final class Node {
414          final boolean isData;   // false if this is a request node
# Line 340 | Line 422 | public class LinkedTransferQueue<E> exte
422          }
423  
424          final boolean casItem(Object cmp, Object val) {
425 +            assert cmp == null || cmp.getClass() != Node.class;
426              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
427          }
428  
# Line 361 | Line 444 | public class LinkedTransferQueue<E> exte
444          }
445  
446          /**
447 <         * Sets item to self (using a releasing/lazy write) and waiter
448 <         * to null, to avoid garbage retention after extracting or
449 <         * cancelling.
447 >         * Sets item to self and waiter to null, to avoid garbage
448 >         * retention after matching or cancelling. Uses relaxed writes
449 >         * bacause order is already constrained in the only calling
450 >         * because order is already constrained in the only calling
451 >         * mechanics that extract items.  Similarly, clearing waiter
452 >         * follows either CAS or return from park (if ever parked;
453 >         * else we don't care).
454           */
455          final void forgetContents() {
456 <            UNSAFE.putOrderedObject(this, itemOffset, this);
457 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
456 >            UNSAFE.putObject(this, itemOffset, this);
457 >            UNSAFE.putObject(this, waiterOffset, null);
458          }
459  
460          /**
# Line 376 | Line 463 | public class LinkedTransferQueue<E> exte
463           */
464          final boolean isMatched() {
465              Object x = item;
466 <            return x == this || (x != null) != isData;
466 >            return (x == this) || ((x == null) == isData);
467 >        }
468 >
469 >        /**
470 >         * Returns true if this is an unmatched request node.
471 >         */
472 >        final boolean isUnmatchedRequest() {
473 >            return !isData && item == null;
474          }
475  
476          /**
# Line 394 | Line 488 | public class LinkedTransferQueue<E> exte
488           * Tries to artificially match a data node -- used by remove.
489           */
490          final boolean tryMatchData() {
491 +            assert isData;
492              Object x = item;
493              if (x != null && x != this && casItem(x, null)) {
494                  LockSupport.unpark(waiter);
# Line 415 | Line 510 | public class LinkedTransferQueue<E> exte
510      }
511  
512      /** head of the queue; null until first enqueue */
513 <    private transient volatile Node head;
419 <
420 <    /** predecessor of dangling unspliceable node */
421 <    private transient volatile Node cleanMe; // decl here to reduce contention
513 >    transient volatile Node head;
514  
515      /** tail of the queue; null until first append */
516      private transient volatile Node tail;
517  
518 +    /** The number of apparent failures to unsplice removed nodes */
519 +    private transient volatile int sweepVotes;
520 +
521      // CAS methods for fields
522      private boolean casTail(Node cmp, Node val) {
523          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 432 | Line 527 | public class LinkedTransferQueue<E> exte
527          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
528      }
529  
530 <    private boolean casCleanMe(Node cmp, Node val) {
531 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
530 >    private boolean casSweepVotes(int cmp, int val) {
531 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
532      }
533  
534      /*
535 <     * Possible values for "how" argument in xfer method. Beware that
441 <     * the order of assigned numerical values matters.
535 >     * Possible values for "how" argument in xfer method.
536       */
537 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
538 <    private static final int ASYNC   = 1; // for offer, put, add
539 <    private static final int SYNC    = 2; // for transfer, take
540 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
537 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
538 >    private static final int ASYNC = 1; // for offer, put, add
539 >    private static final int SYNC  = 2; // for transfer, take
540 >    private static final int TIMED = 3; // for timed poll, tryTransfer
541 >
542 >    @SuppressWarnings("unchecked")
543 >    static <E> E cast(Object item) {
544 >        assert item == null || item.getClass() != Node.class;
545 >        return (E) item;
546 >    }
547  
548      /**
549       * Implements all queuing methods. See above for explanation.
550       *
551       * @param e the item or null for take
552       * @param haveData true if this is a put, else a take
553 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
554 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
553 >     * @param how NOW, ASYNC, SYNC, or TIMED
554 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
555       * @return an item if matched, else e
556       * @throws NullPointerException if haveData mode but e is null
557       */
558 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
558 >    private E xfer(E e, boolean haveData, int how, long nanos) {
559          if (haveData && (e == null))
560              throw new NullPointerException();
561          Node s = null;                        // the node to append, if needed
# Line 469 | Line 569 | public class LinkedTransferQueue<E> exte
569                      if (isData == haveData)   // can't match
570                          break;
571                      if (p.casItem(item, e)) { // match
572 <                        Thread w = p.waiter;
573 <                        while (p != h) {      // update head
574 <                            Node n = p.next;  // by 2 unless singleton
475 <                            if (n != null)
476 <                                p = n;
477 <                            if (head == h && casHead(h, p)) {
572 >                        for (Node q = p; q != h;) {
573 >                            Node n = q.next;  // update by 2 unless singleton
574 >                            if (head == h && casHead(h, n == null? q : n)) {
575                                  h.forgetNext();
576                                  break;
577                              }                 // advance and retry
578                              if ((h = head)   == null ||
579 <                                (p = h.next) == null || !p.isMatched())
579 >                                (q = h.next) == null || !q.isMatched())
580                                  break;        // unless slack < 2
581                          }
582 <                        LockSupport.unpark(w);
583 <                        return item;
582 >                        LockSupport.unpark(p.waiter);
583 >                        return this.<E>cast(item);
584                      }
585                  }
586                  Node n = p.next;
587 <                p = p != n ? n : (h = head);  // Use head if p offlist
587 >                p = (p != n) ? n : (h = head); // Use head if p offlist
588              }
589  
590 <            if (how >= ASYNC) {               // No matches available
590 >            if (how != NOW) {                 // No matches available
591                  if (s == null)
592                      s = new Node(e, haveData);
593                  Node pred = tryAppend(s, haveData);
594                  if (pred == null)
595                      continue retry;           // lost race vs opposite mode
596 <                if (how >= SYNC)
597 <                    return awaitMatch(pred, s, e, how, nanos);
596 >                if (how != ASYNC)
597 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
598              }
599              return e; // not waiting
600          }
# Line 506 | Line 603 | public class LinkedTransferQueue<E> exte
603      /**
604       * Tries to append node s as tail.
605       *
509     * @param haveData true if appending in data mode
606       * @param s the node to append
607 +     * @param haveData true if appending in data mode
608       * @return null on failure due to losing race with append in
609       * different mode, else s's predecessor, or s itself if no
610       * predecessor
611       */
612      private Node tryAppend(Node s, boolean haveData) {
613 <        for (Node t = tail, p = t;;) { // move p to actual tail and append
613 >        for (Node t = tail, p = t;;) {        // move p to last node and append
614              Node n, u;                        // temps for reads of next & tail
615              if (p == null && (p = head) == null) {
616                  if (casHead(null, s))
# Line 521 | Line 618 | public class LinkedTransferQueue<E> exte
618              }
619              else if (p.cannotPrecede(haveData))
620                  return null;                  // lost race vs opposite mode
621 <            else if ((n = p.next) != null)    // Not tail; keep traversing
621 >            else if ((n = p.next) != null)    // not last; keep traversing
622                  p = p != t && t != (u = tail) ? (t = u) : // stale tail
623 <                    p != n ? n : null;        // restart if off list
623 >                    (p != n) ? n : null;      // restart if off list
624              else if (!p.casNext(null, s))
625                  p = p.next;                   // re-read on CAS failure
626              else {
627 <                if (p != t) {                 // Update if slack now >= 2
627 >                if (p != t) {                 // update if slack now >= 2
628                      while ((tail != t || !casTail(t, s)) &&
629                             (t = tail)   != null &&
630                             (s = t.next) != null && // advance and retry
# Line 541 | Line 638 | public class LinkedTransferQueue<E> exte
638      /**
639       * Spins/yields/blocks until node s is matched or caller gives up.
640       *
544     * @param pred the predecessor of s or s or null if none
641       * @param s the waiting node
642 +     * @param pred the predecessor of s, or s itself if it has no
643 +     * predecessor, or null if unknown (the null case does not occur
644 +     * in any current calls but may in possible future extensions)
645       * @param e the comparison value for checking match
646 <     * @param how either SYNC or TIMEOUT
647 <     * @param nanos timeout value
646 >     * @param timed if true, wait only until timeout elapses
647 >     * @param nanos timeout in nanosecs, used only if timed is true
648       * @return matched item, or e if unmatched on interrupt or timeout
649       */
650 <    private Object awaitMatch(Node pred, Node s, Object e,
651 <                              int how, long nanos) {
553 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
650 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
651 >        long lastTime = timed ? System.nanoTime() : 0L;
652          Thread w = Thread.currentThread();
653          int spins = -1; // initialized after first item and cancel checks
654          ThreadLocalRandom randomYields = null; // bound if needed
# Line 558 | Line 656 | public class LinkedTransferQueue<E> exte
656          for (;;) {
657              Object item = s.item;
658              if (item != e) {                  // matched
659 +                assert item != s;
660                  s.forgetContents();           // avoid garbage
661 <                return item;
661 >                return this.<E>cast(item);
662              }
663 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
664 <                     s.casItem(e, s)) {       // cancel
663 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
664 >                    s.casItem(e, s)) {        // cancel
665                  unsplice(pred, s);
666                  return e;
667              }
# Line 571 | Line 670 | public class LinkedTransferQueue<E> exte
670                  if ((spins = spinsFor(pred, s.isData)) > 0)
671                      randomYields = ThreadLocalRandom.current();
672              }
673 <            else if (spins > 0) {             // spin, occasionally yield
575 <                if (randomYields.nextInt(FRONT_SPINS) == 0)
576 <                    Thread.yield();
673 >            else if (spins > 0) {             // spin
674                  --spins;
675 +                if (randomYields.nextInt(CHAINED_SPINS) == 0)
676 +                    Thread.yield();           // occasionally yield
677              }
678              else if (s.waiter == null) {
679 <                shortenHeadPath();            // reduce slack before blocking
581 <                s.waiter = w;                 // request unpark
679 >                s.waiter = w;                 // request unpark then recheck
680              }
681 <            else if (how == TIMEOUT) {
681 >            else if (timed) {
682                  long now = System.nanoTime();
683                  if ((nanos -= now - lastTime) > 0)
684                      LockSupport.parkNanos(this, nanos);
# Line 588 | Line 686 | public class LinkedTransferQueue<E> exte
686              }
687              else {
688                  LockSupport.park(this);
591                spins = -1;                   // spin if front upon wakeup
689              }
690          }
691      }
# Line 599 | Line 696 | public class LinkedTransferQueue<E> exte
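The timed branch of awaitMatch (park with parkNanos, recomputing the remaining budget each wakeup) is observable through the public API. A sketch using java.util.concurrent.LinkedTransferQueue, the integrated form of this class: a timed poll on an empty queue cancels itself after roughly the timeout and returns null.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollDemo {
    // Returns true if poll(timeoutMs) on an empty queue returned null and
    // waited at least approximately the requested time (small slop allowed).
    static boolean timedPollExpires(long timeoutMs) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        long start = System.nanoTime();
        String item = q.poll(timeoutMs, TimeUnit.MILLISECONDS); // awaitMatch, timed
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return item == null && elapsedMs >= timeoutMs - 5;
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println(timedPollExpires(50));
    }
}
```

Note that parkNanos may return early (spuriously or via unpark), which is why awaitMatch re-reads the clock and re-parks with the remaining nanos rather than trusting a single park.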
696       */
697      private static int spinsFor(Node pred, boolean haveData) {
698          if (MP && pred != null) {
699 <            boolean predData = pred.isData;
700 <            if (predData != haveData)         // front and phase change
701 <                return FRONT_SPINS + (FRONT_SPINS >>> 1);
605 <            if (predData != (pred.item != null)) // probably at front
699 >            if (pred.isData != haveData)      // phase change
700 >                return FRONT_SPINS + CHAINED_SPINS;
701 >            if (pred.isMatched())             // probably at front
702                  return FRONT_SPINS;
703              if (pred.waiter == null)          // pred apparently spinning
704                  return CHAINED_SPINS;
# Line 610 | Line 706 | public class LinkedTransferQueue<E> exte
706          return 0;
707      }
708  
709 +    /* -------------- Traversal methods -------------- */
710 +
711      /**
712 <     * Tries (once) to unsplice nodes between head and first unmatched
713 <     * or trailing node; failing on contention.
714 <     */
715 <    private void shortenHeadPath() {
716 <        Node h, hn, p, q;
717 <        if ((p = h = head) != null && h.isMatched() &&
718 <            (q = hn = h.next) != null) {
621 <            Node n;
622 <            while ((n = q.next) != q) {
623 <                if (n == null || !q.isMatched()) {
624 <                    if (hn != q && h.next == hn)
625 <                        h.casNext(hn, q);
626 <                    break;
627 <                }
628 <                p = q;
629 <                q = n;
630 <            }
631 <        }
712 >     * Returns the successor of p, or the head node if p.next has been
713 >     * linked to self, which will only be true if traversing with a
714 >     * stale pointer that is now off the list.
715 >     */
716 >    final Node succ(Node p) {
717 >        Node next = p.next;
718 >        return (p == next) ? head : next;
719      }
720  
634    /* -------------- Traversal methods -------------- */
635
721      /**
722       * Returns the first unmatched node of the given mode, or null if
723       * none.  Used by methods isEmpty, hasWaitingConsumer.
724       */
725 <    private Node firstOfMode(boolean data) {
726 <        for (Node p = head; p != null; ) {
725 >    private Node firstOfMode(boolean isData) {
726 >        for (Node p = head; p != null; p = succ(p)) {
727              if (!p.isMatched())
728 <                return p.isData == data? p : null;
644 <            Node n = p.next;
645 <            p = n != p ? n : head;
728 >                return (p.isData == isData) ? p : null;
729          }
730          return null;
731      }
732  
733      /**
734       * Returns the item in the first unmatched node with isData; or
735 <     * null if none. Used by peek.
735 >     * null if none.  Used by peek.
736       */
737 <    private Object firstDataItem() {
738 <        for (Node p = head; p != null; ) {
656 <            boolean isData = p.isData;
737 >    private E firstDataItem() {
738 >        for (Node p = head; p != null; p = succ(p)) {
739              Object item = p.item;
740 <            if (item != p && (item != null) == isData)
741 <                return isData ? item : null;
742 <            Node n = p.next;
743 <            p = n != p ? n : head;
740 >            if (p.isData) {
741 >                if (item != null && item != p)
742 >                    return this.<E>cast(item);
743 >            }
744 >            else if (item == null)
745 >                return null;
746          }
747          return null;
748      }
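firstDataItem's contract (return the item of the first unmatched data node without unlinking it, or null when the first unmatched node is a request) is visible through peek(). A quick check against java.util.concurrent.LinkedTransferQueue, the integrated form of this class:

```java
import java.util.concurrent.LinkedTransferQueue;

public class PeekDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        System.out.println(q.peek());   // null: no unmatched data node yet
        q.offer("a");
        q.offer("b");
        System.out.println(q.peek());   // first unmatched data item
        System.out.println(q.size());   // peek does not unlink anything
    }
}
```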
# Line 689 | Line 773 | public class LinkedTransferQueue<E> exte
773  
774      final class Itr implements Iterator<E> {
775          private Node nextNode;   // next node to return item for
776 <        private Object nextItem; // the corresponding item
776 >        private E nextItem;      // the corresponding item
777          private Node lastRet;    // last returned node, to support remove
778 +        private Node lastPred;   // predecessor to unlink lastRet
779  
780          /**
781           * Moves to next node after prev, or first node if prev null.
782           */
783          private void advance(Node prev) {
784 +            lastPred = lastRet;
785              lastRet = prev;
786 <            Node p;
787 <            if (prev == null || (p = prev.next) == prev)
702 <                p = head;
703 <            while (p != null) {
786 >            for (Node p = (prev == null) ? head : succ(prev);
787 >                 p != null; p = succ(p)) {
788                  Object item = p.item;
789                  if (p.isData) {
790                      if (item != null && item != p) {
791 <                        nextItem = item;
791 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
792                          nextNode = p;
793                          return;
794                      }
795                  }
796                  else if (item == null)
797                      break;
714                Node n = p.next;
715                p = n != p ? n : head;
798              }
799              nextNode = null;
800          }
# Line 728 | Line 810 | public class LinkedTransferQueue<E> exte
810          public final E next() {
811              Node p = nextNode;
812              if (p == null) throw new NoSuchElementException();
813 <            Object e = nextItem;
813 >            E e = nextItem;
814              advance(p);
815 <            return (E) e;
815 >            return e;
816          }
817  
818          public final void remove() {
819              Node p = lastRet;
820              if (p == null) throw new IllegalStateException();
821 <            lastRet = null;
822 <            findAndRemoveNode(p);
821 >            if (p.tryMatchData())
822 >                unsplice(lastPred, p);
823          }
824      }
825  
# Line 747 | Line 829 | public class LinkedTransferQueue<E> exte
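The rewritten Itr.remove above (match the node with tryMatchData, then unsplice it using the saved lastPred, instead of re-finding it with findAndRemoveNode) is exercised by ordinary Iterator.remove on the public class. A check against java.util.concurrent.LinkedTransferQueue, the integrated form of this class:

```java
import java.util.Iterator;
import java.util.concurrent.LinkedTransferQueue;

public class IterRemoveDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.offer("a"); q.offer("b"); q.offer("c");
        for (Iterator<String> it = q.iterator(); it.hasNext(); ) {
            if (it.next().equals("b"))
                it.remove();            // tryMatchData + unsplice(lastPred, p)
        }
        System.out.println(q);          // b is gone; a and c remain
    }
}
```

Keeping lastPred alongside lastRet is what lets remove() unsplice in O(1) from the iterator's position rather than re-traversing from head.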
829       * Unsplices (now or later) the given deleted/cancelled node with
830       * the given predecessor.
831       *
832 <     * @param pred predecessor of node to be unspliced
832 >     * @param pred a node that was at one time known to be the
833 >     * predecessor of s, or null or s itself if s is/was at head
834       * @param s the node to be unspliced
835       */
836 <    private void unsplice(Node pred, Node s) {
837 <        s.forgetContents(); // clear unneeded fields
836 >    final void unsplice(Node pred, Node s) {
837 >        s.forgetContents(); // forget unneeded fields
838          /*
839 <         * At any given time, exactly one node on list cannot be
840 <         * deleted -- the last inserted node. To accommodate this, if
841 <         * we cannot delete s, we save its predecessor as "cleanMe",
842 <         * processing the previously saved version first. Because only
843 <         * one node in the list can have a null next, at least one of
761 <         * node s or the node previously saved can always be
762 <         * processed, so this always terminates.
839 >         * See above for rationale. Briefly: if pred still points to
840 >         * s, try to unlink s.  If s cannot be unlinked, because it is
 841 >         * the trailing node or pred might be unlinked, and neither pred
842 >         * nor s are head or offlist, add to sweepVotes, and if enough
843 >         * votes have accumulated, sweep.
844           */
845 <        if (pred != null && pred != s) {
846 <            while (pred.next == s) {
847 <                Node oldpred = cleanMe == null? null : reclean();
848 <                Node n = s.next;
849 <                if (n != null) {
850 <                    if (n != s)
851 <                        pred.casNext(s, n);
852 <                    break;
845 >        if (pred != null && pred != s && pred.next == s) {
846 >            Node n = s.next;
847 >            if (n == null ||
848 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
849 >                for (;;) {               // check if at, or could be, head
850 >                    Node h = head;
851 >                    if (h == pred || h == s || h == null)
852 >                        return;          // at head or list empty
853 >                    if (!h.isMatched())
854 >                        break;
855 >                    Node hn = h.next;
856 >                    if (hn == null)
857 >                        return;          // now empty
858 >                    if (hn != h && casHead(h, hn))
859 >                        h.forgetNext();  // advance head
860 >                }
861 >                if (pred.next != pred && s.next != s) { // recheck if offlist
862 >                    for (;;) {           // sweep now if enough votes
863 >                        int v = sweepVotes;
864 >                        if (v < SWEEP_THRESHOLD) {
865 >                            if (casSweepVotes(v, v + 1))
866 >                                break;
867 >                        }
868 >                        else if (casSweepVotes(v, 0)) {
869 >                            sweep();
870 >                            break;
871 >                        }
872 >                    }
873                  }
773                if (oldpred == pred ||      // Already saved
774                    (oldpred == null && casCleanMe(null, pred)))
775                    break;                  // Postpone cleaning
874              }
875          }
876      }
877  
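The vote-then-sweep pattern that replaces the old cleanMe slot can be sketched in isolation: each unsplice that cannot unlink immediately adds a vote via CAS, and once the count reaches a threshold, one caller wins the CAS back to zero and performs the sweep. This is an illustrative sketch: the names and threshold value are stand-ins (the real SWEEP_THRESHOLD is a tuned constant of the queue), and the sweep itself is simulated by a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the sweepVotes accumulation in unsplice():
// failed unsplices vote; at the threshold, exactly one caller resets
// the count and performs the (here simulated) sweep.
public class SweepVotesDemo {
    static final int SWEEP_THRESHOLD = 32;          // stand-in value
    final AtomicInteger sweepVotes = new AtomicInteger();
    int sweeps;                                     // times sweep() "ran"

    void voteOrSweep() {
        for (;;) {
            int v = sweepVotes.get();
            if (v < SWEEP_THRESHOLD) {
                if (sweepVotes.compareAndSet(v, v + 1))
                    break;                          // recorded one vote
            } else if (sweepVotes.compareAndSet(v, 0)) {
                sweeps++;                           // this caller sweeps
                break;
            }                                       // else contended; retry
        }
    }
    public static void main(String[] args) {
        SweepVotesDemo d = new SweepVotesDemo();
        for (int i = 0; i < 66; i++) d.voteOrSweep();
        System.out.println(d.sweeps);
    }
}
```

The design choice over cleanMe: instead of parking one un-unlinkable node's predecessor in a single slot, failures are merely counted, and matched nodes are reclaimed in batch by sweep() once enough have plausibly accumulated.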
878      /**
879 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
782 <     * that was previously uncleanable because it was at tail.
783 <     *
784 <     * @return current cleanMe node (or null)
879 >     * Unlinks matched nodes encountered in a traversal from head.
880       */
881 <    private Node reclean() {
882 <        /*
883 <         * cleanMe is, or at one time was, predecessor of a cancelled
884 <         * node s that was the tail so could not be unspliced.  If it
885 <         * is no longer the tail, try to unsplice if necessary and
886 <         * make cleanMe slot available.  This differs from similar
887 <         * code in unsplice() because we must check that pred still
793 <         * points to a matched node that can be unspliced -- if not,
794 <         * we can (must) clear cleanMe without unsplicing.  This can
795 <         * loop only due to contention.
796 <         */
797 <        Node pred;
798 <        while ((pred = cleanMe) != null) {
799 <            Node s = pred.next;
800 <            Node n;
801 <            if (s == null || s == pred || !s.isMatched())
802 <                casCleanMe(pred, null); // already gone
803 <            else if ((n = s.next) != null) {
804 <                if (n != s)
805 <                    pred.casNext(s, n);
806 <                casCleanMe(pred, null);
807 <            }
881 >    private void sweep() {
882 >        Node p = head, s, n;
883 >        while (p != null && (s = p.next) != null && (n = s.next) != null) {
884 >            if (p == s || s == n)
885 >                p = head; // stale
886 >            else if (s.isMatched())
887 >                p.casNext(s, n);
888              else
889 <                break;
810 <        }
811 <        return pred;
812 <    }
813 <
814 <    /**
815 <     * Main implementation of Iterator.remove(). Find
816 <     * and unsplice the given node.
817 <     */
818 <    final void findAndRemoveNode(Node s) {
819 <        if (s.tryMatchData()) {
820 <            Node pred = null;
821 <            Node p = head;
822 <            while (p != null) {
823 <                if (p == s) {
824 <                    unsplice(pred, p);
825 <                    break;
826 <                }
827 <                if (!p.isData && !p.isMatched())
828 <                    break;
829 <                pred = p;
830 <                if ((p = p.next) == pred) { // stale
831 <                    pred = null;
832 <                    p = head;
833 <                }
834 <            }
889 >                p = s;
890          }
891      }
892  
# Line 840 | Line 895 | public class LinkedTransferQueue<E> exte
895       */
896      private boolean findAndRemove(Object e) {
897          if (e != null) {
898 <            Node pred = null;
844 <            Node p = head;
845 <            while (p != null) {
898 >            for (Node pred = null, p = head; p != null; ) {
899                  Object item = p.item;
900                  if (p.isData) {
901                      if (item != null && item != p && e.equals(item) &&
# Line 854 | Line 907 | public class LinkedTransferQueue<E> exte
907                  else if (item == null)
908                      break;
909                  pred = p;
910 <                if ((p = p.next) == pred) {
910 >                if ((p = p.next) == pred) { // stale
911                      pred = null;
912                      p = head;
913                  }
# Line 982 | Line 1035 | public class LinkedTransferQueue<E> exte
1035       */
1036      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1037          throws InterruptedException {
1038 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1038 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1039              return true;
1040          if (!Thread.interrupted())
1041              return false;
# Line 990 | Line 1043 | public class LinkedTransferQueue<E> exte
1043      }
1044  
1045      public E take() throws InterruptedException {
1046 <        Object e = xfer(null, false, SYNC, 0);
1046 >        E e = xfer(null, false, SYNC, 0);
1047          if (e != null)
1048 <            return (E)e;
1048 >            return e;
1049          Thread.interrupted();
1050          throw new InterruptedException();
1051      }
1052  
1053      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1054 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1054 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1055          if (e != null || !Thread.interrupted())
1056 <            return (E)e;
1056 >            return e;
1057          throw new InterruptedException();
1058      }
1059  
1060      public E poll() {
1061 <        return (E)xfer(null, false, NOW, 0);
1061 >        return xfer(null, false, NOW, 0);
1062      }
1063  
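The NOW / TIMED / SYNC modes passed to xfer in the methods above are all observable through the public API. A sketch using java.util.concurrent.LinkedTransferQueue, the integrated form of this class: poll() never waits, tryTransfer(e) fails immediately (leaving nothing enqueued) when no consumer is waiting, and the timed tryTransfer succeeds once a consumer arrives.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class XferModesDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();

        System.out.println(q.poll());            // NOW: no waiting, null when empty
        System.out.println(q.tryTransfer("x"));  // false: no consumer waiting
        System.out.println(q.isEmpty());         // true: tryTransfer left nothing

        Thread consumer = new Thread(() -> {
            try { q.take(); }                    // SYNC: park until matched
            catch (InterruptedException ignored) { }
        });
        consumer.start();
        // TIMED: succeeds once the consumer reaches the queue
        System.out.println(q.tryTransfer("y", 1, TimeUnit.SECONDS));
        consumer.join();
    }
}
```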
1064      /**
# Line 1062 | Line 1115 | public class LinkedTransferQueue<E> exte
1115      }
1116  
1117      public E peek() {
1118 <        return (E) firstDataItem();
1118 >        return firstDataItem();
1119      }
1120  
1121      /**
# Line 1158 | Line 1211 | public class LinkedTransferQueue<E> exte
1211          }
1212      }
1213  
1161
1214      // Unsafe mechanics
1215  
1216      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
# Line 1166 | Line 1218 | public class LinkedTransferQueue<E> exte
1218          objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1219      private static final long tailOffset =
1220          objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1221 <    private static final long cleanMeOffset =
1222 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1221 >    private static final long sweepVotesOffset =
1222 >        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1223  
1224      static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1225                                    String field, Class<?> klazz) {
# Line 1181 | Line 1233 | public class LinkedTransferQueue<E> exte
1233          }
1234      }
1235  
1236 <    private static sun.misc.Unsafe getUnsafe() {
1236 >    /**
1237 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1238 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1239 >     * into a jdk.
1240 >     *
1241 >     * @return a sun.misc.Unsafe
1242 >     */
1243 >    static sun.misc.Unsafe getUnsafe() {
1244          try {
1245              return sun.misc.Unsafe.getUnsafe();
1246          } catch (SecurityException se) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines