ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.53 by jsr166, Tue Oct 27 19:59:43 2009 UTC vs.
Revision 1.94 by jsr166, Sat Oct 3 18:17:51 2015 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
13 import java.util.ConcurrentModificationException;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13   import java.util.Queue;
14 + import java.util.concurrent.TimeUnit;
15   import java.util.concurrent.locks.LockSupport;
16 +
17   /**
18   * An unbounded {@link TransferQueue} based on linked nodes.
19   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 23 | Line 22 | import java.util.concurrent.locks.LockSu
22   * producer.  The <em>tail</em> of the queue is that element that has
23   * been on the queue the shortest time for some producer.
24   *
25 < * <p>Beware that, unlike in most collections, the {@code size}
26 < * method is <em>NOT</em> a constant-time operation. Because of the
25 > * <p>Beware that, unlike in most collections, the {@code size} method
26 > * is <em>NOT</em> a constant-time operation. Because of the
27   * asynchronous nature of these queues, determining the current number
28 < * of elements requires a traversal of the elements.
28 > * of elements requires a traversal of the elements, and so may report
29 > * inaccurate results if this collection is modified during traversal.
30 > * Additionally, the bulk operations {@code addAll},
31 > * {@code removeAll}, {@code retainAll}, {@code containsAll},
32 > * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
33 > * to be performed atomically. For example, an iterator operating
34 > * concurrently with an {@code addAll} operation might view only some
35 > * of the added elements.
36   *
37   * <p>This class and its iterator implement all of the
38   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 70 | Line 76 | public class LinkedTransferQueue<E> exte
76       *
77       * A FIFO dual queue may be implemented using a variation of the
78       * Michael & Scott (M&S) lock-free queue algorithm
79 <     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
79 >     * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
80       * It maintains two pointer fields, "head", pointing to a
81       * (matched) node that in turn points to the first actual
82       * (unmatched) queue node (or null if empty); and "tail" that
# Line 206 | Line 212 | public class LinkedTransferQueue<E> exte
212       * additional GC bookkeeping ("write barriers") that are sometimes
213       * more costly than the writes themselves because of contention).
214       *
209     * Removal of interior nodes (due to timed out or interrupted
210     * waits, or calls to remove(x) or Iterator.remove) can use a
211     * scheme roughly similar to that described in Scherer, Lea, and
212     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
213     * any node except the (actual) tail of the queue. To avoid
214     * build-up of cancelled trailing nodes, upon a request to remove
215     * a trailing node, it is placed in field "cleanMe" to be
216     * unspliced upon the next call to unsplice any other node.
217     * Situations needing such mechanics are not common but do occur
218     * in practice; for example when an unbounded series of short
219     * timed calls to poll repeatedly time out but never otherwise
220     * fall off the list because of an untimed call to take at the
221     * front of the queue. Note that maintaining field cleanMe does
222     * not otherwise much impact garbage retention even if never
223     * cleared by some other call because the held node will
224     * eventually either directly or indirectly lead to a self-link
225     * once off the list.
226     *
215       * *** Overview of implementation ***
216       *
217       * We use a threshold-based approach to updates, with a slack
# Line 239 | Line 227 | public class LinkedTransferQueue<E> exte
227       * per-thread one available, but even ThreadLocalRandom is too
228       * heavy for these purposes.
229       *
230 <     * With such a small slack threshold value, it is rarely
231 <     * worthwhile to augment this with path short-circuiting; i.e.,
232 <     * unsplicing nodes between head and the first unmatched node, or
233 <     * similarly for tail, rather than advancing head or tail
246 <     * proper. However, it is used (in awaitMatch) immediately before
247 <     * a waiting thread starts to block, as a final bit of helping at
248 <     * a point when contention with others is extremely unlikely
249 <     * (since if other threads that could release it are operating,
250 <     * then the current thread wouldn't be blocking).
230 >     * With such a small slack threshold value, it is not worthwhile
231 >     * to augment this with path short-circuiting (i.e., unsplicing
232 >     * interior nodes) except in the case of cancellation/removal (see
233 >     * below).
234       *
235       * We allow both the head and tail fields to be null before any
236       * nodes are enqueued; initializing upon first append.  This
# Line 318 | Line 301 | public class LinkedTransferQueue<E> exte
301       *    of less-contended queues.  During spins threads check their
302       *    interrupt status and generate a thread-local random number
303       *    to decide to occasionally perform a Thread.yield. While
304 <     *    yield has underdefined specs, we assume that might it help,
305 <     *    and will not hurt in limiting impact of spinning on busy
304 >     *    yield has underdefined specs, we assume that it might help,
305 >     *    and will not hurt, in limiting impact of spinning on busy
306       *    systems.  We also use smaller (1/2) spins for nodes that are
307       *    not known to be front but whose predecessors have not
308       *    blocked -- these "chained" spins avoid artifacts of
# Line 329 | Line 312 | public class LinkedTransferQueue<E> exte
312       *    versa) compared to their predecessors receive additional
313       *    chained spins, reflecting longer paths typically required to
314       *    unblock threads during phase changes.
315 +     *
316 +     *
317 +     * ** Unlinking removed interior nodes **
318 +     *
319 +     * In addition to minimizing garbage retention via self-linking
320 +     * described above, we also unlink removed interior nodes. These
321 +     * may arise due to timed out or interrupted waits, or calls to
322 +     * remove(x) or Iterator.remove.  Normally, given a node that was
323 +     * at one time known to be the predecessor of some node s that is
324 +     * to be removed, we can unsplice s by CASing the next field of
325 +     * its predecessor if it still points to s (otherwise s must
326 +     * already have been removed or is now offlist). But there are two
327 +     * situations in which we cannot guarantee to make node s
328 +     * unreachable in this way: (1) If s is the trailing node of list
329 +     * (i.e., with null next), then it is pinned as the target node
330 +     * for appends, so can only be removed later after other nodes are
331 +     * appended. (2) We cannot necessarily unlink s given a
332 +     * predecessor node that is matched (including the case of being
333 +     * cancelled): the predecessor may already be unspliced, in which
334 +     * case some previous reachable node may still point to s.
335 +     * (For further explanation see Herlihy & Shavit "The Art of
336 +     * Multiprocessor Programming" chapter 9).  Although, in both
337 +     * cases, we can rule out the need for further action if either s
338 +     * or its predecessor are (or can be made to be) at, or fall off
339 +     * from, the head of list.
340 +     *
341 +     * Without taking these into account, it would be possible for an
342 +     * unbounded number of supposedly removed nodes to remain
343 +     * reachable.  Situations leading to such buildup are uncommon but
344 +     * can occur in practice; for example when a series of short timed
345 +     * calls to poll repeatedly time out but never otherwise fall off
346 +     * the list because of an untimed call to take at the front of the
347 +     * queue.
348 +     *
349 +     * When these cases arise, rather than always retraversing the
350 +     * entire list to find an actual predecessor to unlink (which
351 +     * won't help for case (1) anyway), we record a conservative
352 +     * estimate of possible unsplice failures (in "sweepVotes").
353 +     * We trigger a full sweep when the estimate exceeds a threshold
354 +     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
355 +     * removal failures to tolerate before sweeping through, unlinking
356 +     * cancelled nodes that were not unlinked upon initial removal.
357 +     * We perform sweeps by the thread hitting threshold (rather than
358 +     * background threads or by spreading work to other threads)
359 +     * because in the main contexts in which removal occurs, the
360 +     * caller is already timed-out, cancelled, or performing a
361 +     * potentially O(n) operation (e.g. remove(x)), none of which are
362 +     * time-critical enough to warrant the overhead that alternatives
363 +     * would impose on other threads.
364 +     *
365 +     * Because the sweepVotes estimate is conservative, and because
366 +     * nodes become unlinked "naturally" as they fall off the head of
367 +     * the queue, and because we allow votes to accumulate even while
368 +     * sweeps are in progress, there are typically significantly fewer
369 +     * such nodes than estimated.  Choice of a threshold value
370 +     * balances the likelihood of wasted effort and contention, versus
371 +     * providing a worst-case bound on retention of interior nodes in
372 +     * quiescent queues. The value defined below was chosen
373 +     * empirically to balance these under various timeout scenarios.
374 +     *
375 +     * Note that we cannot self-link unlinked interior nodes during
376 +     * sweeps. However, the associated garbage chains terminate when
377 +     * some successor ultimately falls off the head of the list and is
378 +     * self-linked.
379       */
380  
381      /** True if on multiprocessor */
# Line 355 | Line 402 | public class LinkedTransferQueue<E> exte
402      private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
403  
404      /**
405 +     * The maximum number of estimated removal failures (sweepVotes)
406 +     * to tolerate before sweeping through the queue unlinking
407 +     * cancelled nodes that were not unlinked upon initial
408 +     * removal. See above for explanation. The value must be at least
409 +     * two to avoid useless sweeps when removing trailing nodes.
410 +     */
411 +    static final int SWEEP_THRESHOLD = 32;
412 +
413 +    /**
414       * Queue nodes. Uses Object, not E, for items to allow forgetting
415       * them after use.  Relies heavily on Unsafe mechanics to minimize
416 <     * unnecessary ordering constraints: Writes that intrinsically
417 <     * precede or follow CASes use simple relaxed forms.  Other
362 <     * cleanups use releasing/lazy writes.
416 >     * unnecessary ordering constraints: Writes that are intrinsically
417 >     * ordered wrt other accesses or CASes use simple relaxed forms.
418       */
419      static final class Node {
420          final boolean isData;   // false if this is a request node
# Line 373 | Line 428 | public class LinkedTransferQueue<E> exte
428          }
429  
430          final boolean casItem(Object cmp, Object val) {
431 +            // assert cmp == null || cmp.getClass() != Node.class;
432              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
433          }
434  
435          /**
436 <         * Creates a new node. Uses relaxed write because item can only
437 <         * be seen if followed by CAS.
436 >         * Constructs a new node.  Uses relaxed write because item can
437 >         * only be seen after publication via casNext.
438           */
439          Node(Object item, boolean isData) {
440              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 394 | Line 450 | public class LinkedTransferQueue<E> exte
450          }
451  
452          /**
453 <         * Sets item to self (using a releasing/lazy write) and waiter
454 <         * to null, to avoid garbage retention after extracting or
455 <         * cancelling.
453 >         * Sets item to self and waiter to null, to avoid garbage
454 >         * retention after matching or cancelling. Uses relaxed writes
455 >         * because order is already constrained in the only calling
456 >         * contexts: item is forgotten only after volatile/atomic
457 >         * mechanics that extract items.  Similarly, clearing waiter
458 >         * follows either CAS or return from park (if ever parked;
459 >         * else we don't care).
460           */
461          final void forgetContents() {
462 <            UNSAFE.putOrderedObject(this, itemOffset, this);
463 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
462 >            UNSAFE.putObject(this, itemOffset, this);
463 >            UNSAFE.putObject(this, waiterOffset, null);
464          }
465  
466          /**
# Line 409 | Line 469 | public class LinkedTransferQueue<E> exte
469           */
470          final boolean isMatched() {
471              Object x = item;
472 <            return x == this || (x != null) != isData;
472 >            return (x == this) || ((x == null) == isData);
473 >        }
474 >
475 >        /**
476 >         * Returns true if this is an unmatched request node.
477 >         */
478 >        final boolean isUnmatchedRequest() {
479 >            return !isData && item == null;
480          }
481  
482          /**
# Line 427 | Line 494 | public class LinkedTransferQueue<E> exte
494           * Tries to artificially match a data node -- used by remove.
495           */
496          final boolean tryMatchData() {
497 +            // assert isData;
498              Object x = item;
499              if (x != null && x != this && casItem(x, null)) {
500                  LockSupport.unpark(waiter);
# Line 435 | Line 503 | public class LinkedTransferQueue<E> exte
503              return false;
504          }
505  
438        // Unsafe mechanics
439        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
440        private static final long nextOffset =
441            objectFieldOffset(UNSAFE, "next", Node.class);
442        private static final long itemOffset =
443            objectFieldOffset(UNSAFE, "item", Node.class);
444        private static final long waiterOffset =
445            objectFieldOffset(UNSAFE, "waiter", Node.class);
446
506          private static final long serialVersionUID = -3375979862319811754L;
507 +
508 +        // Unsafe mechanics
509 +        private static final sun.misc.Unsafe UNSAFE;
510 +        private static final long itemOffset;
511 +        private static final long nextOffset;
512 +        private static final long waiterOffset;
513 +        static {
514 +            try {
515 +                UNSAFE = getUnsafe();
516 +                Class<?> k = Node.class;
517 +                itemOffset = UNSAFE.objectFieldOffset
518 +                    (k.getDeclaredField("item"));
519 +                nextOffset = UNSAFE.objectFieldOffset
520 +                    (k.getDeclaredField("next"));
521 +                waiterOffset = UNSAFE.objectFieldOffset
522 +                    (k.getDeclaredField("waiter"));
523 +            } catch (Exception e) {
524 +                throw new Error(e);
525 +            }
526 +        }
527      }
528  
529      /** head of the queue; null until first enqueue */
530 <    private transient volatile Node head;
452 <
453 <    /** predecessor of dangling unspliceable node */
454 <    private transient volatile Node cleanMe; // decl here to reduce contention
530 >    transient volatile Node head;
531  
532      /** tail of the queue; null until first append */
533      private transient volatile Node tail;
534  
535 +    /** The number of apparent failures to unsplice removed nodes */
536 +    private transient volatile int sweepVotes;
537 +
538      // CAS methods for fields
539      private boolean casTail(Node cmp, Node val) {
540          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 465 | Line 544 | public class LinkedTransferQueue<E> exte
544          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
545      }
546  
547 <    private boolean casCleanMe(Node cmp, Node val) {
548 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
547 >    private boolean casSweepVotes(int cmp, int val) {
548 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
549      }
550  
551      /*
552 <     * Possible values for "how" argument in xfer method. Beware that
474 <     * the order of assigned numerical values matters.
552 >     * Possible values for "how" argument in xfer method.
553       */
554 <    private static final int NOW     = 0; // for untimed poll, tryTransfer
555 <    private static final int ASYNC   = 1; // for offer, put, add
556 <    private static final int SYNC    = 2; // for transfer, take
557 <    private static final int TIMEOUT = 3; // for timed poll, tryTransfer
554 >    private static final int NOW   = 0; // for untimed poll, tryTransfer
555 >    private static final int ASYNC = 1; // for offer, put, add
556 >    private static final int SYNC  = 2; // for transfer, take
557 >    private static final int TIMED = 3; // for timed poll, tryTransfer
558 >
559 >    @SuppressWarnings("unchecked")
560 >    static <E> E cast(Object item) {
561 >        // assert item == null || item.getClass() != Node.class;
562 >        return (E) item;
563 >    }
564  
565      /**
566       * Implements all queuing methods. See above for explanation.
567       *
568       * @param e the item or null for take
569       * @param haveData true if this is a put, else a take
570 <     * @param how NOW, ASYNC, SYNC, or TIMEOUT
571 <     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
570 >     * @param how NOW, ASYNC, SYNC, or TIMED
571 >     * @param nanos timeout in nanosecs, used only if mode is TIMED
572       * @return an item if matched, else e
573       * @throws NullPointerException if haveData mode but e is null
574       */
575 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
575 >    private E xfer(E e, boolean haveData, int how, long nanos) {
576          if (haveData && (e == null))
577              throw new NullPointerException();
578          Node s = null;                        // the node to append, if needed
579  
580 <        retry: for (;;) {                     // restart on append race
580 >        retry:
581 >        for (;;) {                            // restart on append race
582  
583              for (Node h = head, p = h; p != null;) { // find & match first node
584                  boolean isData = p.isData;
# Line 503 | Line 588 | public class LinkedTransferQueue<E> exte
588                          break;
589                      if (p.casItem(item, e)) { // match
590                          for (Node q = p; q != h;) {
591 <                            Node n = q.next;  // update head by 2
592 <                            if (n != null)    // unless singleton
508 <                                q = n;
509 <                            if (head == h && casHead(h, q)) {
591 >                            Node n = q.next;  // update by 2 unless singleton
592 >                            if (head == h && casHead(h, n == null ? q : n)) {
593                                  h.forgetNext();
594                                  break;
595                              }                 // advance and retry
# Line 515 | Line 598 | public class LinkedTransferQueue<E> exte
598                                  break;        // unless slack < 2
599                          }
600                          LockSupport.unpark(p.waiter);
601 <                        return item;
601 >                        return LinkedTransferQueue.<E>cast(item);
602                      }
603                  }
604                  Node n = p.next;
605                  p = (p != n) ? n : (h = head); // Use head if p offlist
606              }
607  
608 <            if (how >= ASYNC) {               // No matches available
608 >            if (how != NOW) {                 // No matches available
609                  if (s == null)
610                      s = new Node(e, haveData);
611                  Node pred = tryAppend(s, haveData);
612                  if (pred == null)
613                      continue retry;           // lost race vs opposite mode
614 <                if (how >= SYNC)
615 <                    return awaitMatch(s, pred, e, how, nanos);
614 >                if (how != ASYNC)
615 >                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
616              }
617              return e; // not waiting
618          }
# Line 545 | Line 628 | public class LinkedTransferQueue<E> exte
628       * predecessor
629       */
630      private Node tryAppend(Node s, boolean haveData) {
631 <        for (Node t = tail, p = t;;) { // move p to last node and append
631 >        for (Node t = tail, p = t;;) {        // move p to last node and append
632              Node n, u;                        // temps for reads of next & tail
633              if (p == null && (p = head) == null) {
634                  if (casHead(null, s))
# Line 578 | Line 661 | public class LinkedTransferQueue<E> exte
661       * predecessor, or null if unknown (the null case does not occur
662       * in any current calls but may in possible future extensions)
663       * @param e the comparison value for checking match
664 <     * @param how either SYNC or TIMEOUT
665 <     * @param nanos timeout value
664 >     * @param timed if true, wait only until timeout elapses
665 >     * @param nanos timeout in nanosecs, used only if timed is true
666       * @return matched item, or e if unmatched on interrupt or timeout
667       */
668 <    private Object awaitMatch(Node s, Node pred, Object e,
669 <                              int how, long nanos) {
587 <        long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
668 >    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
669 >        long lastTime = timed ? System.nanoTime() : 0L;
670          Thread w = Thread.currentThread();
671          int spins = -1; // initialized after first item and cancel checks
672          ThreadLocalRandom randomYields = null; // bound if needed
# Line 592 | Line 674 | public class LinkedTransferQueue<E> exte
674          for (;;) {
675              Object item = s.item;
676              if (item != e) {                  // matched
677 +                // assert item != s;
678                  s.forgetContents();           // avoid garbage
679 <                return item;
679 >                return LinkedTransferQueue.<E>cast(item);
680              }
681 <            if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
682 <                     s.casItem(e, s)) {       // cancel
681 >            if ((w.isInterrupted() || (timed && nanos <= 0L)) &&
682 >                    s.casItem(e, s)) {        // cancel
683                  unsplice(pred, s);
684                  return e;
685              }
# Line 606 | Line 689 | public class LinkedTransferQueue<E> exte
689                      randomYields = ThreadLocalRandom.current();
690              }
691              else if (spins > 0) {             // spin
692 <                if (--spins == 0)
693 <                    shortenHeadPath();        // reduce slack before blocking
611 <                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
692 >                --spins;
693 >                if (randomYields.nextInt(CHAINED_SPINS) == 0)
694                      Thread.yield();           // occasionally yield
695              }
696              else if (s.waiter == null) {
697                  s.waiter = w;                 // request unpark then recheck
698              }
699 <            else if (how == TIMEOUT) {
699 >            else if (timed) {
700                  long now = System.nanoTime();
701                  if ((nanos -= now - lastTime) > 0)
702                      LockSupport.parkNanos(this, nanos);
# Line 622 | Line 704 | public class LinkedTransferQueue<E> exte
704              }
705              else {
706                  LockSupport.park(this);
625                s.waiter = null;
626                spins = -1;                   // spin if front upon wakeup
707              }
708          }
709      }
# Line 644 | Line 724 | public class LinkedTransferQueue<E> exte
724          return 0;
725      }
726  
727 +    /* -------------- Traversal methods -------------- */
728 +
729      /**
730 <     * Tries (once) to unsplice nodes between head and first unmatched
731 <     * or trailing node; failing on contention.
732 <     */
733 <    private void shortenHeadPath() {
734 <        Node h, hn, p, q;
735 <        if ((p = h = head) != null && h.isMatched() &&
736 <            (q = hn = h.next) != null) {
655 <            Node n;
656 <            while ((n = q.next) != q) {
657 <                if (n == null || !q.isMatched()) {
658 <                    if (hn != q && h.next == hn)
659 <                        h.casNext(hn, q);
660 <                    break;
661 <                }
662 <                p = q;
663 <                q = n;
664 <            }
665 <        }
730 >     * Returns the successor of p, or the head node if p.next has been
731 >     * linked to self, which will only be true if traversing with a
732 >     * stale pointer that is now off the list.
733 >     */
734 >    final Node succ(Node p) {
735 >        Node next = p.next;
736 >        return (p == next) ? head : next;
737      }
738  
668    /* -------------- Traversal methods -------------- */
669
739      /**
740       * Returns the first unmatched node of the given mode, or null if
741       * none.  Used by methods isEmpty, hasWaitingConsumer.
742       */
743 <    private Node firstOfMode(boolean data) {
744 <        for (Node p = head; p != null; ) {
743 >    private Node firstOfMode(boolean isData) {
744 >        for (Node p = head; p != null; p = succ(p)) {
745              if (!p.isMatched())
746 <                return (p.isData == data) ? p : null;
678 <            Node n = p.next;
679 <            p = (n != p) ? n : head;
746 >                return (p.isData == isData) ? p : null;
747          }
748          return null;
749      }
750  
751      /**
752       * Returns the item in the first unmatched node with isData; or
753 <     * null if none. Used by peek.
753 >     * null if none.  Used by peek.
754       */
755 <    private Object firstDataItem() {
756 <        for (Node p = head; p != null; ) {
690 <            boolean isData = p.isData;
755 >    private E firstDataItem() {
756 >        for (Node p = head; p != null; p = succ(p)) {
757              Object item = p.item;
758 <            if (item != p && (item != null) == isData)
759 <                return isData ? item : null;
760 <            Node n = p.next;
761 <            p = (n != p) ? n : head;
758 >            if (p.isData) {
759 >                if (item != null && item != p)
760 >                    return LinkedTransferQueue.<E>cast(item);
761 >            }
762 >            else if (item == null)
763 >                return null;
764          }
765          return null;
766      }
# Line 723 | Line 791 | public class LinkedTransferQueue<E> exte
791  
792      final class Itr implements Iterator<E> {
793          private Node nextNode;   // next node to return item for
794 <        private Object nextItem; // the corresponding item
794 >        private E nextItem;      // the corresponding item
795          private Node lastRet;    // last returned node, to support remove
796 +        private Node lastPred;   // predecessor to unlink lastRet
797  
798          /**
799           * Moves to next node after prev, or first node if prev null.
800           */
801          private void advance(Node prev) {
802 <            lastRet = prev;
803 <            Node p;
804 <            if (prev == null || (p = prev.next) == prev)
805 <                p = head;
806 <            while (p != null) {
807 <                Object item = p.item;
808 <                if (p.isData) {
809 <                    if (item != null && item != p) {
810 <                        nextItem = item;
811 <                        nextNode = p;
802 >            /*
803 >             * To track and avoid buildup of deleted nodes in the face
804 >             * of calls to both Queue.remove and Itr.remove, we must
805 >             * include variants of unsplice and sweep upon each
806 >             * advance: Upon Itr.remove, we may need to catch up links
807 >             * from lastPred, and upon other removes, we might need to
808 >             * skip ahead from stale nodes and unsplice deleted ones
809 >             * found while advancing.
810 >             */
811 >
812 >            Node r, b; // reset lastPred upon possible deletion of lastRet
813 >            if ((r = lastRet) != null && !r.isMatched())
814 >                lastPred = r;    // next lastPred is old lastRet
815 >            else if ((b = lastPred) == null || b.isMatched())
816 >                lastPred = null; // at start of list
817 >            else {
818 >                Node s, n;       // help with removal of lastPred.next
819 >                while ((s = b.next) != null &&
820 >                       s != b && s.isMatched() &&
821 >                       (n = s.next) != null && n != s)
822 >                    b.casNext(s, n);
823 >            }
824 >
825 >            this.lastRet = prev;
826 >
827 >            for (Node p = prev, s, n;;) {
828 >                s = (p == null) ? head : p.next;
829 >                if (s == null)
830 >                    break;
831 >                else if (s == p) {
832 >                    p = null;
833 >                    continue;
834 >                }
835 >                Object item = s.item;
836 >                if (s.isData) {
837 >                    if (item != null && item != s) {
838 >                        nextItem = LinkedTransferQueue.<E>cast(item);
839 >                        nextNode = s;
840                          return;
841                      }
842                  }
843                  else if (item == null)
844                      break;
845 <                Node n = p.next;
846 <                p = (n != p) ? n : head;
845 >                // assert s.isMatched();
846 >                if (p == null)
847 >                    p = s;
848 >                else if ((n = s.next) == null)
849 >                    break;
850 >                else if (s == n)
851 >                    p = null;
852 >                else
853 >                    p.casNext(s, n);
854              }
855              nextNode = null;
856 +            nextItem = null;
857          }
858  
859          Itr() {
# Line 762 | Line 867 | public class LinkedTransferQueue<E> exte
867          public final E next() {
868              Node p = nextNode;
869              if (p == null) throw new NoSuchElementException();
870 <            Object e = nextItem;
870 >            E e = nextItem;
871              advance(p);
872 <            return (E) e;
872 >            return e;
873          }
874  
875          public final void remove() {
876 <            Node p = lastRet;
877 <            if (p == null) throw new IllegalStateException();
878 <            lastRet = null;
879 <            findAndRemoveNode(p);
876 >            final Node lastRet = this.lastRet;
877 >            if (lastRet == null)
878 >                throw new IllegalStateException();
879 >            this.lastRet = null;
880 >            if (lastRet.tryMatchData())
881 >                unsplice(lastPred, lastRet);
882          }
883      }
884  
# Line 781 | Line 888 | public class LinkedTransferQueue<E> exte
888       * Unsplices (now or later) the given deleted/cancelled node with
889       * the given predecessor.
890       *
891 <     * @param pred predecessor of node to be unspliced
891 >     * @param pred a node that was at one time known to be the
892 >     * predecessor of s, or null or s itself if s is/was at head
893       * @param s the node to be unspliced
894       */
895 <    private void unsplice(Node pred, Node s) {
896 <        s.forgetContents(); // clear unneeded fields
895 >    final void unsplice(Node pred, Node s) {
896 >        s.forgetContents(); // forget unneeded fields
897          /*
898 <         * At any given time, exactly one node on list cannot be
899 <         * unlinked -- the last inserted node. To accommodate this, if
900 <         * we cannot unlink s, we save its predecessor as "cleanMe",
901 <         * processing the previously saved version first. Because only
902 <         * one node in the list can have a null next, at least one of
795 <         * node s or the node previously saved can always be
796 <         * processed, so this always terminates.
898 >         * See above for rationale. Briefly: if pred still points to
899 >         * s, try to unlink s.  If s cannot be unlinked, because it is
900 >         * trailing node or pred might be unlinked, and neither pred
901 >         * nor s are head or offlist, add to sweepVotes, and if enough
902 >         * votes have accumulated, sweep.
903           */
904 <        if (pred != null && pred != s) {
905 <            while (pred.next == s) {
906 <                Node oldpred = (cleanMe == null) ? null : reclean();
907 <                Node n = s.next;
908 <                if (n != null) {
909 <                    if (n != s)
910 <                        pred.casNext(s, n);
911 <                    break;
904 >        if (pred != null && pred != s && pred.next == s) {
905 >            Node n = s.next;
906 >            if (n == null ||
907 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
908 >                for (;;) {               // check if at, or could be, head
909 >                    Node h = head;
910 >                    if (h == pred || h == s || h == null)
911 >                        return;          // at head or list empty
912 >                    if (!h.isMatched())
913 >                        break;
914 >                    Node hn = h.next;
915 >                    if (hn == null)
916 >                        return;          // now empty
917 >                    if (hn != h && casHead(h, hn))
918 >                        h.forgetNext();  // advance head
919 >                }
920 >                if (pred.next != pred && s.next != s) { // recheck if offlist
921 >                    for (;;) {           // sweep now if enough votes
922 >                        int v = sweepVotes;
923 >                        if (v < SWEEP_THRESHOLD) {
924 >                            if (casSweepVotes(v, v + 1))
925 >                                break;
926 >                        }
927 >                        else if (casSweepVotes(v, 0)) {
928 >                            sweep();
929 >                            break;
930 >                        }
931 >                    }
932                  }
807                if (oldpred == pred ||      // Already saved
808                    (oldpred == null && casCleanMe(null, pred)))
809                    break;                  // Postpone cleaning
933              }
934          }
935      }
936  
937      /**
938 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
939 <     * that was previously uncleanable because it was at tail.
817 <     *
818 <     * @return current cleanMe node (or null)
938 >     * Unlinks matched (typically cancelled) nodes encountered in a
939 >     * traversal from head.
940       */
941 <    private Node reclean() {
942 <        /*
943 <         * cleanMe is, or at one time was, predecessor of a cancelled
944 <         * node s that was the tail so could not be unspliced.  If it
945 <         * is no longer the tail, try to unsplice if necessary and
946 <         * make cleanMe slot available.  This differs from similar
826 <         * code in unsplice() because we must check that pred still
827 <         * points to a matched node that can be unspliced -- if not,
828 <         * we can (must) clear cleanMe without unsplicing.  This can
829 <         * loop only due to contention.
830 <         */
831 <        Node pred;
832 <        while ((pred = cleanMe) != null) {
833 <            Node s = pred.next;
834 <            Node n;
835 <            if (s == null || s == pred || !s.isMatched())
836 <                casCleanMe(pred, null); // already gone
837 <            else if ((n = s.next) != null) {
838 <                if (n != s)
839 <                    pred.casNext(s, n);
840 <                casCleanMe(pred, null);
841 <            }
842 <            else
941 >    private void sweep() {
942 >        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
943 >            if (!s.isMatched())
944 >                // Unmatched nodes are never self-linked
945 >                p = s;
946 >            else if ((n = s.next) == null) // trailing node is pinned
947                  break;
948 <        }
949 <        return pred;
950 <    }
951 <
952 <    /**
849 <     * Main implementation of Iterator.remove(). Find
850 <     * and unsplice the given node.
851 <     */
852 <    final void findAndRemoveNode(Node s) {
853 <        if (s.tryMatchData()) {
854 <            Node pred = null;
855 <            Node p = head;
856 <            while (p != null) {
857 <                if (p == s) {
858 <                    unsplice(pred, p);
859 <                    break;
860 <                }
861 <                if (!p.isData && !p.isMatched())
862 <                    break;
863 <                pred = p;
864 <                if ((p = p.next) == pred) { // stale
865 <                    pred = null;
866 <                    p = head;
867 <                }
868 <            }
948 >            else if (s == n)    // stale
949 >                // No need to also check for p == s, since that implies s == n
950 >                p = head;
951 >            else
952 >                p.casNext(s, n);
953          }
954      }
955  
# Line 874 | Line 958 | public class LinkedTransferQueue<E> exte
958       */
959      private boolean findAndRemove(Object e) {
960          if (e != null) {
961 <            Node pred = null;
878 <            Node p = head;
879 <            while (p != null) {
961 >            for (Node pred = null, p = head; p != null; ) {
962                  Object item = p.item;
963                  if (p.isData) {
964                      if (item != null && item != p && e.equals(item) &&
# Line 888 | Line 970 | public class LinkedTransferQueue<E> exte
970                  else if (item == null)
971                      break;
972                  pred = p;
973 <                if ((p = p.next) == pred) {
973 >                if ((p = p.next) == pred) { // stale
974                      pred = null;
975                      p = head;
976                  }
# Line 934 | Line 1016 | public class LinkedTransferQueue<E> exte
1016       * return {@code false}.
1017       *
1018       * @return {@code true} (as specified by
1019 <     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1019 >     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1020 >     *  BlockingQueue.offer})
1021       * @throws NullPointerException if the specified element is null
1022       */
1023      public boolean offer(E e, long timeout, TimeUnit unit) {
# Line 946 | Line 1029 | public class LinkedTransferQueue<E> exte
1029       * Inserts the specified element at the tail of this queue.
1030       * As the queue is unbounded, this method will never return {@code false}.
1031       *
1032 <     * @return {@code true} (as specified by
950 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1032 >     * @return {@code true} (as specified by {@link Queue#offer})
1033       * @throws NullPointerException if the specified element is null
1034       */
1035      public boolean offer(E e) {
# Line 1016 | Line 1098 | public class LinkedTransferQueue<E> exte
1098       */
1099      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1100          throws InterruptedException {
1101 <        if (xfer(e, true, TIMEOUT, unit.toNanos(timeout)) == null)
1101 >        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1102              return true;
1103          if (!Thread.interrupted())
1104              return false;
# Line 1024 | Line 1106 | public class LinkedTransferQueue<E> exte
1106      }
1107  
1108      public E take() throws InterruptedException {
1109 <        Object e = xfer(null, false, SYNC, 0);
1109 >        E e = xfer(null, false, SYNC, 0);
1110          if (e != null)
1111 <            return (E)e;
1111 >            return e;
1112          Thread.interrupted();
1113          throw new InterruptedException();
1114      }
1115  
1116      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1117 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1117 >        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1118          if (e != null || !Thread.interrupted())
1119 <            return (E)e;
1119 >            return e;
1120          throw new InterruptedException();
1121      }
1122  
1123      public E poll() {
1124 <        return (E)xfer(null, false, NOW, 0);
1124 >        return xfer(null, false, NOW, 0);
1125      }
1126  
1127      /**
# Line 1052 | Line 1134 | public class LinkedTransferQueue<E> exte
1134          if (c == this)
1135              throw new IllegalArgumentException();
1136          int n = 0;
1137 <        E e;
1056 <        while ( (e = poll()) != null) {
1137 >        for (E e; (e = poll()) != null;) {
1138              c.add(e);
1139              ++n;
1140          }
# Line 1070 | Line 1151 | public class LinkedTransferQueue<E> exte
1151          if (c == this)
1152              throw new IllegalArgumentException();
1153          int n = 0;
1154 <        E e;
1074 <        while (n < maxElements && (e = poll()) != null) {
1154 >        for (E e; n < maxElements && (e = poll()) != null;) {
1155              c.add(e);
1156              ++n;
1157          }
# Line 1079 | Line 1159 | public class LinkedTransferQueue<E> exte
1159      }
1160  
1161      /**
1162 <     * Returns an iterator over the elements in this queue in proper
1163 <     * sequence, from head to tail.
1162 >     * Returns an iterator over the elements in this queue in proper sequence.
1163 >     * The elements will be returned in order from first (head) to last (tail).
1164       *
1165       * <p>The returned iterator is a "weakly consistent" iterator that
1166 <     * will never throw
1167 <     * {@link ConcurrentModificationException ConcurrentModificationException},
1168 <     * and guarantees to traverse elements as they existed upon
1169 <     * construction of the iterator, and may (but is not guaranteed
1170 <     * to) reflect any modifications subsequent to construction.
1166 >     * will never throw {@link java.util.ConcurrentModificationException
1167 >     * ConcurrentModificationException}, and guarantees to traverse
1168 >     * elements as they existed upon construction of the iterator, and
1169 >     * may (but is not guaranteed to) reflect any modifications
1170 >     * subsequent to construction.
1171       *
1172       * @return an iterator over the elements in this queue in proper sequence
1173       */
# Line 1096 | Line 1176 | public class LinkedTransferQueue<E> exte
1176      }
1177  
1178      public E peek() {
1179 <        return (E) firstDataItem();
1179 >        return firstDataItem();
1180      }
1181  
1182      /**
# Line 1105 | Line 1185 | public class LinkedTransferQueue<E> exte
1185       * @return {@code true} if this queue contains no elements
1186       */
1187      public boolean isEmpty() {
1188 <        return firstOfMode(true) == null;
1188 >        for (Node p = head; p != null; p = succ(p)) {
1189 >            if (!p.isMatched())
1190 >                return !p.isData;
1191 >        }
1192 >        return true;
1193      }
1194  
1195      public boolean hasWaitingConsumer() {
# Line 1148 | Line 1232 | public class LinkedTransferQueue<E> exte
1232      }
1233  
1234      /**
1235 +     * Returns {@code true} if this queue contains the specified element.
1236 +     * More formally, returns {@code true} if and only if this queue contains
1237 +     * at least one element {@code e} such that {@code o.equals(e)}.
1238 +     *
1239 +     * @param o object to be checked for containment in this queue
1240 +     * @return {@code true} if this queue contains the specified element
1241 +     */
1242 +    public boolean contains(Object o) {
1243 +        if (o == null) return false;
1244 +        for (Node p = head; p != null; p = succ(p)) {
1245 +            Object item = p.item;
1246 +            if (p.isData) {
1247 +                if (item != null && item != p && o.equals(item))
1248 +                    return true;
1249 +            }
1250 +            else if (item == null)
1251 +                break;
1252 +        }
1253 +        return false;
1254 +    }
1255 +
1256 +    /**
1257       * Always returns {@code Integer.MAX_VALUE} because a
1258       * {@code LinkedTransferQueue} is not capacity constrained.
1259       *
1260       * @return {@code Integer.MAX_VALUE} (as specified by
1261 <     *         {@link BlockingQueue#remainingCapacity()})
1261 >     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1262 >     *         BlockingQueue.remainingCapacity})
1263       */
1264      public int remainingCapacity() {
1265          return Integer.MAX_VALUE;
# Line 1184 | Line 1291 | public class LinkedTransferQueue<E> exte
1291          throws java.io.IOException, ClassNotFoundException {
1292          s.defaultReadObject();
1293          for (;;) {
1294 <            @SuppressWarnings("unchecked") E item = (E) s.readObject();
1294 >            @SuppressWarnings("unchecked")
1295 >            E item = (E) s.readObject();
1296              if (item == null)
1297                  break;
1298              else
# Line 1194 | Line 1302 | public class LinkedTransferQueue<E> exte
1302  
1303      // Unsafe mechanics
1304  
1305 <    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1306 <    private static final long headOffset =
1307 <        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1308 <    private static final long tailOffset =
1309 <        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1202 <    private static final long cleanMeOffset =
1203 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1204 <
1205 <    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1206 <                                  String field, Class<?> klazz) {
1305 >    private static final sun.misc.Unsafe UNSAFE;
1306 >    private static final long headOffset;
1307 >    private static final long tailOffset;
1308 >    private static final long sweepVotesOffset;
1309 >    static {
1310          try {
1311 <            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1312 <        } catch (NoSuchFieldException e) {
1313 <            // Convert Exception to corresponding Error
1314 <            NoSuchFieldError error = new NoSuchFieldError(field);
1315 <            error.initCause(e);
1316 <            throw error;
1311 >            UNSAFE = getUnsafe();
1312 >            Class<?> k = LinkedTransferQueue.class;
1313 >            headOffset = UNSAFE.objectFieldOffset
1314 >                (k.getDeclaredField("head"));
1315 >            tailOffset = UNSAFE.objectFieldOffset
1316 >                (k.getDeclaredField("tail"));
1317 >            sweepVotesOffset = UNSAFE.objectFieldOffset
1318 >                (k.getDeclaredField("sweepVotes"));
1319 >        } catch (Exception e) {
1320 >            throw new Error(e);
1321          }
1322      }
1323  
# Line 1221 | Line 1328 | public class LinkedTransferQueue<E> exte
1328       *
1329       * @return a sun.misc.Unsafe
1330       */
1331 <    private static sun.misc.Unsafe getUnsafe() {
1331 >    static sun.misc.Unsafe getUnsafe() {
1332          try {
1333              return sun.misc.Unsafe.getUnsafe();
1334 <        } catch (SecurityException se) {
1335 <            try {
1336 <                return java.security.AccessController.doPrivileged
1337 <                    (new java.security
1338 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1339 <                        public sun.misc.Unsafe run() throws Exception {
1340 <                            java.lang.reflect.Field f = sun.misc
1341 <                                .Unsafe.class.getDeclaredField("theUnsafe");
1342 <                            f.setAccessible(true);
1343 <                            return (sun.misc.Unsafe) f.get(null);
1344 <                        }});
1345 <            } catch (java.security.PrivilegedActionException e) {
1346 <                throw new RuntimeException("Could not initialize intrinsics",
1347 <                                           e.getCause());
1348 <            }
1334 >        } catch (SecurityException tryReflectionInstead) {}
1335 >        try {
1336 >            return java.security.AccessController.doPrivileged
1337 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
1338 >                public sun.misc.Unsafe run() throws Exception {
1339 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
1340 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
1341 >                        f.setAccessible(true);
1342 >                        Object x = f.get(null);
1343 >                        if (k.isInstance(x))
1344 >                            return k.cast(x);
1345 >                    }
1346 >                    throw new NoSuchFieldError("the Unsafe");
1347 >                }});
1348 >        } catch (java.security.PrivilegedActionException e) {
1349 >            throw new RuntimeException("Could not initialize intrinsics",
1350 >                                       e.getCause());
1351          }
1352      }
1244
1353   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines