ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.56 by jsr166, Wed Oct 28 00:14:03 2009 UTC vs.
Revision 1.71 by jsr166, Mon Nov 16 01:02:49 2009 UTC

# Line 206 | Line 206 | public class LinkedTransferQueue<E> exte
206       * additional GC bookkeeping ("write barriers") that are sometimes
207       * more costly than the writes themselves because of contention).
208       *
209     * Removal of interior nodes (due to timed out or interrupted
210     * waits, or calls to remove(x) or Iterator.remove) can use a
211     * scheme roughly similar to that described in Scherer, Lea, and
212     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
213     * any node except the (actual) tail of the queue. To avoid
214     * build-up of cancelled trailing nodes, upon a request to remove
215     * a trailing node, it is placed in field "cleanMe" to be
216     * unspliced upon the next call to unsplice any other node.
217     * Situations needing such mechanics are not common but do occur
218     * in practice; for example when an unbounded series of short
219     * timed calls to poll repeatedly time out but never otherwise
220     * fall off the list because of an untimed call to take at the
221     * front of the queue. Note that maintaining field cleanMe does
222     * not otherwise much impact garbage retention even if never
223     * cleared by some other call because the held node will
224     * eventually either directly or indirectly lead to a self-link
225     * once off the list.
226     *
209       * *** Overview of implementation ***
210       *
211       * We use a threshold-based approach to updates, with a slack
# Line 239 | Line 221 | public class LinkedTransferQueue<E> exte
221       * per-thread one available, but even ThreadLocalRandom is too
222       * heavy for these purposes.
223       *
224 <     * With such a small slack threshold value, it is rarely
225 <     * worthwhile to augment this with path short-circuiting; i.e.,
226 <     * unsplicing nodes between head and the first unmatched node, or
227 <     * similarly for tail, rather than advancing head or tail
246 <     * proper. However, it is used (in awaitMatch) immediately before
247 <     * a waiting thread starts to block, as a final bit of helping at
248 <     * a point when contention with others is extremely unlikely
249 <     * (since if other threads that could release it are operating,
250 <     * then the current thread wouldn't be blocking).
224 >     * With such a small slack threshold value, it is not worthwhile
225 >     * to augment this with path short-circuiting (i.e., unsplicing
226 >     * interior nodes) except in the case of cancellation/removal (see
227 >     * below).
228       *
229       * We allow both the head and tail fields to be null before any
230       * nodes are enqueued; initializing upon first append.  This
# Line 329 | Line 306 | public class LinkedTransferQueue<E> exte
306       *    versa) compared to their predecessors receive additional
307       *    chained spins, reflecting longer paths typically required to
308       *    unblock threads during phase changes.
309 +     *
310 +     *
311 +     * ** Unlinking removed interior nodes **
312 +     *
313 +     * In addition to minimizing garbage retention via self-linking
314 +     * described above, we also unlink removed interior nodes. These
315 +     * may arise due to timed out or interrupted waits, or calls to
316 +     * remove(x) or Iterator.remove.  Normally, given a node that was
317 +     * at one time known to be the predecessor of some node s that is
318 +     * to be removed, we can unsplice s by CASing the next field of
319 +     * its predecessor if it still points to s (otherwise s must
320 +     * already have been removed or is now offlist). But there are two
321 +     * situations in which we cannot guarantee to make node s
322 +     * unreachable in this way: (1) If s is the trailing node of list
323 +     * (i.e., with null next), then it is pinned as the target node
324 +     * for appends, so can only be removed later when other nodes are
325 +     * appended. (2) We cannot necessarily unlink s given a
326 +     * predecessor node that is matched (including the case of being
327 +     * cancelled): the predecessor may already be unspliced, in which
328 +     * case some previous reachable node may still point to s.
329 +     * (For further explanation see Herlihy & Shavit "The Art of
330 +     * Multiprocessor Programming" chapter 9).  Although, in both
331 +     * cases, we can rule out the need for further action if either s
332 +     * or its predecessor are (or can be made to be) at, or fall off
333 +     * from, the head of list.
334 +     *
335 +     * Without taking these into account, it would be possible for an
336 +     * unbounded number of supposedly removed nodes to remain
337 +     * reachable.  Situations leading to such buildup are uncommon but
338 +     * can occur in practice; for example when a series of short timed
339 +     * calls to poll repeatedly time out but never otherwise fall off
340 +     * the list because of an untimed call to take at the front of the
341 +     * queue.
342 +     *
343 +     * When these cases arise, rather than always retraversing the
344 +     * entire list to find an actual predecessor to unlink (which
345 +     * won't help for case (1) anyway), we record a conservative
346 +     * estimate of possible unsplice failures (in "sweepVotes").  We
347 +     * trigger a full sweep when the estimate exceeds a threshold
348 +     * indicating the maximum number of estimated removal failures to
349 +     * tolerate before sweeping through, unlinking cancelled nodes
350 +     * that were not unlinked upon initial removal. We perform sweeps
351 +     * by the thread hitting threshold (rather than background threads
352 +     * or by spreading work to other threads) because in the main
353 +     * contexts in which removal occurs, the caller is already
354 +     * timed-out, cancelled, or performing a potentially O(n)
355 +     * operation (i.e., remove(x)), none of which are time-critical
356 +     * enough to warrant the overhead that alternatives would impose
357 +     * on other threads.
358 +     *
359 +     * Because the sweepVotes estimate is conservative, and because
360 +     * nodes become unlinked "naturally" as they fall off the head of
361 +     * the queue, and because we allow votes to accumulate even while
362 +     * sweeps are in progress, there are typically significantly fewer
363 +     * such nodes than estimated.  Choice of a threshold value
364 +     * balances the likelihood of wasted effort and contention, versus
365 +     * providing a worst-case bound on retention of interior nodes in
366 +     * quiescent queues. The value defined below was chosen
367 +     * empirically to balance these under various timeout scenarios.
368 +     *
369 +     * Note that we cannot self-link unlinked interior nodes during
370 +     * sweeps. However, the associated garbage chains terminate when
371 +     * some successor ultimately falls off the head of the list and is
372 +     * self-linked.
373       */
374  
375      /** True if on multiprocessor */
# Line 355 | Line 396 | public class LinkedTransferQueue<E> exte
396      private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
397  
398      /**
399 +     * The maximum number of estimated removal failures (sweepVotes)
400 +     * to tolerate before sweeping through the queue unlinking
401 +     * cancelled nodes that were not unlinked upon initial
402 +     * removal. See above for explanation. The value must be at least
403 +     * two to avoid useless sweeps when removing trailing nodes.
404 +     */
405 +    static final int SWEEP_THRESHOLD = 32;
406 +
407 +    /**
408       * Queue nodes. Uses Object, not E, for items to allow forgetting
409       * them after use.  Relies heavily on Unsafe mechanics to minimize
410 <     * unnecessary ordering constraints: Writes that intrinsically
411 <     * precede or follow CASes use simple relaxed forms.  Other
362 <     * cleanups use releasing/lazy writes.
410 >     * unnecessary ordering constraints: Writes that are intrinsically
411 >     * ordered wrt other accesses or CASes use simple relaxed forms.
412       */
413 <    static final class Node<E> {
413 >    static final class Node {
414          final boolean isData;   // false if this is a request node
415          volatile Object item;   // initially non-null if isData; CASed to match
416 <        volatile Node<E> next;
416 >        volatile Node next;
417          volatile Thread waiter; // null until waiting
418  
419          // CAS methods for fields
420 <        final boolean casNext(Node<E> cmp, Node<E> val) {
420 >        final boolean casNext(Node cmp, Node val) {
421              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
422          }
423  
# Line 381 | Line 430 | public class LinkedTransferQueue<E> exte
430           * Creates a new node. Uses relaxed write because item can only
431           * be seen if followed by CAS.
432           */
433 <        Node(E item, boolean isData) {
433 >        Node(Object item, boolean isData) {
434              UNSAFE.putObject(this, itemOffset, item); // relaxed write
435              this.isData = isData;
436          }
# Line 395 | Line 444 | public class LinkedTransferQueue<E> exte
444          }
445  
446          /**
447 <         * Sets item to self (using a releasing/lazy write) and waiter
448 <         * to null, to avoid garbage retention after extracting or
449 <         * cancelling.
447 >         * Sets item to self and waiter to null, to avoid garbage
448 >         * retention after matching or cancelling. Uses relaxed writes
449 >         * bacause order is already constrained in the only calling
450 >         * contexts: item is forgotten only after volatile/atomic
451 >         * mechanics that extract items.  Similarly, clearing waiter
452 >         * follows either CAS or return from park (if ever parked;
453 >         * else we don't care).
454           */
455          final void forgetContents() {
456 <            UNSAFE.putOrderedObject(this, itemOffset, this);
457 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
456 >            UNSAFE.putObject(this, itemOffset, this);
457 >            UNSAFE.putObject(this, waiterOffset, null);
458          }
459  
460          /**
# Line 410 | Line 463 | public class LinkedTransferQueue<E> exte
463           */
464          final boolean isMatched() {
465              Object x = item;
466 <            return x == this || (x != null) != isData;
466 >            return (x == this) || ((x == null) == isData);
467 >        }
468 >
469 >        /**
470 >         * Returns true if this is an unmatched request node.
471 >         */
472 >        final boolean isUnmatchedRequest() {
473 >            return !isData && item == null;
474          }
475  
476          /**
# Line 428 | Line 488 | public class LinkedTransferQueue<E> exte
488           * Tries to artificially match a data node -- used by remove.
489           */
490          final boolean tryMatchData() {
491 +            assert isData;
492              Object x = item;
493              if (x != null && x != this && casItem(x, null)) {
494                  LockSupport.unpark(waiter);
# Line 449 | Line 510 | public class LinkedTransferQueue<E> exte
510      }
511  
512      /** head of the queue; null until first enqueue */
513 <    transient volatile Node<E> head;
453 <
454 <    /** predecessor of dangling unspliceable node */
455 <    private transient volatile Node<E> cleanMe; // decl here reduces contention
513 >    transient volatile Node head;
514  
515      /** tail of the queue; null until first append */
516 <    private transient volatile Node<E> tail;
516 >    private transient volatile Node tail;
517 >
518 >    /** The number of apparent failures to unsplice removed nodes */
519 >    private transient volatile int sweepVotes;
520  
521      // CAS methods for fields
522 <    private boolean casTail(Node<E> cmp, Node<E> val) {
522 >    private boolean casTail(Node cmp, Node val) {
523          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
524      }
525  
526 <    private boolean casHead(Node<E> cmp, Node<E> val) {
526 >    private boolean casHead(Node cmp, Node val) {
527          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
528      }
529  
530 <    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
531 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
530 >    private boolean casSweepVotes(int cmp, int val) {
531 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
532      }
533  
534      /*
535 <     * Possible values for "how" argument in xfer method. Beware that
475 <     * the order of assigned numerical values matters.
535 >     * Possible values for "how" argument in xfer method.
536       */
537 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
538 <    private static final int ASYNC   = 1; // for offer, put, add
539 <    private static final int SYNC    = 2; // for transfer, take
540 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
537 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
538 >    private static final int ASYNC = 1; // for offer, put, add
539 >    private static final int SYNC  = 2; // for transfer, take
540 >    private static final int TIMED = 3; // for timed poll, tryTransfer
541  
542      @SuppressWarnings("unchecked")
543      static <E> E cast(Object item) {
# Line 490 | Line 550 | public class LinkedTransferQueue<E> exte
550       *
551       * @param e the item or null for take
552       * @param haveData true if this is a put, else a take
553 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
554 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
553 >     * @param how NOW, ASYNC, SYNC, or TIMED
554 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
555       * @return an item if matched, else e
556       * @throws NullPointerException if haveData mode but e is null
557       */
558      private E xfer(E e, boolean haveData, int how, long nanos) {
559          if (haveData && (e == null))
560              throw new NullPointerException();
561 <        Node<E> s = null;                     // the node to append, if needed
561 >        Node s = null;                        // the node to append, if needed
562  
563          retry: for (;;) {                     // restart on append race
564  
565 <            for (Node<E> h = head, p = h; p != null;) {
506 <                // find & match first node
565 >            for (Node h = head, p = h; p != null;) { // find & match first node
566                  boolean isData = p.isData;
567                  Object item = p.item;
568                  if (item != p && (item != null) == isData) { // unmatched
569                      if (isData == haveData)   // can't match
570                          break;
571                      if (p.casItem(item, e)) { // match
572 <                        for (Node<E> q = p; q != h;) {
573 <                            Node<E> n = q.next; // update head by 2
574 <                            if (n != null)    // unless singleton
516 <                                q = n;
517 <                            if (head == h && casHead(h, q)) {
572 >                        for (Node q = p; q != h;) {
573 >                            Node n = q.next;  // update by 2 unless singleton
574 >                            if (head == h && casHead(h, n == null? q : n)) {
575                                  h.forgetNext();
576                                  break;
577                              }                 // advance and retry
# Line 526 | Line 583 | public class LinkedTransferQueue<E> exte
583                          return this.<E>cast(item);
584                      }
585                  }
586 <                Node<E> n = p.next;
586 >                Node n = p.next;
587                  p = (p != n) ? n : (h = head); // Use head if p offlist
588              }
589  
590 <            if (how >= ASYNC) {               // No matches available
590 >            if (how != NOW) {                 // No matches available
591                  if (s == null)
592 <                    s = new Node<E>(e, haveData);
593 <                Node<E> pred = tryAppend(s, haveData);
592 >                    s = new Node(e, haveData);
593 >                Node pred = tryAppend(s, haveData);
594                  if (pred == null)
595                      continue retry;           // lost race vs opposite mode
596 <                if (how >= SYNC)
597 <                    return awaitMatch(s, pred, e, how, nanos);
596 >                if (how != ASYNC)
597 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
598              }
599              return e; // not waiting
600          }
# Line 552 | Line 609 | public class LinkedTransferQueue<E> exte
609       * different mode, else s's predecessor, or s itself if no
610       * predecessor
611       */
612 <    private Node<E> tryAppend(Node<E> s, boolean haveData) {
613 <        for (Node<E> t = tail, p = t;;) { // move p to last node and append
614 <            Node<E> n, u;                     // temps for reads of next & tail
612 >    private Node tryAppend(Node s, boolean haveData) {
613 >        for (Node t = tail, p = t;;) {        // move p to last node and append
614 >            Node n, u;                        // temps for reads of next & tail
615              if (p == null && (p = head) == null) {
616                  if (casHead(null, s))
617                      return s;                 // initialize
# Line 586 | Line 643 | public class LinkedTransferQueue<E> exte
643       * predecessor, or null if unknown (the null case does not occur
644       * in any current calls but may in possible future extensions)
645       * @param e the comparison value for checking match
646 <     * @param how either SYNC or TIMEOUT
647 <     * @param nanos timeout value
646 >     * @param timed if true, wait only until timeout elapses
647 >     * @param nanos timeout in nanosecs, used only if timed is true
648       * @return matched item, or e if unmatched on interrupt or timeout
649       */
650 <    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
651 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
650 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
651 >        long lastTime = timed ? System.nanoTime() : 0L;
652          Thread w = Thread.currentThread();
653          int spins = -1; // initialized after first item and cancel checks
654          ThreadLocalRandom randomYields = null; // bound if needed
# Line 603 | Line 660 | public class LinkedTransferQueue<E> exte
660                  s.forgetContents();           // avoid garbage
661                  return this.<E>cast(item);
662              }
663 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
664 <                    s.casItem(e, s)) {       // cancel
663 >            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
664 >                    s.casItem(e, s)) {        // cancel
665                  unsplice(pred, s);
666                  return e;
667              }
# Line 614 | Line 671 | public class LinkedTransferQueue<E> exte
671                      randomYields = ThreadLocalRandom.current();
672              }
673              else if (spins > 0) {             // spin
674 <                if (--spins == 0)
675 <                    shortenHeadPath();        // reduce slack before blocking
619 <                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
674 >                --spins;
675 >                if (randomYields.nextInt(CHAINED_SPINS) == 0)
676                      Thread.yield();           // occasionally yield
677              }
678              else if (s.waiter == null) {
679                  s.waiter = w;                 // request unpark then recheck
680              }
681 <            else if (how == TIMEOUT) {
681 >            else if (timed) {
682                  long now = System.nanoTime();
683                  if ((nanos -= now - lastTime) > 0)
684                      LockSupport.parkNanos(this, nanos);
# Line 630 | Line 686 | public class LinkedTransferQueue<E> exte
686              }
687              else {
688                  LockSupport.park(this);
633                s.waiter = null;
634                spins = -1;                   // spin if front upon wakeup
689              }
690          }
691      }
# Line 640 | Line 694 | public class LinkedTransferQueue<E> exte
694       * Returns spin/yield value for a node with given predecessor and
695       * data mode. See above for explanation.
696       */
697 <    private static int spinsFor(Node<?> pred, boolean haveData) {
697 >    private static int spinsFor(Node pred, boolean haveData) {
698          if (MP && pred != null) {
699              if (pred.isData != haveData)      // phase change
700                  return FRONT_SPINS + CHAINED_SPINS;
# Line 652 | Line 706 | public class LinkedTransferQueue<E> exte
706          return 0;
707      }
708  
709 +    /* -------------- Traversal methods -------------- */
710 +
711      /**
712 <     * Tries (once) to unsplice nodes between head and first unmatched
713 <     * or trailing node; failing on contention.
714 <     */
715 <    private void shortenHeadPath() {
716 <        Node<E> h, hn, p, q;
717 <        if ((p = h = head) != null && h.isMatched() &&
718 <            (q = hn = h.next) != null) {
663 <            Node<E> n;
664 <            while ((n = q.next) != q) {
665 <                if (n == null || !q.isMatched()) {
666 <                    if (hn != q && h.next == hn)
667 <                        h.casNext(hn, q);
668 <                    break;
669 <                }
670 <                p = q;
671 <                q = n;
672 <            }
673 <        }
712 >     * Returns the successor of p, or the head node if p.next has been
713 >     * linked to self, which will only be true if traversing with a
714 >     * stale pointer that is now off the list.
715 >     */
716 >    final Node succ(Node p) {
717 >        Node next = p.next;
718 >        return (p == next) ? head : next;
719      }
720  
676    /* -------------- Traversal methods -------------- */
677
721      /**
722       * Returns the first unmatched node of the given mode, or null if
723       * none.  Used by methods isEmpty, hasWaitingConsumer.
724       */
725 <    private Node<E> firstOfMode(boolean data) {
726 <        for (Node<E> p = head; p != null; ) {
725 >    private Node firstOfMode(boolean isData) {
726 >        for (Node p = head; p != null; p = succ(p)) {
727              if (!p.isMatched())
728 <                return (p.isData == data) ? p : null;
686 <            Node<E> n = p.next;
687 <            p = (n != p) ? n : head;
728 >                return (p.isData == isData) ? p : null;
729          }
730          return null;
731      }
# Line 694 | Line 735 | public class LinkedTransferQueue<E> exte
735       * null if none.  Used by peek.
736       */
737      private E firstDataItem() {
738 <        for (Node<E> p = head; p != null; ) {
698 <            boolean isData = p.isData;
738 >        for (Node p = head; p != null; p = succ(p)) {
739              Object item = p.item;
740 <            if (item != p && (item != null) == isData)
741 <                return isData ? this.<E>cast(item) : null;
742 <            Node<E> n = p.next;
743 <            p = (n != p) ? n : head;
740 >            if (p.isData) {
741 >                if (item != null && item != p)
742 >                    return this.<E>cast(item);
743 >            }
744 >            else if (item == null)
745 >                return null;
746          }
747          return null;
748      }
# Line 711 | Line 753 | public class LinkedTransferQueue<E> exte
753       */
754      private int countOfMode(boolean data) {
755          int count = 0;
756 <        for (Node<E> p = head; p != null; ) {
756 >        for (Node p = head; p != null; ) {
757              if (!p.isMatched()) {
758                  if (p.isData != data)
759                      return 0;
760                  if (++count == Integer.MAX_VALUE) // saturated
761                      break;
762              }
763 <            Node<E> n = p.next;
763 >            Node n = p.next;
764              if (n != p)
765                  p = n;
766              else {
# Line 730 | Line 772 | public class LinkedTransferQueue<E> exte
772      }
773  
774      final class Itr implements Iterator<E> {
775 <        private Node<E> nextNode;   // next node to return item for
776 <        private E nextItem;         // the corresponding item
777 <        private Node<E> lastRet;    // last returned node, to support remove
775 >        private Node nextNode;   // next node to return item for
776 >        private E nextItem;      // the corresponding item
777 >        private Node lastRet;    // last returned node, to support remove
778 >        private Node lastPred;   // predecessor to unlink lastRet
779  
780          /**
781           * Moves to next node after prev, or first node if prev null.
782           */
783 <        private void advance(Node<E> prev) {
783 >        private void advance(Node prev) {
784 >            lastPred = lastRet;
785              lastRet = prev;
786 <            Node<E> p;
787 <            if (prev == null || (p = prev.next) == prev)
744 <                p = head;
745 <            while (p != null) {
786 >            for (Node p = (prev == null) ? head : succ(prev);
787 >                 p != null; p = succ(p)) {
788                  Object item = p.item;
789                  if (p.isData) {
790                      if (item != null && item != p) {
# Line 753 | Line 795 | public class LinkedTransferQueue<E> exte
795                  }
796                  else if (item == null)
797                      break;
756                Node<E> n = p.next;
757                p = (n != p) ? n : head;
798              }
799              nextNode = null;
800          }
# Line 768 | Line 808 | public class LinkedTransferQueue<E> exte
808          }
809  
810          public final E next() {
811 <            Node<E> p = nextNode;
811 >            Node p = nextNode;
812              if (p == null) throw new NoSuchElementException();
813              E e = nextItem;
814              advance(p);
# Line 776 | Line 816 | public class LinkedTransferQueue<E> exte
816          }
817  
818          public final void remove() {
819 <            Node<E> p = lastRet;
819 >            Node p = lastRet;
820              if (p == null) throw new IllegalStateException();
821 <            lastRet = null;
822 <            findAndRemoveNode(p);
821 >            if (p.tryMatchData())
822 >                unsplice(lastPred, p);
823          }
824      }
825  
# Line 789 | Line 829 | public class LinkedTransferQueue<E> exte
829       * Unsplices (now or later) the given deleted/cancelled node with
830       * the given predecessor.
831       *
832 <     * @param pred predecessor of node to be unspliced
832 >     * @param pred a node that was at one time known to be the
833 >     * predecessor of s, or null or s itself if s is/was at head
834       * @param s the node to be unspliced
835       */
836 <    private void unsplice(Node<E> pred, Node<E> s) {
837 <        s.forgetContents(); // clear unneeded fields
836 >    final void unsplice(Node pred, Node s) {
837 >        s.forgetContents(); // forget unneeded fields
838          /*
839 <         * At any given time, exactly one node on list cannot be
840 <         * unlinked -- the last inserted node. To accommodate this, if
841 <         * we cannot unlink s, we save its predecessor as "cleanMe",
842 <         * processing the previously saved version first. Because only
843 <         * one node in the list can have a null next, at least one of
803 <         * node s or the node previously saved can always be
804 <         * processed, so this always terminates.
839 >         * See above for rationale. Briefly: if pred still points to
840 >         * s, try to unlink s.  If s cannot be unlinked, because it is
841 >         * trailing node or pred might be unlinked, and neither pred
842 >         * nor s are head or offlist, add to sweepVotes, and if enough
843 >         * votes have accumulated, sweep.
844           */
845 <        if (pred != null && pred != s) {
846 <            while (pred.next == s) {
847 <                Node<E> oldpred = (cleanMe == null) ? null : reclean();
848 <                Node<E> n = s.next;
849 <                if (n != null) {
850 <                    if (n != s)
851 <                        pred.casNext(s, n);
852 <                    break;
845 >        if (pred != null && pred != s && pred.next == s) {
846 >            Node n = s.next;
847 >            if (n == null ||
848 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
849 >                for (;;) {               // check if at, or could be, head
850 >                    Node h = head;
851 >                    if (h == pred || h == s || h == null)
852 >                        return;          // at head or list empty
853 >                    if (!h.isMatched())
854 >                        break;
855 >                    Node hn = h.next;
856 >                    if (hn == null)
857 >                        return;          // now empty
858 >                    if (hn != h && casHead(h, hn))
859 >                        h.forgetNext();  // advance head
860 >                }
861 >                if (pred.next != pred && s.next != s) { // recheck if offlist
862 >                    for (;;) {           // sweep now if enough votes
863 >                        int v = sweepVotes;
864 >                        if (v < SWEEP_THRESHOLD) {
865 >                            if (casSweepVotes(v, v + 1))
866 >                                break;
867 >                        }
868 >                        else if (casSweepVotes(v, 0)) {
869 >                            sweep();
870 >                            break;
871 >                        }
872 >                    }
873                  }
815                if (oldpred == pred ||      // Already saved
816                    (oldpred == null && casCleanMe(null, pred)))
817                    break;                  // Postpone cleaning
874              }
875          }
876      }
877  
878      /**
879 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
824 <     * that was previously uncleanable because it was at tail.
825 <     *
826 <     * @return current cleanMe node (or null)
879 >     * Unlinks matched nodes encountered in a traversal from head.
880       */
881 <    private Node<E> reclean() {
882 <        /*
883 <         * cleanMe is, or at one time was, predecessor of a cancelled
884 <         * node s that was the tail so could not be unspliced.  If it
885 <         * is no longer the tail, try to unsplice if necessary and
886 <         * make cleanMe slot available.  This differs from similar
887 <         * code in unsplice() because we must check that pred still
835 <         * points to a matched node that can be unspliced -- if not,
836 <         * we can (must) clear cleanMe without unsplicing.  This can
837 <         * loop only due to contention.
838 <         */
839 <        Node<E> pred;
840 <        while ((pred = cleanMe) != null) {
841 <            Node<E> s = pred.next;
842 <            Node<E> n;
843 <            if (s == null || s == pred || !s.isMatched())
844 <                casCleanMe(pred, null); // already gone
845 <            else if ((n = s.next) != null) {
846 <                if (n != s)
847 <                    pred.casNext(s, n);
848 <                casCleanMe(pred, null);
849 <            }
850 <            else
881 >    private void sweep() {
882 >        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
883 >            if (p == s)                    // stale
884 >                p = head;
885 >            else if (!s.isMatched())
886 >                p = s;
887 >            else if ((n = s.next) == null) // trailing node is pinned
888                  break;
889 <        }
890 <        return pred;
854 <    }
855 <
856 <    /**
857 <     * Main implementation of Iterator.remove(). Find
858 <     * and unsplice the given node.
859 <     */
860 <    final void findAndRemoveNode(Node<E> s) {
861 <        if (s.tryMatchData()) {
862 <            Node<E> pred = null;
863 <            Node<E> p = head;
864 <            while (p != null) {
865 <                if (p == s) {
866 <                    unsplice(pred, p);
867 <                    break;
868 <                }
869 <                if (!p.isData && !p.isMatched())
870 <                    break;
871 <                pred = p;
872 <                if ((p = p.next) == pred) { // stale
873 <                    pred = null;
874 <                    p = head;
875 <                }
876 <            }
889 >            else
890 >                p.casNext(s, n);
891          }
892      }
893  
# Line 882 | Line 896 | public class LinkedTransferQueue<E> exte
896       */
897      private boolean findAndRemove(Object e) {
898          if (e != null) {
899 <            Node<E> pred = null;
886 <            Node<E> p = head;
887 <            while (p != null) {
899 >            for (Node pred = null, p = head; p != null; ) {
900                  Object item = p.item;
901                  if (p.isData) {
902                      if (item != null && item != p && e.equals(item) &&
# Line 896 | Line 908 | public class LinkedTransferQueue<E> exte
908                  else if (item == null)
909                      break;
910                  pred = p;
911 <                if ((p = p.next) == pred) {
911 >                if ((p = p.next) == pred) { // stale
912                      pred = null;
913                      p = head;
914                  }
# Line 1024 | Line 1036 | public class LinkedTransferQueue<E> exte
1036       */
1037      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1038          throws InterruptedException {
1039 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1039 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1040              return true;
1041          if (!Thread.interrupted())
1042              return false;
# Line 1040 | Line 1052 | public class LinkedTransferQueue<E> exte
1052      }
1053  
1054      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1055 <        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1055 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1056          if (e != null || !Thread.interrupted())
1057              return e;
1058          throw new InterruptedException();
# Line 1207 | Line 1219 | public class LinkedTransferQueue<E> exte
1219          objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1220      private static final long tailOffset =
1221          objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1222 <    private static final long cleanMeOffset =
1223 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1222 >    private static final long sweepVotesOffset =
1223 >        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1224  
1225      static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1226                                    String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines