
Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.49 by jsr166, Thu Oct 22 15:58:44 2009 UTC vs.
Revision 1.74 by jsr166, Wed Sep 1 21:43:08 2010 UTC

# Line 15 | Line 15 | import java.util.Iterator;
15   import java.util.NoSuchElementException;
16   import java.util.Queue;
17   import java.util.concurrent.locks.LockSupport;
18 +
19   /**
20   * An unbounded {@link TransferQueue} based on linked nodes.
21   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 161 | Line 162 | public class LinkedTransferQueue<E> exte
162       * targets.  Even when using very small slack values, this
163       * approach works well for dual queues because it allows all
164       * operations up to the point of matching or appending an item
165 <     * (hence potentially releasing another thread) to be read-only,
166 <     * thus not introducing any further contention. As described
167 <     * below, we implement this by performing slack maintenance
168 <     * retries only after these points.
165 >     * (hence potentially allowing progress by another thread) to be
166 >     * read-only, thus not introducing any further contention. As
167 >     * described below, we implement this by performing slack
168 >     * maintenance retries only after these points.
169       *
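[Editor's aside, not part of the file: a toy sketch of the dual-queue matching idea described above, using hypothetical names. A consumer matches a data node with a single CAS on its item, so everything up to the match point stays read-only.]

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the real Node: matching is one CAS on item,
// so traversal up to the match point performs no writes at all.
class ToyNode {
    final boolean isData;                     // false = request (take) node
    final AtomicReference<Object> item;

    ToyNode(Object item, boolean isData) {
        this.isData = isData;
        this.item = new AtomicReference<>(item);
    }

    /** A taker "matches" a data node by swapping its item to null. */
    Object tryMatch() {
        Object x = item.get();
        if (isData && x != null && item.compareAndSet(x, null))
            return x;                         // won the race; item extracted
        return null;                          // already matched by someone else
    }
}

public class DualMatchDemo {
    public static void main(String[] args) {
        ToyNode n = new ToyNode("hello", true);
        assert "hello".equals(n.tryMatch()); // first match succeeds
        assert n.tryMatch() == null;         // second caller sees node matched
    }
}
```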
170       * As an accompaniment to such techniques, traversal overhead can
171       * be further reduced without increasing contention of head
172 <     * pointer updates.  During traversals, threads may sometimes
173 <     * shortcut the "next" link path from the current "head" node to
174 <     * be closer to the currently known first unmatched node. Again,
175 <     * this may be triggered with using thresholds or randomization.
172 >     * pointer updates: Threads may sometimes shortcut the "next" link
173 >     * path from the current "head" node to be closer to the currently
174 >     * known first unmatched node, and similarly for tail. Again, this
175 >     * may be triggered with using thresholds or randomization.
176       *
177       * These ideas must be further extended to avoid unbounded amounts
178       * of costly-to-reclaim garbage caused by the sequential "next"
# Line 199 | Line 200 | public class LinkedTransferQueue<E> exte
200       * mechanics because an update may leave head at a detached node.
201       * And while direct writes are possible for tail updates, they
202       * increase the risk of long retraversals, and hence long garbage
203 <     * chains which can be much more costly than is worthwhile
203 >     * chains, which can be much more costly than is worthwhile
204       * considering that the cost difference of performing a CAS vs
205       * write is smaller when they are not triggered on each operation
206       * (especially considering that writes and CASes equally require
207       * additional GC bookkeeping ("write barriers") that are sometimes
208       * more costly than the writes themselves because of contention).
209       *
209     * Removal of interior nodes (due to timed out or interrupted
210     * waits, or calls to remove or Iterator.remove) uses a scheme
211     * roughly similar to that in Scherer, Lea, and Scott's
212     * SynchronousQueue. Given a predecessor, we can unsplice any node
213     * except the (actual) tail of the queue. To avoid build-up of
214     * cancelled trailing nodes, upon a request to remove a trailing
215     * node, it is placed in field "cleanMe" to be unspliced upon the
216     * next call to unsplice any other node.  Situations needing such
217     * mechanics are not common but do occur in practice; for example
218     * when an unbounded series of short timed calls to poll
219     * repeatedly time out but never otherwise fall off the list
220     * because of an untimed call to take at the front of the
221     * queue. (Note that maintaining field cleanMe does not otherwise
222     * much impact garbage retention even if never cleared by some
223     * other call because the held node will eventually either
224     * directly or indirectly lead to a self-link once off the list.)
225     *
210       * *** Overview of implementation ***
211       *
212 <     * We use a threshold-based approach to updates, with a target
213 <     * slack of two.  The slack value is hard-wired: a path greater
212 >     * We use a threshold-based approach to updates, with a slack
213 >     * threshold of two -- that is, we update head/tail when the
214 >     * current pointer appears to be two or more steps away from the
215 >     * first/last node. The slack value is hard-wired: a path greater
216       * than one is naturally implemented by checking equality of
217       * traversal pointers except when the list has only one element,
218 <     * in which case we keep target slack at one. Avoiding tracking
218 >     * in which case we keep slack threshold at one. Avoiding tracking
219       * explicit counts across method calls slightly simplifies an
220       * already-messy implementation. Using randomization would
221       * probably work better if there were a low-quality dirt-cheap
222       * per-thread one available, but even ThreadLocalRandom is too
223       * heavy for these purposes.
224       *
225 <     * With such a small target slack value, it is rarely worthwhile
226 <     * to augment this with path short-circuiting; i.e., unsplicing
227 <     * nodes between head and the first unmatched node, or similarly
228 <     * for tail, rather than advancing head or tail proper. However,
243 <     * it is used (in awaitMatch) immediately before a waiting thread
244 <     * starts to block, as a final bit of helping at a point when
245 <     * contention with others is extremely unlikely (since if other
246 <     * threads that could release it are operating, then the current
247 <     * thread wouldn't be blocking).
225 >     * With such a small slack threshold value, it is not worthwhile
226 >     * to augment this with path short-circuiting (i.e., unsplicing
227 >     * interior nodes) except in the case of cancellation/removal (see
228 >     * below).
229       *
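[Editor's aside, not part of the file: a hypothetical miniature of the slack-of-two rule just described. Head is CASed forward only once both head and its successor are matched, so two matches cost roughly one head update.]

```java
import java.util.concurrent.atomic.AtomicReference;

public class SlackDemo {
    static final class N {
        volatile boolean matched;
        volatile N next;
    }

    static final AtomicReference<N> head = new AtomicReference<>();
    static int casCount = 0;                  // head updates actually performed

    /** Advance head only when the matched prefix has reached length two. */
    static void maybeAdvanceHead() {
        N h = head.get(), n;
        if (h != null && h.matched && (n = h.next) != null && n.matched) {
            N target = (n.next != null) ? n.next : n;  // jump by 2 if possible
            if (head.compareAndSet(h, target))
                casCount++;
        }
    }

    public static void main(String[] args) {
        N a = new N(); N b = new N(); N c = new N();
        a.next = b; b.next = c;
        head.set(a);
        a.matched = true; maybeAdvanceHead(); // slack 1: no CAS yet
        b.matched = true; maybeAdvanceHead(); // slack 2: one CAS, jump to c
        assert head.get() == c;
        assert casCount == 1;                 // two matches, one head update
    }
}
```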
230       * We allow both the head and tail fields to be null before any
231       * nodes are enqueued; initializing upon first append.  This
# Line 260 | Line 241 | public class LinkedTransferQueue<E> exte
241       * of offer, put, poll, take, or transfer (each possibly with
242       * timeout). The relative complexity of using one monolithic
243       * method outweighs the code bulk and maintenance problems of
244 <     * using nine separate methods.
244 >     * using separate methods for each case.
245       *
246       * Operation consists of up to three phases. The first is
247       * implemented within method xfer, the second in tryAppend, and
# Line 285 | Line 266 | public class LinkedTransferQueue<E> exte
266       *
267       * 2. Try to append a new node (method tryAppend)
268       *
269 <     *    Starting at current tail pointer, try to append a new node
270 <     *    to the list (or if head was null, establish the first
271 <     *    node). Nodes can be appended only if their predecessors are
272 <     *    either already matched or are of the same mode. If we detect
273 <     *    otherwise, then a new node with opposite mode must have been
274 <     *    appended during traversal, so must restart at phase 1. The
275 <     *    traversal and update steps are otherwise similar to phase 1:
276 <     *    Retrying upon CAS misses and checking for staleness.  In
277 <     *    particular, if a self-link is encountered, then we can
278 <     *    safely jump to a node on the list by continuing the
279 <     *    traversal at current head.
269 >     *    Starting at current tail pointer, find the actual last node
270 >     *    and try to append a new node (or if head was null, establish
271 >     *    the first node). Nodes can be appended only if their
272 >     *    predecessors are either already matched or are of the same
273 >     *    mode. If we detect otherwise, then a new node with opposite
274 >     *    mode must have been appended during traversal, so we must
275 >     *    restart at phase 1. The traversal and update steps are
276 >     *    otherwise similar to phase 1: Retrying upon CAS misses and
277 >     *    checking for staleness.  In particular, if a self-link is
278 >     *    encountered, then we can safely jump to a node on the list
279 >     *    by continuing the traversal at current head.
280       *
281       *    On successful append, if the call was ASYNC, return.
282       *
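[Editor's aside, not part of the file: the phase-2 admission rule above, distilled into a hypothetical predicate. A node may be appended only when the current last node is already matched or has the same mode; otherwise an opposite-mode node arrived during traversal and the caller restarts at phase 1.]

```java
public class AppendRuleDemo {
    /** Phase-2 admission check from the algorithm description above. */
    static boolean canAppend(boolean lastIsData, boolean lastMatched,
                             boolean newIsData) {
        return lastMatched || (lastIsData == newIsData);
    }

    public static void main(String[] args) {
        assert canAppend(true, false, true);   // data after unmatched data: ok
        assert !canAppend(true, false, false); // request after live data: restart
        assert canAppend(true, true, false);   // anything after a matched node: ok
    }
}
```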
283       * 3. Await match or cancellation (method awaitMatch)
284       *
285       *    Wait for another thread to match node; instead cancelling if
286 <     *    current thread was interrupted or the wait timed out. On
286 >     *    the current thread was interrupted or the wait timed out. On
287       *    multiprocessors, we use front-of-queue spinning: If a node
288       *    appears to be the first unmatched node in the queue, it
289       *    spins a bit before blocking. In either case, before blocking
# Line 317 | Line 298 | public class LinkedTransferQueue<E> exte
298       *    to decide to occasionally perform a Thread.yield. While
299       * yield has underdefined specs, we assume that it might help,
300       *    and will not hurt in limiting impact of spinning on busy
301 <     *    systems.  We also use much smaller (1/4) spins for nodes
302 <     *    that are not known to be front but whose predecessors have
303 <     *    not blocked -- these "chained" spins avoid artifacts of
301 >     *    systems.  We also use smaller (1/2) spins for nodes that are
302 >     *    not known to be front but whose predecessors have not
303 >     *    blocked -- these "chained" spins avoid artifacts of
304       *    front-of-queue rules which otherwise lead to alternating
305       *    nodes spinning vs blocking. Further, front threads that
306       *    represent phase changes (from data to request node or vice
307       *    versa) compared to their predecessors receive additional
308 <     *    spins, reflecting the longer code path lengths necessary to
309 <     *    release them under contention.
308 >     *    chained spins, reflecting longer paths typically required to
309 >     *    unblock threads during phase changes.
310 >     *
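[Editor's aside, not part of the file: a mirror of the spin-count policy just described, assuming this revision's values FRONT_SPINS = 128 and CHAINED_SPINS = 64. The predecessor's state is passed as flags rather than read from a live node.]

```java
public class SpinPolicyDemo {
    static final int FRONT_SPINS   = 1 << 7;
    static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /** Spins granted to a waiter, given what is known about its predecessor. */
    static int spinsFor(boolean predIsData, boolean predMatched,
                        boolean predWaiterNull, boolean haveData) {
        if (predIsData != haveData)     // phase change: longest unblock path
            return FRONT_SPINS + CHAINED_SPINS;
        if (predMatched)                // probably at the front of the queue
            return FRONT_SPINS;
        if (predWaiterNull)             // predecessor itself still spinning
            return CHAINED_SPINS;
        return 0;                       // predecessor blocked: park immediately
    }

    public static void main(String[] args) {
        assert spinsFor(true, false, false, false) == 192; // phase change
        assert spinsFor(true, true, false, true)   == 128; // at front
        assert spinsFor(true, false, true, true)   == 64;  // chained spin
        assert spinsFor(true, false, false, true)  == 0;   // pred blocked
    }
}
```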
311 >     *
312 >     * ** Unlinking removed interior nodes **
313 >     *
314 >     * In addition to minimizing garbage retention via self-linking
315 >     * described above, we also unlink removed interior nodes. These
316 >     * may arise due to timed out or interrupted waits, or calls to
317 >     * remove(x) or Iterator.remove.  Normally, given a node that was
318 >     * at one time known to be the predecessor of some node s that is
319 >     * to be removed, we can unsplice s by CASing the next field of
320 >     * its predecessor if it still points to s (otherwise s must
321 >     * already have been removed or is now offlist). But there are two
322 >     * situations in which we cannot guarantee to make node s
323 >     * unreachable in this way: (1) If s is the trailing node of list
324 >     * (i.e., with null next), then it is pinned as the target node
325 >     * for appends, so can only be removed later after other nodes are
326 >     * appended. (2) We cannot necessarily unlink s given a
327 >     * predecessor node that is matched (including the case of being
328 >     * cancelled): the predecessor may already be unspliced, in which
329 >     * case some previous reachable node may still point to s.
330 >     * (For further explanation see Herlihy & Shavit "The Art of
331 >     * Multiprocessor Programming" chapter 9).  Although, in both
332 >     * cases, we can rule out the need for further action if either s
333 >     * or its predecessor are (or can be made to be) at, or fall off
334 >     * from, the head of list.
335 >     *
336 >     * Without taking these into account, it would be possible for an
337 >     * unbounded number of supposedly removed nodes to remain
338 >     * reachable.  Situations leading to such buildup are uncommon but
339 >     * can occur in practice; for example when a series of short timed
340 >     * calls to poll repeatedly time out but never otherwise fall off
341 >     * the list because of an untimed call to take at the front of the
342 >     * queue.
343 >     *
344 >     * When these cases arise, rather than always retraversing the
345 >     * entire list to find an actual predecessor to unlink (which
346 >     * won't help for case (1) anyway), we record a conservative
347 >     * estimate of possible unsplice failures (in "sweepVotes").  We
348 >     * trigger a full sweep when the estimate exceeds a threshold
349 >     * indicating the maximum number of estimated removal failures to
350 >     * tolerate before sweeping through, unlinking cancelled nodes
351 >     * that were not unlinked upon initial removal. We perform sweeps
352 >     * by the thread hitting threshold (rather than background threads
353 >     * or by spreading work to other threads) because in the main
354 >     * contexts in which removal occurs, the caller is already
355 >     * timed-out, cancelled, or performing a potentially O(n)
356 >     * operation (i.e., remove(x)), none of which are time-critical
357 >     * enough to warrant the overhead that alternatives would impose
358 >     * on other threads.
359 >     *
360 >     * Because the sweepVotes estimate is conservative, and because
361 >     * nodes become unlinked "naturally" as they fall off the head of
362 >     * the queue, and because we allow votes to accumulate even while
363 >     * sweeps are in progress, there are typically significantly fewer
364 >     * such nodes than estimated.  Choice of a threshold value
365 >     * balances the likelihood of wasted effort and contention, versus
366 >     * providing a worst-case bound on retention of interior nodes in
367 >     * quiescent queues. The value defined below was chosen
368 >     * empirically to balance these under various timeout scenarios.
369 >     *
370 >     * Note that we cannot self-link unlinked interior nodes during
371 >     * sweeps. However, the associated garbage chains terminate when
372 >     * some successor ultimately falls off the head of the list and is
373 >     * self-linked.
374       */
375  
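[Editor's aside, not part of the file: a hypothetical sketch of the sweepVotes accounting introduced in this revision. Each failed unsplice casts a vote; the caller whose vote reaches the threshold resets the counter and performs the O(n) sweep itself, as argued above.]

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SweepVotesDemo {
    static final int SWEEP_THRESHOLD = 32;
    static final AtomicInteger sweepVotes = new AtomicInteger();
    static int sweeps = 0;               // sweeps actually performed

    /** Record one apparent unsplice failure; sweep on hitting threshold. */
    static void noteUnspliceFailure() {
        for (;;) {
            int v = sweepVotes.get();
            if (v < SWEEP_THRESHOLD) {
                if (sweepVotes.compareAndSet(v, v + 1))
                    return;              // vote recorded, no sweep yet
            } else if (sweepVotes.compareAndSet(v, 0)) {
                sweeps++;                // this caller pays for the sweep
                return;
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 33; i++)
            noteUnspliceFailure();
        assert sweeps == 1;              // 33rd failure triggered one sweep
        assert sweepVotes.get() == 0;    // counter reset for the next round
    }
}
```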
376      /** True if on multiprocessor */
# Line 333 | Line 378 | public class LinkedTransferQueue<E> exte
378          Runtime.getRuntime().availableProcessors() > 1;
379  
380      /**
381 <     * The number of times to spin (with on average one randomly
382 <     * interspersed call to Thread.yield) on multiprocessor before
383 <     * blocking when a node is apparently the first waiter in the
384 <     * queue.  See above for explanation. Must be a power of two. The
385 <     * value is empirically derived -- it works pretty well across a
386 <     * variety of processors, numbers of CPUs, and OSes.
381 >     * The number of times to spin (with randomly interspersed calls
382 >     * to Thread.yield) on multiprocessor before blocking when a node
383 >     * is apparently the first waiter in the queue.  See above for
384 >     * explanation. Must be a power of two. The value is empirically
385 >     * derived -- it works pretty well across a variety of processors,
386 >     * numbers of CPUs, and OSes.
387       */
388      private static final int FRONT_SPINS   = 1 << 7;
389  
390      /**
391       * The number of times to spin before blocking when a node is
392 <     * preceded by another node that is apparently spinning.
392 >     * preceded by another node that is apparently spinning.  Also
393 >     * serves as an increment to FRONT_SPINS on phase changes, and as
394 >     * base average frequency for yielding during spins. Must be a
395 >     * power of two.
396       */
397 <    private static final int CHAINED_SPINS = FRONT_SPINS >>> 2;
397 >    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
398 >
399 >    /**
400 >     * The maximum number of estimated removal failures (sweepVotes)
401 >     * to tolerate before sweeping through the queue unlinking
402 >     * cancelled nodes that were not unlinked upon initial
403 >     * removal. See above for explanation. The value must be at least
404 >     * two to avoid useless sweeps when removing trailing nodes.
405 >     */
406 >    static final int SWEEP_THRESHOLD = 32;
407  
408      /**
409       * Queue nodes. Uses Object, not E, for items to allow forgetting
410       * them after use.  Relies heavily on Unsafe mechanics to minimize
411 <     * unnecessary ordering constraints: Writes that intrinsically
412 <     * precede or follow CASes use simple relaxed forms.  Other
356 <     * cleanups use releasing/lazy writes.
411 >     * unnecessary ordering constraints: Writes that are intrinsically
412 >     * ordered wrt other accesses or CASes use simple relaxed forms.
413       */
414      static final class Node {
415          final boolean isData;   // false if this is a request node
# Line 367 | Line 423 | public class LinkedTransferQueue<E> exte
423          }
424  
425          final boolean casItem(Object cmp, Object val) {
426 +            //            assert cmp == null || cmp.getClass() != Node.class;
427              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
428          }
429  
# Line 388 | Line 445 | public class LinkedTransferQueue<E> exte
445          }
446  
447          /**
448 <         * Sets item to self (using a releasing/lazy write) and waiter
449 <         * to null, to avoid garbage retention after extracting or
450 <         * cancelling.
448 >         * Sets item to self and waiter to null, to avoid garbage
449 >         * retention after matching or cancelling. Uses relaxed writes
450 >         * because order is already constrained in the only calling
451 >         * contexts: item is forgotten only after volatile/atomic
452 >         * mechanics that extract items.  Similarly, clearing waiter
453 >         * follows either CAS or return from park (if ever parked;
454 >         * else we don't care).
455           */
456          final void forgetContents() {
457 <            UNSAFE.putOrderedObject(this, itemOffset, this);
458 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
457 >            UNSAFE.putObject(this, itemOffset, this);
458 >            UNSAFE.putObject(this, waiterOffset, null);
459          }
460  
461          /**
# Line 403 | Line 464 | public class LinkedTransferQueue<E> exte
464           */
465          final boolean isMatched() {
466              Object x = item;
467 <            return x == this || (x != null) != isData;
467 >            return (x == this) || ((x == null) == isData);
468 >        }
469 >
470 >        /**
471 >         * Returns true if this is an unmatched request node.
472 >         */
473 >        final boolean isUnmatchedRequest() {
474 >            return !isData && item == null;
475          }
476  
477          /**
# Line 421 | Line 489 | public class LinkedTransferQueue<E> exte
489           * Tries to artificially match a data node -- used by remove.
490           */
491          final boolean tryMatchData() {
492 +            //            assert isData;
493              Object x = item;
494              if (x != null && x != this && casItem(x, null)) {
495                  LockSupport.unpark(waiter);
# Line 442 | Line 511 | public class LinkedTransferQueue<E> exte
511      }
512  
513      /** head of the queue; null until first enqueue */
514 <    private transient volatile Node head;
446 <
447 <    /** predecessor of dangling unspliceable node */
448 <    private transient volatile Node cleanMe; // decl here to reduce contention
514 >    transient volatile Node head;
515  
516      /** tail of the queue; null until first append */
517      private transient volatile Node tail;
518  
519 +    /** The number of apparent failures to unsplice removed nodes */
520 +    private transient volatile int sweepVotes;
521 +
522      // CAS methods for fields
523      private boolean casTail(Node cmp, Node val) {
524          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 459 | Line 528 | public class LinkedTransferQueue<E> exte
528          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
529      }
530  
531 <    private boolean casCleanMe(Node cmp, Node val) {
532 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
531 >    private boolean casSweepVotes(int cmp, int val) {
532 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
533      }
534  
535      /*
536 <     * Possible values for "how" argument in xfer method. Beware that
468 <     * the order of assigned numerical values matters.
536 >     * Possible values for "how" argument in xfer method.
537       */
538 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
539 <    private static final int ASYNC   = 1; // for offer, put, add
540 <    private static final int SYNC    = 2; // for transfer, take
541 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
538 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
539 >    private static final int ASYNC = 1; // for offer, put, add
540 >    private static final int SYNC  = 2; // for transfer, take
541 >    private static final int TIMED = 3; // for timed poll, tryTransfer
542 >
543 >    @SuppressWarnings("unchecked")
544 >    static <E> E cast(Object item) {
545 >        //        assert item == null || item.getClass() != Node.class;
546 >        return (E) item;
547 >    }
548  
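[Editor's aside, not part of the file: a hypothetical illustration of how the public methods named in the comments above map onto xfer's "how" argument in this revision. The string keys here are labels, not real method signatures.]

```java
public class XferModeDemo {
    static final int NOW = 0, ASYNC = 1, SYNC = 2, TIMED = 3;

    /** Mode used by xfer for each (labelled) public operation. */
    static int modeFor(String op) {
        switch (op) {
            case "poll": case "tryTransfer":             return NOW;   // untimed
            case "offer": case "put": case "add":        return ASYNC;
            case "take": case "transfer":                return SYNC;
            case "timed poll": case "timed tryTransfer": return TIMED;
            default: throw new IllegalArgumentException(op);
        }
    }

    public static void main(String[] args) {
        assert modeFor("put") == ASYNC;   // append and return immediately
        assert modeFor("take") == SYNC;   // block until matched
        assert modeFor("poll") == NOW;    // match or fail, never append
    }
}
```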
549      /**
550       * Implements all queuing methods. See above for explanation.
551       *
552       * @param e the item or null for take
553       * @param haveData true if this is a put, else a take
554 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
555 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
554 >     * @param how NOW, ASYNC, SYNC, or TIMED
555 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
556       * @return an item if matched, else e
557       * @throws NullPointerException if haveData mode but e is null
558       */
559 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
559 >    private E xfer(E e, boolean haveData, int how, long nanos) {
560          if (haveData && (e == null))
561              throw new NullPointerException();
562          Node s = null;                        // the node to append, if needed
# Line 496 | Line 570 | public class LinkedTransferQueue<E> exte
570                      if (isData == haveData)   // can't match
571                          break;
572                      if (p.casItem(item, e)) { // match
573 <                        Thread w = p.waiter;
574 <                        while (p != h) {      // update head
575 <                            Node n = p.next;  // by 2 unless singleton
502 <                            if (n != null)
503 <                                p = n;
504 <                            if (head == h && casHead(h, p)) {
573 >                        for (Node q = p; q != h;) {
574 >                            Node n = q.next;  // update by 2 unless singleton
575 >                            if (head == h && casHead(h, n == null? q : n)) {
576                                  h.forgetNext();
577                                  break;
578                              }                 // advance and retry
579                              if ((h = head)   == null ||
580 <                                (p = h.next) == null || !p.isMatched())
580 >                                (q = h.next) == null || !q.isMatched())
581                                  break;        // unless slack < 2
582                          }
583 <                        LockSupport.unpark(w);
584 <                        return item;
583 >                        LockSupport.unpark(p.waiter);
584 >                        return this.<E>cast(item);
585                      }
586                  }
587                  Node n = p.next;
588                  p = (p != n) ? n : (h = head); // Use head if p offlist
589              }
590  
591 <            if (how >= ASYNC) {               // No matches available
591 >            if (how != NOW) {                 // No matches available
592                  if (s == null)
593                      s = new Node(e, haveData);
594                  Node pred = tryAppend(s, haveData);
595                  if (pred == null)
596                      continue retry;           // lost race vs opposite mode
597 <                if (how >= SYNC)
598 <                    return awaitMatch(pred, s, e, how, nanos);
597 >                if (how != ASYNC)
598 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
599              }
600              return e; // not waiting
601          }
# Line 540 | Line 611 | public class LinkedTransferQueue<E> exte
611       * predecessor
612       */
613      private Node tryAppend(Node s, boolean haveData) {
614 <        for (Node t = tail, p = t;;) { // move p to last node and append
614 >        for (Node t = tail, p = t;;) {        // move p to last node and append
615              Node n, u;                        // temps for reads of next & tail
616              if (p == null && (p = head) == null) {
617                  if (casHead(null, s))
# Line 568 | Line 639 | public class LinkedTransferQueue<E> exte
639      /**
640       * Spins/yields/blocks until node s is matched or caller gives up.
641       *
571     * @param pred the predecessor of s, or s or null if none
642       * @param s the waiting node
643 +     * @param pred the predecessor of s, or s itself if it has no
644 +     * predecessor, or null if unknown (the null case does not occur
645 +     * in any current calls but may in possible future extensions)
646       * @param e the comparison value for checking match
647 <     * @param how either SYNC or TIMEOUT
648 <     * @param nanos timeout value
647 >     * @param timed if true, wait only until timeout elapses
648 >     * @param nanos timeout in nanosecs, used only if timed is true
649       * @return matched item, or e if unmatched on interrupt or timeout
650       */
651 <    private Object awaitMatch(Node pred, Node s, Object e,
652 <                              int how, long nanos) {
580 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
651 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
652 >        long lastTime = timed ? System.nanoTime() : 0L;
653          Thread w = Thread.currentThread();
654          int spins = -1; // initialized after first item and cancel checks
655          ThreadLocalRandom randomYields = null; // bound if needed
# Line 585 | Line 657 | public class LinkedTransferQueue<E> exte
657          for (;;) {
658              Object item = s.item;
659              if (item != e) {                  // matched
660 +                //                assert item != s;
661                  s.forgetContents();           // avoid garbage
662 <                return item;
662 >                return this.<E>cast(item);
663              }
664 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
665 <                     s.casItem(e, s)) {       // cancel
664 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
665 >                    s.casItem(e, s)) {        // cancel
666                  unsplice(pred, s);
667                  return e;
668              }
# Line 598 | Line 671 | public class LinkedTransferQueue<E> exte
671                  if ((spins = spinsFor(pred, s.isData)) > 0)
672                      randomYields = ThreadLocalRandom.current();
673              }
674 <            else if (spins > 0) {             // spin, occasionally yield
602 <                if (randomYields.nextInt(FRONT_SPINS) == 0)
603 <                    Thread.yield();
674 >            else if (spins > 0) {             // spin
675                  --spins;
676 +                if (randomYields.nextInt(CHAINED_SPINS) == 0)
677 +                    Thread.yield();           // occasionally yield
678              }
679              else if (s.waiter == null) {
680 <                shortenHeadPath();            // reduce slack before blocking
608 <                s.waiter = w;                 // request unpark
680 >                s.waiter = w;                 // request unpark then recheck
681              }
682 <            else if (how == TIMEOUT) {
682 >            else if (timed) {
683                  long now = System.nanoTime();
684                  if ((nanos -= now - lastTime) > 0)
685                      LockSupport.parkNanos(this, nanos);
# Line 615 | Line 687 | public class LinkedTransferQueue<E> exte
687              }
688              else {
689                  LockSupport.park(this);
618                spins = -1;                   // spin if front upon wakeup
690              }
691          }
692      }
# Line 626 | Line 697 | public class LinkedTransferQueue<E> exte
697       */
698      private static int spinsFor(Node pred, boolean haveData) {
699          if (MP && pred != null) {
700 <            boolean predData = pred.isData;
701 <            if (predData != haveData)         // front and phase change
702 <                return FRONT_SPINS + (FRONT_SPINS >>> 1);
632 <            if (predData != (pred.item != null)) // probably at front
700 >            if (pred.isData != haveData)      // phase change
701 >                return FRONT_SPINS + CHAINED_SPINS;
702 >            if (pred.isMatched())             // probably at front
703                  return FRONT_SPINS;
704              if (pred.waiter == null)          // pred apparently spinning
705                  return CHAINED_SPINS;
# Line 637 | Line 707 | public class LinkedTransferQueue<E> exte
707          return 0;
708      }
709  
710 +    /* -------------- Traversal methods -------------- */
711 +
712      /**
713 <     * Tries (once) to unsplice nodes between head and first unmatched
714 <     * or trailing node; failing on contention.
715 <     */
716 <    private void shortenHeadPath() {
717 <        Node h, hn, p, q;
718 <        if ((p = h = head) != null && h.isMatched() &&
719 <            (q = hn = h.next) != null) {
648 <            Node n;
649 <            while ((n = q.next) != q) {
650 <                if (n == null || !q.isMatched()) {
651 <                    if (hn != q && h.next == hn)
652 <                        h.casNext(hn, q);
653 <                    break;
654 <                }
655 <                p = q;
656 <                q = n;
657 <            }
658 <        }
713 >     * Returns the successor of p, or the head node if p.next has been
714 >     * linked to self, which will only be true if traversing with a
715 >     * stale pointer that is now off the list.
716 >     */
717 >    final Node succ(Node p) {
718 >        Node next = p.next;
719 >        return (p == next) ? head : next;
720      }
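[Editor's aside, not part of the file: a hypothetical model of succ() above. An off-list node is self-linked, and a traverser that steps onto one jumps back to the current head instead of looping on the self-link.]

```java
import java.util.concurrent.atomic.AtomicReference;

public class SuccDemo {
    static final class N { volatile N next; }

    static final AtomicReference<N> head = new AtomicReference<>();

    /** Successor of p, or current head if p has been self-linked off-list. */
    static N succ(N p) {
        N next = p.next;
        return (p == next) ? head.get() : next;
    }

    public static void main(String[] args) {
        N stale = new N(); N live = new N();
        stale.next = stale;                // stale node was taken off the list
        head.set(live);
        assert succ(stale) == live;        // traversal recovers via head
        assert succ(live) == null;         // normal step just follows next
    }
}
```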
721  
661    /* -------------- Traversal methods -------------- */
662
722      /**
723       * Returns the first unmatched node of the given mode, or null if
724       * none.  Used by methods isEmpty, hasWaitingConsumer.
725       */
726 <    private Node firstOfMode(boolean data) {
727 <        for (Node p = head; p != null; ) {
726 >    private Node firstOfMode(boolean isData) {
727 >        for (Node p = head; p != null; p = succ(p)) {
728              if (!p.isMatched())
729 <                return (p.isData == data) ? p : null;
671 <            Node n = p.next;
672 <            p = (n != p) ? n : head;
729 >                return (p.isData == isData) ? p : null;
730          }
731          return null;
732      }
733  
734      /**
735       * Returns the item in the first unmatched node with isData; or
736 <     * null if none. Used by peek.
736 >     * null if none.  Used by peek.
737       */
738 <    private Object firstDataItem() {
739 <        for (Node p = head; p != null; ) {
683 <            boolean isData = p.isData;
738 >    private E firstDataItem() {
739 >        for (Node p = head; p != null; p = succ(p)) {
740              Object item = p.item;
741 <            if (item != p && (item != null) == isData)
742 <                return isData ? item : null;
743 <            Node n = p.next;
744 <            p = (n != p) ? n : head;
741 >            if (p.isData) {
742 >                if (item != null && item != p)
743 >                    return this.<E>cast(item);
744 >            }
745 >            else if (item == null)
746 >                return null;
747          }
748          return null;
749      }
# Line 716 | Line 774 | public class LinkedTransferQueue<E> exte
774  
775      final class Itr implements Iterator<E> {
776          private Node nextNode;   // next node to return item for
777 <        private Object nextItem; // the corresponding item
777 >        private E nextItem;      // the corresponding item
778          private Node lastRet;    // last returned node, to support remove
779 +        private Node lastPred;   // predecessor to unlink lastRet
780  
781          /**
782           * Moves to next node after prev, or first node if prev null.
783           */
784          private void advance(Node prev) {
785 +            lastPred = lastRet;
786              lastRet = prev;
787 <            Node p;
788 <            if (prev == null || (p = prev.next) == prev)
729 <                p = head;
730 <            while (p != null) {
787 >            for (Node p = (prev == null) ? head : succ(prev);
788 >                 p != null; p = succ(p)) {
789                  Object item = p.item;
790                  if (p.isData) {
791                      if (item != null && item != p) {
792 <                        nextItem = item;
792 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
793                          nextNode = p;
794                          return;
795                      }
796                  }
797                  else if (item == null)
798                      break;
741                Node n = p.next;
742                p = (n != p) ? n : head;
799              }
800              nextNode = null;
801          }
# Line 755 | Line 811 | public class LinkedTransferQueue<E> exte
811          public final E next() {
812              Node p = nextNode;
813              if (p == null) throw new NoSuchElementException();
814 <            Object e = nextItem;
814 >            E e = nextItem;
815              advance(p);
816 <            return (E) e;
816 >            return e;
817          }
818  
819          public final void remove() {
820              Node p = lastRet;
821              if (p == null) throw new IllegalStateException();
822 <            lastRet = null;
823 <            findAndRemoveNode(p);
822 >            if (p.tryMatchData())
823 >                unsplice(lastPred, p);
824          }
825      }
826  
# Line 774 | Line 830 | public class LinkedTransferQueue<E> exte
830       * Unsplices (now or later) the given deleted/cancelled node with
831       * the given predecessor.
832       *
833 <     * @param pred predecessor of node to be unspliced
833 >     * @param pred a node that was at one time known to be the
834 >     * predecessor of s, or null or s itself if s is/was at head
835       * @param s the node to be unspliced
836       */
837 <    private void unsplice(Node pred, Node s) {
838 <        s.forgetContents(); // clear unneeded fields
837 >    final void unsplice(Node pred, Node s) {
838 >        s.forgetContents(); // forget unneeded fields
839          /*
840 <         * At any given time, exactly one node on list cannot be
841 <         * unlinked -- the last inserted node. To accommodate this, if
842 <         * we cannot unlink s, we save its predecessor as "cleanMe",
843 <         * processing the previously saved version first. Because only
844 <         * one node in the list can have a null next, at least one of
788 <         * node s or the node previously saved can always be
789 <         * processed, so this always terminates.
840 >         * See above for rationale. Briefly: if pred still points to
841 >         * s, try to unlink s.  If s cannot be unlinked (because it is
842 >         * the trailing node, or pred might itself be unlinked) and
843 >         * neither pred nor s is head or off-list, add to sweepVotes;
844 >         * if enough votes have accumulated, sweep.
845           */
846 <        if (pred != null && pred != s) {
847 <            while (pred.next == s) {
848 <                Node oldpred = (cleanMe == null) ? null : reclean();
849 <                Node n = s.next;
850 <                if (n != null) {
851 <                    if (n != s)
852 <                        pred.casNext(s, n);
853 <                    break;
846 >        if (pred != null && pred != s && pred.next == s) {
847 >            Node n = s.next;
848 >            if (n == null ||
849 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
850 >                for (;;) {               // check if at, or could be, head
851 >                    Node h = head;
852 >                    if (h == pred || h == s || h == null)
853 >                        return;          // at head or list empty
854 >                    if (!h.isMatched())
855 >                        break;
856 >                    Node hn = h.next;
857 >                    if (hn == null)
858 >                        return;          // now empty
859 >                    if (hn != h && casHead(h, hn))
860 >                        h.forgetNext();  // advance head
861 >                }
862 >                if (pred.next != pred && s.next != s) { // recheck if offlist
863 >                    for (;;) {           // sweep now if enough votes
864 >                        int v = sweepVotes;
865 >                        if (v < SWEEP_THRESHOLD) {
866 >                            if (casSweepVotes(v, v + 1))
867 >                                break;
868 >                        }
869 >                        else if (casSweepVotes(v, 0)) {
870 >                            sweep();
871 >                            break;
872 >                        }
873 >                    }
874                  }
800                if (oldpred == pred ||      // Already saved
801                    (oldpred == null && casCleanMe(null, pred)))
802                    break;                  // Postpone cleaning
875              }
876          }
877      }
878  
879      /**
880 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
809 <     * that was previously uncleanable because it was at tail.
810 <     *
811 <     * @return current cleanMe node (or null)
880 >     * Unlinks matched nodes encountered in a traversal from head.
881       */
882 <    private Node reclean() {
883 <        /*
884 <         * cleanMe is, or at one time was, predecessor of a cancelled
885 <         * node s that was the tail so could not be unspliced.  If it
886 <         * is no longer the tail, try to unsplice if necessary and
887 <         * make cleanMe slot available.  This differs from similar
888 <         * code in unsplice() because we must check that pred still
820 <         * points to a matched node that can be unspliced -- if not,
821 <         * we can (must) clear cleanMe without unsplicing.  This can
822 <         * loop only due to contention.
823 <         */
824 <        Node pred;
825 <        while ((pred = cleanMe) != null) {
826 <            Node s = pred.next;
827 <            Node n;
828 <            if (s == null || s == pred || !s.isMatched())
829 <                casCleanMe(pred, null); // already gone
830 <            else if ((n = s.next) != null) {
831 <                if (n != s)
832 <                    pred.casNext(s, n);
833 <                casCleanMe(pred, null);
834 <            }
835 <            else
882 >    private void sweep() {
883 >        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
884 >            if (p == s)                    // stale
885 >                p = head;
886 >            else if (!s.isMatched())
887 >                p = s;
888 >            else if ((n = s.next) == null) // trailing node is pinned
889                  break;
890 <        }
891 <        return pred;
839 <    }
840 <
841 <    /**
842 <     * Main implementation of Iterator.remove(). Find
843 <     * and unsplice the given node.
844 <     */
845 <    final void findAndRemoveNode(Node s) {
846 <        if (s.tryMatchData()) {
847 <            Node pred = null;
848 <            Node p = head;
849 <            while (p != null) {
850 <                if (p == s) {
851 <                    unsplice(pred, p);
852 <                    break;
853 <                }
854 <                if (!p.isData && !p.isMatched())
855 <                    break;
856 <                pred = p;
857 <                if ((p = p.next) == pred) { // stale
858 <                    pred = null;
859 <                    p = head;
860 <                }
861 <            }
890 >            else
891 >                p.casNext(s, n);
892          }
893      }
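
The unlinking logic of `sweep()` can be illustrated single-threaded. This sketch uses a hypothetical `Node` with a plain `matched` flag and plain field writes in place of `casNext`, so it shows only the traversal shape: matched interior nodes are unlinked, while a matched trailing node stays pinned because unlinking it could race with an append.

```java
// Single-threaded sketch of the sweep() traversal: unlink "matched"
// (logically deleted) interior nodes; leave the trailing node pinned.
final class SweepDemo {
    static final class Node {
        int item;
        boolean matched;              // stands in for isMatched()
        Node next;
        Node(int item) { this.item = item; }
    }

    Node head;

    void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.matched)
                p = s;                // live node: advance past it
            else if ((n = s.next) == null)
                break;                // trailing node is pinned
            else
                p.next = n;           // unlink matched interior node
        }
    }

    public static void main(String[] args) {
        SweepDemo q = new SweepDemo();
        Node a = new Node(1), b = new Node(2), c = new Node(3), d = new Node(4);
        a.next = b; b.next = c; c.next = d;
        q.head = a;
        b.matched = true;             // interior deleted node
        d.matched = true;             // trailing deleted node
        q.sweep();
        if (a.next != c) throw new AssertionError("b should be unlinked");
        if (c.next != d) throw new AssertionError("trailing d stays pinned");
        System.out.println("sweep demo ok");
    }
}
```

In the real method the `p == s` stale check and `casNext` handle concurrent mutation; the structure of the loop is otherwise the same.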
894  
# Line 867 | Line 897 | public class LinkedTransferQueue<E> exte
897       */
898      private boolean findAndRemove(Object e) {
899          if (e != null) {
900 <            Node pred = null;
871 <            Node p = head;
872 <            while (p != null) {
900 >            for (Node pred = null, p = head; p != null; ) {
901                  Object item = p.item;
902                  if (p.isData) {
903                      if (item != null && item != p && e.equals(item) &&
# Line 881 | Line 909 | public class LinkedTransferQueue<E> exte
909                  else if (item == null)
910                      break;
911                  pred = p;
912 <                if ((p = p.next) == pred) {
912 >                if ((p = p.next) == pred) { // stale
913                      pred = null;
914                      p = head;
915                  }
# Line 1009 | Line 1037 | public class LinkedTransferQueue<E> exte
1037       */
1038      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1039          throws InterruptedException {
1040 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1040 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1041              return true;
1042          if (!Thread.interrupted())
1043              return false;
# Line 1017 | Line 1045 | public class LinkedTransferQueue<E> exte
1045      }
1046  
1047      public E take() throws InterruptedException {
1048 <        Object e = xfer(null, false, SYNC, 0);
1048 >        E e = xfer(null, false, SYNC, 0);
1049          if (e != null)
1050 <            return (E)e;
1050 >            return e;
1051          Thread.interrupted();
1052          throw new InterruptedException();
1053      }
1054  
1055      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1056 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1056 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1057          if (e != null || !Thread.interrupted())
1058 <            return (E)e;
1058 >            return e;
1059          throw new InterruptedException();
1060      }
1061  
1062      public E poll() {
1063 <        return (E)xfer(null, false, NOW, 0);
1063 >        return xfer(null, false, NOW, 0);
1064      }
1065  
1066      /**
# Line 1089 | Line 1117 | public class LinkedTransferQueue<E> exte
1117      }
1118  
1119      public E peek() {
1120 <        return (E) firstDataItem();
1120 >        return firstDataItem();
1121      }
1122  
1123      /**
# Line 1098 | Line 1126 | public class LinkedTransferQueue<E> exte
1126       * @return {@code true} if this queue contains no elements
1127       */
1128      public boolean isEmpty() {
1129 <        return firstOfMode(true) == null;
1129 >        for (Node p = head; p != null; p = succ(p)) {
1130 >            if (!p.isMatched())
1131 >                return !p.isData;
1132 >        }
1133 >        return true;
1134      }
1135  
1136      public boolean hasWaitingConsumer() {
# Line 1185 | Line 1217 | public class LinkedTransferQueue<E> exte
1217          }
1218      }
1219  
1188
1220      // Unsafe mechanics
1221  
1222      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
# Line 1193 | Line 1224 | public class LinkedTransferQueue<E> exte
1224          objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1225      private static final long tailOffset =
1226          objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1227 <    private static final long cleanMeOffset =
1228 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1227 >    private static final long sweepVotesOffset =
1228 >        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1229  
1230      static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1231                                    String field, Class<?> klazz) {
# Line 1208 | Line 1239 | public class LinkedTransferQueue<E> exte
1239          }
1240      }
1241  
1242 <    private static sun.misc.Unsafe getUnsafe() {
1242 >    /**
1243 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1244 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1245 >     * into a jdk.
1246 >     *
1247 >     * @return a sun.misc.Unsafe
1248 >     */
1249 >    static sun.misc.Unsafe getUnsafe() {
1250          try {
1251              return sun.misc.Unsafe.getUnsafe();
1252          } catch (SecurityException se) {
