Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.64 by jsr166, Mon Nov 2 06:12:02 2009 UTC vs.
Revision 1.81 by jsr166, Sun Nov 14 20:34:47 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
11   import java.util.ConcurrentModificationException;
12   import java.util.Iterator;
13   import java.util.NoSuchElementException;
14   import java.util.Queue;
15 + import java.util.concurrent.TimeUnit;
16   import java.util.concurrent.locks.LockSupport;
17 +
18   /**
19   * An unbounded {@link TransferQueue} based on linked nodes.
20   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 206 | Line 206 | public class LinkedTransferQueue<E> exte
206       * additional GC bookkeeping ("write barriers") that are sometimes
207       * more costly than the writes themselves because of contention).
208       *
209     * Removal of interior nodes (due to timed out or interrupted
210     * waits, or calls to remove(x) or Iterator.remove) can use a
211     * scheme roughly similar to that described in Scherer, Lea, and
212     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
213     * any node except the (actual) tail of the queue. To avoid
214     * build-up of cancelled trailing nodes, upon a request to remove
215     * a trailing node, it is placed in field "cleanMe" to be
216     * unspliced upon the next call to unsplice any other node.
217     * Situations needing such mechanics are not common but do occur
218     * in practice; for example when an unbounded series of short
219     * timed calls to poll repeatedly time out but never otherwise
220     * fall off the list because of an untimed call to take at the
221     * front of the queue. Note that maintaining field cleanMe does
222     * not otherwise much impact garbage retention even if never
223     * cleared by some other call because the held node will
224     * eventually either directly or indirectly lead to a self-link
225     * once off the list.
226     *
209       * *** Overview of implementation ***
210       *
211       * We use a threshold-based approach to updates, with a slack
# Line 239 | Line 221 | public class LinkedTransferQueue<E> exte
221       * per-thread one available, but even ThreadLocalRandom is too
222       * heavy for these purposes.
223       *
224 <     * With such a small slack threshold value, it is rarely
225 <     * worthwhile to augment this with path short-circuiting; i.e.,
226 <     * unsplicing nodes between head and the first unmatched node, or
227 <     * similarly for tail, rather than advancing head or tail
246 <     * proper. However, it is used (in awaitMatch) immediately before
247 <     * a waiting thread starts to block, as a final bit of helping at
248 <     * a point when contention with others is extremely unlikely
249 <     * (since if other threads that could release it are operating,
250 <     * then the current thread wouldn't be blocking).
224 >     * With such a small slack threshold value, it is not worthwhile
225 >     * to augment this with path short-circuiting (i.e., unsplicing
226 >     * interior nodes) except in the case of cancellation/removal (see
227 >     * below).
228       *
229       * We allow both the head and tail fields to be null before any
230       * nodes are enqueued; initializing upon first append.  This
# Line 329 | Line 306 | public class LinkedTransferQueue<E> exte
306       *    versa) compared to their predecessors receive additional
307       *    chained spins, reflecting longer paths typically required to
308       *    unblock threads during phase changes.
309 +     *
310 +     *
311 +     * ** Unlinking removed interior nodes **
312 +     *
313 +     * In addition to minimizing garbage retention via self-linking
314 +     * described above, we also unlink removed interior nodes. These
315 +     * may arise due to timed out or interrupted waits, or calls to
316 +     * remove(x) or Iterator.remove.  Normally, given a node that was
317 +     * at one time known to be the predecessor of some node s that is
318 +     * to be removed, we can unsplice s by CASing the next field of
319 +     * its predecessor if it still points to s (otherwise s must
320 +     * already have been removed or is now offlist). But there are two
321 +     * situations in which we cannot guarantee to make node s
322 +     * unreachable in this way: (1) If s is the trailing node of list
323 +     * (i.e., with null next), then it is pinned as the target node
324 +     * for appends, so can only be removed later after other nodes are
325 +     * appended. (2) We cannot necessarily unlink s given a
326 +     * predecessor node that is matched (including the case of being
327 +     * cancelled): the predecessor may already be unspliced, in which
328 +     * case some previous reachable node may still point to s.
329 +     * (For further explanation see Herlihy & Shavit "The Art of
330 +     * Multiprocessor Programming" chapter 9).  Although, in both
331 +     * cases, we can rule out the need for further action if either s
332 +     * or its predecessor are (or can be made to be) at, or fall off
333 +     * from, the head of list.
334 +     *
335 +     * Without taking these into account, it would be possible for an
336 +     * unbounded number of supposedly removed nodes to remain
337 +     * reachable.  Situations leading to such buildup are uncommon but
338 +     * can occur in practice; for example when a series of short timed
339 +     * calls to poll repeatedly time out but never otherwise fall off
340 +     * the list because of an untimed call to take at the front of the
341 +     * queue.
342 +     *
343 +     * When these cases arise, rather than always retraversing the
344 +     * entire list to find an actual predecessor to unlink (which
345 +     * won't help for case (1) anyway), we record a conservative
346 +     * estimate of possible unsplice failures (in "sweepVotes").
347 +     * We trigger a full sweep when the estimate exceeds a threshold
348 +     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
349 +     * removal failures to tolerate before sweeping through, unlinking
350 +     * cancelled nodes that were not unlinked upon initial removal.
351 +     * We perform sweeps by the thread hitting threshold (rather than
352 +     * background threads or by spreading work to other threads)
353 +     * because in the main contexts in which removal occurs, the
354 +     * caller is already timed-out, cancelled, or performing a
355 +     * potentially O(n) operation (e.g. remove(x)), none of which are
356 +     * time-critical enough to warrant the overhead that alternatives
357 +     * would impose on other threads.
358 +     *
359 +     * Because the sweepVotes estimate is conservative, and because
360 +     * nodes become unlinked "naturally" as they fall off the head of
361 +     * the queue, and because we allow votes to accumulate even while
362 +     * sweeps are in progress, there are typically significantly fewer
363 +     * such nodes than estimated.  Choice of a threshold value
364 +     * balances the likelihood of wasted effort and contention, versus
365 +     * providing a worst-case bound on retention of interior nodes in
366 +     * quiescent queues. The value defined below was chosen
367 +     * empirically to balance these under various timeout scenarios.
368 +     *
369 +     * Note that we cannot self-link unlinked interior nodes during
370 +     * sweeps. However, the associated garbage chains terminate when
371 +     * some successor ultimately falls off the head of the list and is
372 +     * self-linked.
373       */
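The buildup scenario described above is easy to reproduce through the public API. The sketch below is illustrative only (it appears in neither revision; the class name SweepScenario, the loop count, and the timeout are arbitrary): a consumer blocked in an untimed take() pins the front of the queue, while each short timed poll that times out cancels its node in place, leaving work for unsplice/sweep to reclaim.

    import java.util.concurrent.TimeUnit;
    import jsr166y.LinkedTransferQueue;

    class SweepScenario {
        public static void main(String[] args) throws InterruptedException {
            final LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
            Thread taker = new Thread(new Runnable() {
                public void run() {                 // blocks in an untimed take,
                    try { q.take(); }               // pinning the front of the queue
                    catch (InterruptedException ignore) { }
                }
            });
            taker.start();
            for (int i = 0; i < 1000; ++i)          // each timed-out poll cancels its node,
                q.poll(1, TimeUnit.MICROSECONDS);   // which cannot fall off the pinned head
            taker.interrupt();                      // let the demo exit
        }
    }
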
374  
375      /** True if on multiprocessor */
# Line 355 | Line 396 | public class LinkedTransferQueue<E> exte
396      private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
397  
398      /**
399 +     * The maximum number of estimated removal failures (sweepVotes)
400 +     * to tolerate before sweeping through the queue unlinking
401 +     * cancelled nodes that were not unlinked upon initial
402 +     * removal. See above for explanation. The value must be at least
403 +     * two to avoid useless sweeps when removing trailing nodes.
404 +     */
405 +    static final int SWEEP_THRESHOLD = 32;
406 +
407 +    /**
408       * Queue nodes. Uses Object, not E, for items to allow forgetting
409       * them after use.  Relies heavily on Unsafe mechanics to minimize
410 <     * unnecessary ordering constraints: Writes that intrinsically
411 <     * precede or follow CASes use simple relaxed forms.  Other
362 <     * cleanups use releasing/lazy writes.
410 >     * unnecessary ordering constraints: Writes that are intrinsically
411 >     * ordered wrt other accesses or CASes use simple relaxed forms.
412       */
413      static final class Node {
414          final boolean isData;   // false if this is a request node
# Line 373 | Line 422 | public class LinkedTransferQueue<E> exte
422          }
423  
424          final boolean casItem(Object cmp, Object val) {
425 <            assert cmp == null || cmp.getClass() != Node.class;
425 >            // assert cmp == null || cmp.getClass() != Node.class;
426              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
427          }
428  
429          /**
430 <         * Creates a new node. Uses relaxed write because item can only
431 <         * be seen if followed by CAS.
430 >         * Constructs a new node.  Uses relaxed write because item can
431 >         * only be seen after publication via casNext.
432           */
433          Node(Object item, boolean isData) {
434              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 395 | Line 444 | public class LinkedTransferQueue<E> exte
444          }
445  
446          /**
447 <         * Sets item to self (using a releasing/lazy write) and waiter
448 <         * to null, to avoid garbage retention after extracting or
449 <         * cancelling.
447 >         * Sets item to self and waiter to null, to avoid garbage
448 >         * retention after matching or cancelling. Uses relaxed writes
449 >         * because order is already constrained in the only calling
450 >         * contexts: item is forgotten only after volatile/atomic
451 >         * mechanics that extract items.  Similarly, clearing waiter
452 >         * follows either CAS or return from park (if ever parked;
453 >         * else we don't care).
454           */
455          final void forgetContents() {
456 <            UNSAFE.putOrderedObject(this, itemOffset, this);
457 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
456 >            UNSAFE.putObject(this, itemOffset, this);
457 >            UNSAFE.putObject(this, waiterOffset, null);
458          }
459  
460          /**
# Line 435 | Line 488 | public class LinkedTransferQueue<E> exte
488           * Tries to artificially match a data node -- used by remove.
489           */
490          final boolean tryMatchData() {
491 <            assert isData;
491 >            // assert isData;
492              Object x = item;
493              if (x != null && x != this && casItem(x, null)) {
494                  LockSupport.unpark(waiter);
# Line 459 | Line 512 | public class LinkedTransferQueue<E> exte
512      /** head of the queue; null until first enqueue */
513      transient volatile Node head;
514  
462    /** predecessor of dangling unspliceable node */
463    private transient volatile Node cleanMe; // decl here reduces contention
464
515      /** tail of the queue; null until first append */
516      private transient volatile Node tail;
517  
518 +    /** The number of apparent failures to unsplice removed nodes */
519 +    private transient volatile int sweepVotes;
520 +
521      // CAS methods for fields
522      private boolean casTail(Node cmp, Node val) {
523          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 474 | Line 527 | public class LinkedTransferQueue<E> exte
527          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
528      }
529  
530 <    private boolean casCleanMe(Node cmp, Node val) {
531 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
530 >    private boolean casSweepVotes(int cmp, int val) {
531 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
532      }
533  
534      /*
535       * Possible values for "how" argument in xfer method.
536       */
537 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
538 <    private static final int ASYNC   = 1; // for offer, put, add
539 <    private static final int SYNC    = 2; // for transfer, take
540 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
537 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
538 >    private static final int ASYNC = 1; // for offer, put, add
539 >    private static final int SYNC  = 2; // for transfer, take
540 >    private static final int TIMED = 3; // for timed poll, tryTransfer
541  
542      @SuppressWarnings("unchecked")
543      static <E> E cast(Object item) {
544 <        assert item == null || item.getClass() != Node.class;
544 >        // assert item == null || item.getClass() != Node.class;
545          return (E) item;
546      }
547  
# Line 497 | Line 550 | public class LinkedTransferQueue<E> exte
550       *
551       * @param e the item or null for take
552       * @param haveData true if this is a put, else a take
553 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
554 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
553 >     * @param how NOW, ASYNC, SYNC, or TIMED
554 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
555       * @return an item if matched, else e
556       * @throws NullPointerException if haveData mode but e is null
557       */
# Line 507 | Line 560 | public class LinkedTransferQueue<E> exte
560              throw new NullPointerException();
561          Node s = null;                        // the node to append, if needed
562  
563 <        retry: for (;;) {                     // restart on append race
563 >        retry:
564 >        for (;;) {                            // restart on append race
565  
566              for (Node h = head, p = h; p != null;) { // find & match first node
567                  boolean isData = p.isData;
# Line 517 | Line 571 | public class LinkedTransferQueue<E> exte
571                          break;
572                      if (p.casItem(item, e)) { // match
573                          for (Node q = p; q != h;) {
574 <                            Node n = q.next;  // update head by 2
575 <                            if (n != null)    // unless singleton
522 <                                q = n;
523 <                            if (head == h && casHead(h, q)) {
574 >                            Node n = q.next;  // update by 2 unless singleton
575 >                            if (head == h && casHead(h, n == null? q : n)) {
576                                  h.forgetNext();
577                                  break;
578                              }                 // advance and retry
# Line 543 | Line 595 | public class LinkedTransferQueue<E> exte
595                  if (pred == null)
596                      continue retry;           // lost race vs opposite mode
597                  if (how != ASYNC)
598 <                    return awaitMatch(s, pred, e, (how == TIMEOUT), nanos);
598 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
599              }
600              return e; // not waiting
601          }
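
As the NOW/ASYNC/SYNC/TIMED comments above indicate, the public operations are thin wrappers around xfer. The following member-method sketch shows that mapping; the bodies are reconstructed from the mode comments rather than copied from either revision (the timed poll and tryTransfer variants, which use TIMED, appear later in this diff):

    public void put(E e)      { xfer(e, true, ASYNC, 0); }            // never blocks
    public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }
    public E poll()           { return xfer(null, false, NOW, 0); }   // non-blocking
    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);                             // blocks until matched
        if (e != null)
            return e;
        Thread.interrupted();                                         // clear status before throwing
        throw new InterruptedException();
    }
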
# Line 593 | Line 645 | public class LinkedTransferQueue<E> exte
645       * in any current calls but may in possible future extensions)
646       * @param e the comparison value for checking match
647       * @param timed if true, wait only until timeout elapses
648 <     * @param nanos timeout value
648 >     * @param nanos timeout in nanosecs, used only if timed is true
649       * @return matched item, or e if unmatched on interrupt or timeout
650       */
651      private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
# Line 605 | Line 657 | public class LinkedTransferQueue<E> exte
657          for (;;) {
658              Object item = s.item;
659              if (item != e) {                  // matched
660 <                assert item != s;
660 >                // assert item != s;
661                  s.forgetContents();           // avoid garbage
662                  return this.<E>cast(item);
663              }
664              if ((w.isInterrupted() || (timed && nanos <= 0)) &&
665 <                    s.casItem(e, s)) {       // cancel
665 >                    s.casItem(e, s)) {        // cancel
666                  unsplice(pred, s);
667                  return e;
668              }
# Line 620 | Line 672 | public class LinkedTransferQueue<E> exte
672                      randomYields = ThreadLocalRandom.current();
673              }
674              else if (spins > 0) {             // spin
675 <                if (--spins == 0)
676 <                    shortenHeadPath();        // reduce slack before blocking
625 <                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
675 >                --spins;
676 >                if (randomYields.nextInt(CHAINED_SPINS) == 0)
677                      Thread.yield();           // occasionally yield
678              }
679              else if (s.waiter == null) {
# Line 636 | Line 687 | public class LinkedTransferQueue<E> exte
687              }
688              else {
689                  LockSupport.park(this);
639                s.waiter = null;
640                spins = -1;                   // spin if front upon wakeup
690              }
691          }
692      }
# Line 658 | Line 707 | public class LinkedTransferQueue<E> exte
707          return 0;
708      }
709  
661    /**
662     * Tries (once) to unsplice nodes between head and first unmatched
663     * or trailing node; failing on contention.
664     */
665    private void shortenHeadPath() {
666        Node h, hn, p, q;
667        if ((p = h = head) != null && h.isMatched() &&
668            (q = hn = h.next) != null) {
669            Node n;
670            while ((n = q.next) != q) {
671                if (n == null || !q.isMatched()) {
672                    if (hn != q && h.next == hn)
673                        h.casNext(hn, q);
674                    break;
675                }
676                p = q;
677                q = n;
678            }
679        }
680    }
681
710      /* -------------- Traversal methods -------------- */
711  
712      /**
# Line 754 | Line 782 | public class LinkedTransferQueue<E> exte
782           * Moves to next node after prev, or first node if prev null.
783           */
784          private void advance(Node prev) {
785 <            lastPred = lastRet;
786 <            lastRet = prev;
787 <            for (Node p = (prev == null) ? head : succ(prev);
788 <                 p != null; p = succ(p)) {
789 <                Object item = p.item;
790 <                if (p.isData) {
791 <                    if (item != null && item != p) {
792 <                        nextItem = LinkedTransferQueue.this.<E>cast(item);
793 <                        nextNode = p;
785 >            /*
786 >             * To track and avoid buildup of deleted nodes in the face
787 >             * of calls to both Queue.remove and Itr.remove, we must
788 >             * include variants of unsplice and sweep upon each
789 >             * advance: Upon Itr.remove, we may need to catch up links
790 >             * from lastPred, and upon other removes, we might need to
791 >             * skip ahead from stale nodes and unsplice deleted ones
792 >             * found while advancing.
793 >             */
794 >
795 >            Node r, b; // reset lastPred upon possible deletion of lastRet
796 >            if ((r = lastRet) != null && !r.isMatched())
797 >                lastPred = r;    // next lastPred is old lastRet
798 >            else if ((b = lastPred) == null || b.isMatched())
799 >                lastPred = null; // at start of list
800 >            else {
801 >                Node s, n;       // help with removal of lastPred.next
802 >                while ((s = b.next) != null &&
803 >                       s != b && s.isMatched() &&
804 >                       (n = s.next) != null && n != s)
805 >                    b.casNext(s, n);
806 >            }
807 >
808 >            this.lastRet = prev;
809 >            for (Node p = prev, s, n;;) {
810 >                s = (p == null) ? head : p.next;
811 >                if (s == null)
812 >                    break;
813 >                else if (s == p) {
814 >                    p = null;
815 >                    continue;
816 >                }
817 >                Object item = s.item;
818 >                if (s.isData) {
819 >                    if (item != null && item != s) {
820 >                        nextItem = LinkedTransferQueue.<E>cast(item);
821 >                        nextNode = s;
822                          return;
823                      }
824                  }
825                  else if (item == null)
826                      break;
827 +                // assert s.isMatched();
828 +                if (p == null)
829 +                    p = s;
830 +                else if ((n = s.next) == null)
831 +                    break;
832 +                else if (s == n)
833 +                    p = null;
834 +                else
835 +                    p.casNext(s, n);
836              }
837              nextNode = null;
838 +            nextItem = null;
839          }
840  
841          Itr() {
# Line 789 | Line 855 | public class LinkedTransferQueue<E> exte
855          }
856  
857          public final void remove() {
858 <            Node p = lastRet;
859 <            if (p == null) throw new IllegalStateException();
860 <            findAndRemoveDataNode(lastPred, p);
858 >            final Node lastRet = this.lastRet;
859 >            if (lastRet == null)
860 >                throw new IllegalStateException();
861 >            this.lastRet = null;
862 >            if (lastRet.tryMatchData())
863 >                unsplice(lastPred, lastRet);
864          }
865      }
866  
# Line 801 | Line 870 | public class LinkedTransferQueue<E> exte
870       * Unsplices (now or later) the given deleted/cancelled node with
871       * the given predecessor.
872       *
873 <     * @param pred predecessor of node to be unspliced
873 >     * @param pred a node that was at one time known to be the
874 >     * predecessor of s, or null or s itself if s is/was at head
875       * @param s the node to be unspliced
876       */
877 <    private void unsplice(Node pred, Node s) {
878 <        s.forgetContents(); // clear unneeded fields
877 >    final void unsplice(Node pred, Node s) {
878 >        s.forgetContents(); // forget unneeded fields
879          /*
880 <         * At any given time, exactly one node on list cannot be
881 <         * unlinked -- the last inserted node. To accommodate this, if
882 <         * we cannot unlink s, we save its predecessor as "cleanMe",
883 <         * processing the previously saved version first. Because only
884 <         * one node in the list can have a null next, at least one of
815 <         * node s or the node previously saved can always be
816 <         * processed, so this always terminates.
880 >         * See above for rationale. Briefly: if pred still points to
881 >         * s, try to unlink s.  If s cannot be unlinked, because it is
882 >         * trailing node or pred might be unlinked, and neither pred
883 >         * nor s are head or offlist, add to sweepVotes, and if enough
884 >         * votes have accumulated, sweep.
885           */
886 <        if (pred != null && pred != s) {
887 <            while (pred.next == s) {
888 <                Node oldpred = (cleanMe == null) ? null : reclean();
889 <                Node n = s.next;
890 <                if (n != null) {
891 <                    if (n != s)
892 <                        pred.casNext(s, n);
893 <                    break;
886 >        if (pred != null && pred != s && pred.next == s) {
887 >            Node n = s.next;
888 >            if (n == null ||
889 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
890 >                for (;;) {               // check if at, or could be, head
891 >                    Node h = head;
892 >                    if (h == pred || h == s || h == null)
893 >                        return;          // at head or list empty
894 >                    if (!h.isMatched())
895 >                        break;
896 >                    Node hn = h.next;
897 >                    if (hn == null)
898 >                        return;          // now empty
899 >                    if (hn != h && casHead(h, hn))
900 >                        h.forgetNext();  // advance head
901                  }
902 <                if (oldpred == pred ||      // Already saved
903 <                    ((oldpred == null || oldpred.next == s) &&
904 <                     casCleanMe(oldpred, pred))) {
905 <                    break;
902 >                if (pred.next != pred && s.next != s) { // recheck if offlist
903 >                    for (;;) {           // sweep now if enough votes
904 >                        int v = sweepVotes;
905 >                        if (v < SWEEP_THRESHOLD) {
906 >                            if (casSweepVotes(v, v + 1))
907 >                                break;
908 >                        }
909 >                        else if (casSweepVotes(v, 0)) {
910 >                            sweep();
911 >                            break;
912 >                        }
913 >                    }
914                  }
915              }
916          }
917      }
918  
919      /**
920 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
921 <     * that was previously uncleanable because it was at tail.
839 <     *
840 <     * @return current cleanMe node (or null)
920 >     * Unlinks matched (typically cancelled) nodes encountered in a
921 >     * traversal from head.
922       */
923 <    private Node reclean() {
924 <        /*
925 <         * cleanMe is, or at one time was, predecessor of a cancelled
926 <         * node s that was the tail so could not be unspliced.  If it
927 <         * is no longer the tail, try to unsplice if necessary and
928 <         * make cleanMe slot available.  This differs from similar
848 <         * code in unsplice() because we must check that pred still
849 <         * points to a matched node that can be unspliced -- if not,
850 <         * we can (must) clear cleanMe without unsplicing.  This can
851 <         * loop only due to contention.
852 <         */
853 <        Node pred;
854 <        while ((pred = cleanMe) != null) {
855 <            Node s = pred.next;
856 <            Node n;
857 <            if (s == null || s == pred || !s.isMatched())
858 <                casCleanMe(pred, null); // already gone
859 <            else if ((n = s.next) != null) {
860 <                if (n != s)
861 <                    pred.casNext(s, n);
862 <                casCleanMe(pred, null);
863 <            }
864 <            else
923 >    private void sweep() {
924 >        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
925 >            if (!s.isMatched())
926 >                // Unmatched nodes are never self-linked
927 >                p = s;
928 >            else if ((n = s.next) == null) // trailing node is pinned
929                  break;
930 <        }
931 <        return pred;
932 <    }
933 <
934 <    /**
871 <     * Main implementation of Iterator.remove(). Find
872 <     * and unsplice the given data node.
873 <     * @param possiblePred possible predecessor of s
874 <     * @param s the node to remove
875 <     */
876 <    final void findAndRemoveDataNode(Node possiblePred, Node s) {
877 <        assert s.isData;
878 <        if (s.tryMatchData()) {
879 <            if (possiblePred != null && possiblePred.next == s)
880 <                unsplice(possiblePred, s); // was actual predecessor
881 <            else {
882 <                for (Node pred = null, p = head; p != null; ) {
883 <                    if (p == s) {
884 <                        unsplice(pred, p);
885 <                        break;
886 <                    }
887 <                    if (p.isUnmatchedRequest())
888 <                        break;
889 <                    pred = p;
890 <                    if ((p = p.next) == pred) { // stale
891 <                        pred = null;
892 <                        p = head;
893 <                    }
894 <                }
895 <            }
930 >            else if (s == n)    // stale
931 >                // No need to also check for p == s, since that implies s == n
932 >                p = head;
933 >            else
934 >                p.casNext(s, n);
935          }
936      }
937  
# Line 971 | Line 1010 | public class LinkedTransferQueue<E> exte
1010       * Inserts the specified element at the tail of this queue.
1011       * As the queue is unbounded, this method will never return {@code false}.
1012       *
1013 <     * @return {@code true} (as specified by
975 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1013 >     * @return {@code true} (as specified by {@link Queue#offer})
1014       * @throws NullPointerException if the specified element is null
1015       */
1016      public boolean offer(E e) {
# Line 1041 | Line 1079 | public class LinkedTransferQueue<E> exte
1079       */
1080      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1081          throws InterruptedException {
1082 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1082 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1083              return true;
1084          if (!Thread.interrupted())
1085              return false;
# Line 1057 | Line 1095 | public class LinkedTransferQueue<E> exte
1095      }
1096  
1097      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1098 <        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1098 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1099          if (e != null || !Thread.interrupted())
1100              return e;
1101          throw new InterruptedException();
# Line 1130 | Line 1168 | public class LinkedTransferQueue<E> exte
1168       * @return {@code true} if this queue contains no elements
1169       */
1170      public boolean isEmpty() {
1171 <        return firstOfMode(true) == null;
1171 >        for (Node p = head; p != null; p = succ(p)) {
1172 >            if (!p.isMatched())
1173 >                return !p.isData;
1174 >        }
1175 >        return true;
1176      }
1177  
1178      public boolean hasWaitingConsumer() {
# Line 1173 | Line 1215 | public class LinkedTransferQueue<E> exte
1215      }
1216  
1217      /**
1218 +     * Returns {@code true} if this queue contains the specified element.
1219 +     * More formally, returns {@code true} if and only if this queue contains
1220 +     * at least one element {@code e} such that {@code o.equals(e)}.
1221 +     *
1222 +     * @param o object to be checked for containment in this queue
1223 +     * @return {@code true} if this queue contains the specified element
1224 +     */
1225 +    public boolean contains(Object o) {
1226 +        if (o == null) return false;
1227 +        for (Node p = head; p != null; p = succ(p)) {
1228 +            Object item = p.item;
1229 +            if (p.isData) {
1230 +                if (item != null && item != p && o.equals(item))
1231 +                    return true;
1232 +            }
1233 +            else if (item == null)
1234 +                break;
1235 +        }
1236 +        return false;
1237 +    }
1238 +
1239 +    /**
1240       * Always returns {@code Integer.MAX_VALUE} because a
1241       * {@code LinkedTransferQueue} is not capacity constrained.
1242       *
# Line 1224 | Line 1288 | public class LinkedTransferQueue<E> exte
1288          objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1289      private static final long tailOffset =
1290          objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1291 <    private static final long cleanMeOffset =
1292 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1291 >    private static final long sweepVotesOffset =
1292 >        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1293  
1294      static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1295                                    String field, Class<?> klazz) {

Diff Legend

  (no marker)  Removed lines
  +            Added lines
  <            Changed lines (old revision)
  >            Changed lines (new revision)