ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.49 by jsr166, Thu Oct 22 15:58:44 2009 UTC vs.
Revision 1.59 by jsr166, Thu Oct 29 00:29:16 2009 UTC

# Line 161 | Line 161 | public class LinkedTransferQueue<E> exte
161       * targets.  Even when using very small slack values, this
162       * approach works well for dual queues because it allows all
163       * operations up to the point of matching or appending an item
164 <     * (hence potentially releasing another thread) to be read-only,
165 <     * thus not introducing any further contention. As described
166 <     * below, we implement this by performing slack maintenance
167 <     * retries only after these points.
164 >     * (hence potentially allowing progress by another thread) to be
165 >     * read-only, thus not introducing any further contention. As
166 >     * described below, we implement this by performing slack
167 >     * maintenance retries only after these points.
168       *
169       * As an accompaniment to such techniques, traversal overhead can
170       * be further reduced without increasing contention of head
171 <     * pointer updates.  During traversals, threads may sometimes
172 <     * shortcut the "next" link path from the current "head" node to
173 <     * be closer to the currently known first unmatched node. Again,
174 <     * this may be triggered with using thresholds or randomization.
171 >     * pointer updates: Threads may sometimes shortcut the "next" link
172 >     * path from the current "head" node to be closer to the currently
173 >     * known first unmatched node, and similarly for tail. Again, this
174 >     * may be triggered with using thresholds or randomization.
175       *
176       * These ideas must be further extended to avoid unbounded amounts
177       * of costly-to-reclaim garbage caused by the sequential "next"
# Line 199 | Line 199 | public class LinkedTransferQueue<E> exte
199       * mechanics because an update may leave head at a detached node.
200       * And while direct writes are possible for tail updates, they
201       * increase the risk of long retraversals, and hence long garbage
202 <     * chains which can be much more costly than is worthwhile
202 >     * chains, which can be much more costly than is worthwhile
203       * considering that the cost difference of performing a CAS vs
204       * write is smaller when they are not triggered on each operation
205       * (especially considering that writes and CASes equally require
# Line 207 | Line 207 | public class LinkedTransferQueue<E> exte
207       * more costly than the writes themselves because of contention).
208       *
209       * Removal of interior nodes (due to timed out or interrupted
210 <     * waits, or calls to remove or Iterator.remove) uses a scheme
211 <     * roughly similar to that in Scherer, Lea, and Scott's
212 <     * SynchronousQueue. Given a predecessor, we can unsplice any node
213 <     * except the (actual) tail of the queue. To avoid build-up of
214 <     * cancelled trailing nodes, upon a request to remove a trailing
215 <     * node, it is placed in field "cleanMe" to be unspliced upon the
216 <     * next call to unsplice any other node.  Situations needing such
217 <     * mechanics are not common but do occur in practice; for example
218 <     * when an unbounded series of short timed calls to poll
219 <     * repeatedly time out but never otherwise fall off the list
220 <     * because of an untimed call to take at the front of the
221 <     * queue. (Note that maintaining field cleanMe does not otherwise
222 <     * much impact garbage retention even if never cleared by some
223 <     * other call because the held node will eventually either
224 <     * directly or indirectly lead to a self-link once off the list.)
210 >     * waits, or calls to remove(x) or Iterator.remove) can use a
211 >     * scheme roughly similar to that described in Scherer, Lea, and
212 >     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
213 >     * any node except the (actual) tail of the queue. To avoid
214 >     * build-up of cancelled trailing nodes, upon a request to remove
215 >     * a trailing node, it is placed in field "cleanMe" to be
216 >     * unspliced upon the next call to unsplice any other node.
217 >     * Situations needing such mechanics are not common but do occur
218 >     * in practice; for example when an unbounded series of short
219 >     * timed calls to poll repeatedly time out but never otherwise
220 >     * fall off the list because of an untimed call to take at the
221 >     * front of the queue. Note that maintaining field cleanMe does
222 >     * not otherwise much impact garbage retention even if never
223 >     * cleared by some other call because the held node will
224 >     * eventually either directly or indirectly lead to a self-link
225 >     * once off the list.
226       *
227       * *** Overview of implementation ***
228       *
229 <     * We use a threshold-based approach to updates, with a target
230 <     * slack of two.  The slack value is hard-wired: a path greater
229 >     * We use a threshold-based approach to updates, with a slack
230 >     * threshold of two -- that is, we update head/tail when the
231 >     * current pointer appears to be two or more steps away from the
232 >     * first/last node. The slack value is hard-wired: a path greater
233       * than one is naturally implemented by checking equality of
234       * traversal pointers except when the list has only one element,
235 <     * in which case we keep target slack at one. Avoiding tracking
235 >     * in which case we keep slack threshold at one. Avoiding tracking
236       * explicit counts across method calls slightly simplifies an
237       * already-messy implementation. Using randomization would
238       * probably work better if there were a low-quality dirt-cheap
239       * per-thread one available, but even ThreadLocalRandom is too
240       * heavy for these purposes.
241       *
242 <     * With such a small target slack value, it is rarely worthwhile
243 <     * to augment this with path short-circuiting; i.e., unsplicing
244 <     * nodes between head and the first unmatched node, or similarly
245 <     * for tail, rather than advancing head or tail proper. However,
246 <     * it is used (in awaitMatch) immediately before a waiting thread
247 <     * starts to block, as a final bit of helping at a point when
248 <     * contention with others is extremely unlikely (since if other
249 <     * threads that could release it are operating, then the current
250 <     * thread wouldn't be blocking).
242 >     * With such a small slack threshold value, it is rarely
243 >     * worthwhile to augment this with path short-circuiting; i.e.,
244 >     * unsplicing nodes between head and the first unmatched node, or
245 >     * similarly for tail, rather than advancing head or tail
246 >     * proper. However, it is used (in awaitMatch) immediately before
247 >     * a waiting thread starts to block, as a final bit of helping at
248 >     * a point when contention with others is extremely unlikely
249 >     * (since if other threads that could release it are operating,
250 >     * then the current thread wouldn't be blocking).
251       *
252       * We allow both the head and tail fields to be null before any
253       * nodes are enqueued; initializing upon first append.  This
# Line 260 | Line 263 | public class LinkedTransferQueue<E> exte
263       * of offer, put, poll, take, or transfer (each possibly with
264       * timeout). The relative complexity of using one monolithic
265       * method outweighs the code bulk and maintenance problems of
266 <     * using nine separate methods.
266 >     * using separate methods for each case.
267       *
268       * Operation consists of up to three phases. The first is
269       * implemented within method xfer, the second in tryAppend, and
# Line 285 | Line 288 | public class LinkedTransferQueue<E> exte
288       *
289       * 2. Try to append a new node (method tryAppend)
290       *
291 <     *    Starting at current tail pointer, try to append a new node
292 <     *    to the list (or if head was null, establish the first
293 <     *    node). Nodes can be appended only if their predecessors are
294 <     *    either already matched or are of the same mode. If we detect
295 <     *    otherwise, then a new node with opposite mode must have been
296 <     *    appended during traversal, so must restart at phase 1. The
297 <     *    traversal and update steps are otherwise similar to phase 1:
298 <     *    Retrying upon CAS misses and checking for staleness.  In
299 <     *    particular, if a self-link is encountered, then we can
300 <     *    safely jump to a node on the list by continuing the
301 <     *    traversal at current head.
291 >     *    Starting at current tail pointer, find the actual last node
292 >     *    and try to append a new node (or if head was null, establish
293 >     *    the first node). Nodes can be appended only if their
294 >     *    predecessors are either already matched or are of the same
295 >     *    mode. If we detect otherwise, then a new node with opposite
296 >     *    mode must have been appended during traversal, so we must
297 >     *    restart at phase 1. The traversal and update steps are
298 >     *    otherwise similar to phase 1: Retrying upon CAS misses and
299 >     *    checking for staleness.  In particular, if a self-link is
300 >     *    encountered, then we can safely jump to a node on the list
301 >     *    by continuing the traversal at current head.
302       *
303       *    On successful append, if the call was ASYNC, return.
304       *
305       * 3. Await match or cancellation (method awaitMatch)
306       *
307       *    Wait for another thread to match node; instead cancelling if
308 <     *    current thread was interrupted or the wait timed out. On
308 >     *    the current thread was interrupted or the wait timed out. On
309       *    multiprocessors, we use front-of-queue spinning: If a node
310       *    appears to be the first unmatched node in the queue, it
311       *    spins a bit before blocking. In either case, before blocking
# Line 317 | Line 320 | public class LinkedTransferQueue<E> exte
320       *    to decide to occasionally perform a Thread.yield. While
321       *    yield has underdefined specs, we assume that might it help,
322       *    and will not hurt in limiting impact of spinning on busy
323 <     *    systems.  We also use much smaller (1/4) spins for nodes
324 <     *    that are not known to be front but whose predecessors have
325 <     *    not blocked -- these "chained" spins avoid artifacts of
323 >     *    systems.  We also use smaller (1/2) spins for nodes that are
324 >     *    not known to be front but whose predecessors have not
325 >     *    blocked -- these "chained" spins avoid artifacts of
326       *    front-of-queue rules which otherwise lead to alternating
327       *    nodes spinning vs blocking. Further, front threads that
328       *    represent phase changes (from data to request node or vice
329       *    versa) compared to their predecessors receive additional
330 <     *    spins, reflecting the longer code path lengths necessary to
331 <     *    release them under contention.
330 >     *    chained spins, reflecting longer paths typically required to
331 >     *    unblock threads during phase changes.
332       */
333  
334      /** True if on multiprocessor */
# Line 333 | Line 336 | public class LinkedTransferQueue<E> exte
336          Runtime.getRuntime().availableProcessors() > 1;
337  
338      /**
339 <     * The number of times to spin (with on average one randomly
340 <     * interspersed call to Thread.yield) on multiprocessor before
341 <     * blocking when a node is apparently the first waiter in the
342 <     * queue.  See above for explanation. Must be a power of two. The
343 <     * value is empirically derived -- it works pretty well across a
344 <     * variety of processors, numbers of CPUs, and OSes.
339 >     * The number of times to spin (with randomly interspersed calls
340 >     * to Thread.yield) on multiprocessor before blocking when a node
341 >     * is apparently the first waiter in the queue.  See above for
342 >     * explanation. Must be a power of two. The value is empirically
343 >     * derived -- it works pretty well across a variety of processors,
344 >     * numbers of CPUs, and OSes.
345       */
346      private static final int FRONT_SPINS   = 1 << 7;
347  
348      /**
349       * The number of times to spin before blocking when a node is
350 <     * preceded by another node that is apparently spinning.
350 >     * preceded by another node that is apparently spinning.  Also
351 >     * serves as an increment to FRONT_SPINS on phase changes, and as
352 >     * base average frequency for yielding during spins. Must be a
353 >     * power of two.
354       */
355 <    private static final int CHAINED_SPINS = FRONT_SPINS >>> 2;
355 >    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
356  
357      /**
358       * Queue nodes. Uses Object, not E, for items to allow forgetting
# Line 355 | Line 361 | public class LinkedTransferQueue<E> exte
361       * precede or follow CASes use simple relaxed forms.  Other
362       * cleanups use releasing/lazy writes.
363       */
364 <    static final class Node {
364 >    static final class Node<E> {
365          final boolean isData;   // false if this is a request node
366          volatile Object item;   // initially non-null if isData; CASed to match
367 <        volatile Node next;
367 >        volatile Node<E> next;
368          volatile Thread waiter; // null until waiting
369  
370          // CAS methods for fields
371 <        final boolean casNext(Node cmp, Node val) {
371 >        final boolean casNext(Node<E> cmp, Node<E> val) {
372              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
373          }
374  
375          final boolean casItem(Object cmp, Object val) {
376 +            assert cmp == null || cmp.getClass() != Node.class;
377              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
378          }
379  
# Line 374 | Line 381 | public class LinkedTransferQueue<E> exte
381           * Creates a new node. Uses relaxed write because item can only
382           * be seen if followed by CAS.
383           */
384 <        Node(Object item, boolean isData) {
384 >        Node(E item, boolean isData) {
385              UNSAFE.putObject(this, itemOffset, item); // relaxed write
386              this.isData = isData;
387          }
# Line 403 | Line 410 | public class LinkedTransferQueue<E> exte
410           */
411          final boolean isMatched() {
412              Object x = item;
413 <            return x == this || (x != null) != isData;
413 >            return (x == this) || ((x == null) == isData);
414 >        }
415 >
416 >        /**
417 >         * Returns true if this is an unmatched request node.
418 >         */
419 >        final boolean isUnmatchedRequest() {
420 >            return !isData && item == null;
421          }
422  
423          /**
# Line 421 | Line 435 | public class LinkedTransferQueue<E> exte
435           * Tries to artificially match a data node -- used by remove.
436           */
437          final boolean tryMatchData() {
438 +            assert isData;
439              Object x = item;
440              if (x != null && x != this && casItem(x, null)) {
441                  LockSupport.unpark(waiter);
# Line 442 | Line 457 | public class LinkedTransferQueue<E> exte
457      }
458  
459      /** head of the queue; null until first enqueue */
460 <    private transient volatile Node head;
460 >    transient volatile Node<E> head;
461  
462      /** predecessor of dangling unspliceable node */
463 <    private transient volatile Node cleanMe; // decl here to reduce contention
463 >    private transient volatile Node<E> cleanMe; // decl here reduces contention
464  
465      /** tail of the queue; null until first append */
466 <    private transient volatile Node tail;
466 >    private transient volatile Node<E> tail;
467  
468      // CAS methods for fields
469 <    private boolean casTail(Node cmp, Node val) {
469 >    private boolean casTail(Node<E> cmp, Node<E> val) {
470          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
471      }
472  
473 <    private boolean casHead(Node cmp, Node val) {
473 >    private boolean casHead(Node<E> cmp, Node<E> val) {
474          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
475      }
476  
477 <    private boolean casCleanMe(Node cmp, Node val) {
477 >    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
478          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
479      }
480  
# Line 472 | Line 487 | public class LinkedTransferQueue<E> exte
487      private static final int SYNC    = 2; // for transfer, take
488      private static final int TIMEOUT = 3; // for timed poll, tryTransfer
489  
490 +    @SuppressWarnings("unchecked")
491 +    static <E> E cast(Object item) {
492 +        assert item == null || item.getClass() != Node.class;
493 +        return (E) item;
494 +    }
495 +
496      /**
497       * Implements all queuing methods. See above for explanation.
498       *
# Line 482 | Line 503 | public class LinkedTransferQueue<E> exte
503       * @return an item if matched, else e
504       * @throws NullPointerException if haveData mode but e is null
505       */
506 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
506 >    private E xfer(E e, boolean haveData, int how, long nanos) {
507          if (haveData && (e == null))
508              throw new NullPointerException();
509 <        Node s = null;                        // the node to append, if needed
509 >        Node<E> s = null;                     // the node to append, if needed
510  
511          retry: for (;;) {                     // restart on append race
512  
513 <            for (Node h = head, p = h; p != null;) { // find & match first node
513 >            for (Node<E> h = head, p = h; p != null;) {
514 >                // find & match first node
515                  boolean isData = p.isData;
516                  Object item = p.item;
517                  if (item != p && (item != null) == isData) { // unmatched
518                      if (isData == haveData)   // can't match
519                          break;
520                      if (p.casItem(item, e)) { // match
521 <                        Thread w = p.waiter;
522 <                        while (p != h) {      // update head
523 <                            Node n = p.next;  // by 2 unless singleton
524 <                            if (n != null)
525 <                                p = n;
504 <                            if (head == h && casHead(h, p)) {
521 >                        for (Node<E> q = p; q != h;) {
522 >                            Node<E> n = q.next; // update head by 2
523 >                            if (n != null)    // unless singleton
524 >                                q = n;
525 >                            if (head == h && casHead(h, q)) {
526                                  h.forgetNext();
527                                  break;
528                              }                 // advance and retry
529                              if ((h = head)   == null ||
530 <                                (p = h.next) == null || !p.isMatched())
530 >                                (q = h.next) == null || !q.isMatched())
531                                  break;        // unless slack < 2
532                          }
533 <                        LockSupport.unpark(w);
534 <                        return item;
533 >                        LockSupport.unpark(p.waiter);
534 >                        return this.<E>cast(item);
535                      }
536                  }
537 <                Node n = p.next;
537 >                Node<E> n = p.next;
538                  p = (p != n) ? n : (h = head); // Use head if p offlist
539              }
540  
541              if (how >= ASYNC) {               // No matches available
542                  if (s == null)
543 <                    s = new Node(e, haveData);
544 <                Node pred = tryAppend(s, haveData);
543 >                    s = new Node<E>(e, haveData);
544 >                Node<E> pred = tryAppend(s, haveData);
545                  if (pred == null)
546                      continue retry;           // lost race vs opposite mode
547                  if (how >= SYNC)
548 <                    return awaitMatch(pred, s, e, how, nanos);
548 >                    return awaitMatch(s, pred, e, how, nanos);
549              }
550              return e; // not waiting
551          }
# Line 539 | Line 560 | public class LinkedTransferQueue<E> exte
560       * different mode, else s's predecessor, or s itself if no
561       * predecessor
562       */
563 <    private Node tryAppend(Node s, boolean haveData) {
564 <        for (Node t = tail, p = t;;) { // move p to last node and append
565 <            Node n, u;                        // temps for reads of next & tail
563 >    private Node<E> tryAppend(Node<E> s, boolean haveData) {
564 >        for (Node<E> t = tail, p = t;;) { // move p to last node and append
565 >            Node<E> n, u;                     // temps for reads of next & tail
566              if (p == null && (p = head) == null) {
567                  if (casHead(null, s))
568                      return s;                 // initialize
# Line 568 | Line 589 | public class LinkedTransferQueue<E> exte
589      /**
590       * Spins/yields/blocks until node s is matched or caller gives up.
591       *
571     * @param pred the predecessor of s, or s or null if none
592       * @param s the waiting node
593 +     * @param pred the predecessor of s, or s itself if it has no
594 +     * predecessor, or null if unknown (the null case does not occur
595 +     * in any current calls but may in possible future extensions)
596       * @param e the comparison value for checking match
597       * @param how either SYNC or TIMEOUT
598       * @param nanos timeout value
599       * @return matched item, or e if unmatched on interrupt or timeout
600       */
601 <    private Object awaitMatch(Node pred, Node s, Object e,
579 <                              int how, long nanos) {
601 >    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
602          long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
603          Thread w = Thread.currentThread();
604          int spins = -1; // initialized after first item and cancel checks
# Line 585 | Line 607 | public class LinkedTransferQueue<E> exte
607          for (;;) {
608              Object item = s.item;
609              if (item != e) {                  // matched
610 +                assert item != s;
611                  s.forgetContents();           // avoid garbage
612 <                return item;
612 >                return this.<E>cast(item);
613              }
614              if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
615 <                     s.casItem(e, s)) {       // cancel
615 >                    s.casItem(e, s)) {       // cancel
616                  unsplice(pred, s);
617                  return e;
618              }
# Line 598 | Line 621 | public class LinkedTransferQueue<E> exte
621                  if ((spins = spinsFor(pred, s.isData)) > 0)
622                      randomYields = ThreadLocalRandom.current();
623              }
624 <            else if (spins > 0) {             // spin, occasionally yield
625 <                if (randomYields.nextInt(FRONT_SPINS) == 0)
626 <                    Thread.yield();
627 <                --spins;
624 >            else if (spins > 0) {             // spin
625 >                if (--spins == 0)
626 >                    shortenHeadPath();        // reduce slack before blocking
627 >                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
628 >                    Thread.yield();           // occasionally yield
629              }
630              else if (s.waiter == null) {
631 <                shortenHeadPath();            // reduce slack before blocking
608 <                s.waiter = w;                 // request unpark
631 >                s.waiter = w;                 // request unpark then recheck
632              }
633              else if (how == TIMEOUT) {
634                  long now = System.nanoTime();
# Line 615 | Line 638 | public class LinkedTransferQueue<E> exte
638              }
639              else {
640                  LockSupport.park(this);
641 +                s.waiter = null;
642                  spins = -1;                   // spin if front upon wakeup
643              }
644          }
# Line 624 | Line 648 | public class LinkedTransferQueue<E> exte
648       * Returns spin/yield value for a node with given predecessor and
649       * data mode. See above for explanation.
650       */
651 <    private static int spinsFor(Node pred, boolean haveData) {
651 >    private static int spinsFor(Node<?> pred, boolean haveData) {
652          if (MP && pred != null) {
653 <            boolean predData = pred.isData;
654 <            if (predData != haveData)         // front and phase change
655 <                return FRONT_SPINS + (FRONT_SPINS >>> 1);
632 <            if (predData != (pred.item != null)) // probably at front
653 >            if (pred.isData != haveData)      // phase change
654 >                return FRONT_SPINS + CHAINED_SPINS;
655 >            if (pred.isMatched())             // probably at front
656                  return FRONT_SPINS;
657              if (pred.waiter == null)          // pred apparently spinning
658                  return CHAINED_SPINS;
# Line 642 | Line 665 | public class LinkedTransferQueue<E> exte
665       * or trailing node; failing on contention.
666       */
667      private void shortenHeadPath() {
668 <        Node h, hn, p, q;
668 >        Node<E> h, hn, p, q;
669          if ((p = h = head) != null && h.isMatched() &&
670              (q = hn = h.next) != null) {
671 <            Node n;
671 >            Node<E> n;
672              while ((n = q.next) != q) {
673                  if (n == null || !q.isMatched()) {
674                      if (hn != q && h.next == hn)
# Line 664 | Line 687 | public class LinkedTransferQueue<E> exte
687       * Returns the first unmatched node of the given mode, or null if
688       * none.  Used by methods isEmpty, hasWaitingConsumer.
689       */
690 <    private Node firstOfMode(boolean data) {
691 <        for (Node p = head; p != null; ) {
690 >    private Node<E> firstOfMode(boolean data) {
691 >        for (Node<E> p = head; p != null; ) {
692              if (!p.isMatched())
693                  return (p.isData == data) ? p : null;
694 <            Node n = p.next;
694 >            Node<E> n = p.next;
695              p = (n != p) ? n : head;
696          }
697          return null;
# Line 676 | Line 699 | public class LinkedTransferQueue<E> exte
699  
700      /**
701       * Returns the item in the first unmatched node with isData; or
702 <     * null if none. Used by peek.
702 >     * null if none.  Used by peek.
703       */
704 <    private Object firstDataItem() {
705 <        for (Node p = head; p != null; ) {
704 >    private E firstDataItem() {
705 >        for (Node<E> p = head; p != null; ) {
706              boolean isData = p.isData;
707              Object item = p.item;
708              if (item != p && (item != null) == isData)
709 <                return isData ? item : null;
710 <            Node n = p.next;
709 >                return isData ? this.<E>cast(item) : null;
710 >            Node<E> n = p.next;
711              p = (n != p) ? n : head;
712          }
713          return null;
# Line 696 | Line 719 | public class LinkedTransferQueue<E> exte
719       */
720      private int countOfMode(boolean data) {
721          int count = 0;
722 <        for (Node p = head; p != null; ) {
722 >        for (Node<E> p = head; p != null; ) {
723              if (!p.isMatched()) {
724                  if (p.isData != data)
725                      return 0;
726                  if (++count == Integer.MAX_VALUE) // saturated
727                      break;
728              }
729 <            Node n = p.next;
729 >            Node<E> n = p.next;
730              if (n != p)
731                  p = n;
732              else {
# Line 715 | Line 738 | public class LinkedTransferQueue<E> exte
738      }
739  
740      final class Itr implements Iterator<E> {
741 <        private Node nextNode;   // next node to return item for
742 <        private Object nextItem; // the corresponding item
743 <        private Node lastRet;    // last returned node, to support remove
741 >        private Node<E> nextNode;   // next node to return item for
742 >        private E nextItem;         // the corresponding item
743 >        private Node<E> lastRet;    // last returned node, to support remove
744  
745          /**
746           * Moves to next node after prev, or first node if prev null.
747           */
748 <        private void advance(Node prev) {
748 >        private void advance(Node<E> prev) {
749              lastRet = prev;
750 <            Node p;
750 >            Node<E> p;
751              if (prev == null || (p = prev.next) == prev)
752                  p = head;
753              while (p != null) {
754                  Object item = p.item;
755                  if (p.isData) {
756                      if (item != null && item != p) {
757 <                        nextItem = item;
757 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
758                          nextNode = p;
759                          return;
760                      }
761                  }
762                  else if (item == null)
763                      break;
764 <                Node n = p.next;
764 >                Node<E> n = p.next;
765                  p = (n != p) ? n : head;
766              }
767              nextNode = null;
# Line 753 | Line 776 | public class LinkedTransferQueue<E> exte
776          }
777  
778          public final E next() {
779 <            Node p = nextNode;
779 >            Node<E> p = nextNode;
780              if (p == null) throw new NoSuchElementException();
781 <            Object e = nextItem;
781 >            E e = nextItem;
782              advance(p);
783 <            return (E) e;
783 >            return e;
784          }
785  
786          public final void remove() {
787 <            Node p = lastRet;
787 >            Node<E> p = lastRet;
788              if (p == null) throw new IllegalStateException();
789              lastRet = null;
790 <            findAndRemoveNode(p);
790 >            findAndRemoveDataNode(p);
791          }
792      }
793  
# Line 777 | Line 800 | public class LinkedTransferQueue<E> exte
800       * @param pred predecessor of node to be unspliced
801       * @param s the node to be unspliced
802       */
803 <    private void unsplice(Node pred, Node s) {
803 >    private void unsplice(Node<E> pred, Node<E> s) {
804          s.forgetContents(); // clear unneeded fields
805          /*
806           * At any given time, exactly one node on list cannot be
# Line 790 | Line 813 | public class LinkedTransferQueue<E> exte
813           */
814          if (pred != null && pred != s) {
815              while (pred.next == s) {
816 <                Node oldpred = (cleanMe == null) ? null : reclean();
817 <                Node n = s.next;
816 >                Node<E> oldpred = (cleanMe == null) ? null : reclean();
817 >                Node<E> n = s.next;
818                  if (n != null) {
819                      if (n != s)
820                          pred.casNext(s, n);
821                      break;
822                  }
823                  if (oldpred == pred ||      // Already saved
824 <                    (oldpred == null && casCleanMe(null, pred)))
825 <                    break;                  // Postpone cleaning
824 >                    ((oldpred == null || oldpred.next == s) &&
825 >                     casCleanMe(oldpred, pred))) {
826 >                    break;
827 >                }
828              }
829          }
830      }
# Line 810 | Line 835 | public class LinkedTransferQueue<E> exte
835       *
836       * @return current cleanMe node (or null)
837       */
838 <    private Node reclean() {
838 >    private Node<E> reclean() {
839          /*
840           * cleanMe is, or at one time was, predecessor of a cancelled
841           * node s that was the tail so could not be unspliced.  If it
# Line 821 | Line 846 | public class LinkedTransferQueue<E> exte
846           * we can (must) clear cleanMe without unsplicing.  This can
847           * loop only due to contention.
848           */
849 <        Node pred;
849 >        Node<E> pred;
850          while ((pred = cleanMe) != null) {
851 <            Node s = pred.next;
852 <            Node n;
851 >            Node<E> s = pred.next;
852 >            Node<E> n;
853              if (s == null || s == pred || !s.isMatched())
854                  casCleanMe(pred, null); // already gone
855              else if ((n = s.next) != null) {
# Line 840 | Line 865 | public class LinkedTransferQueue<E> exte
865  
866      /**
867       * Main implementation of Iterator.remove(). Find
868 <     * and unsplice the given node.
868 >     * and unsplice the given data node.
869       */
870 <    final void findAndRemoveNode(Node s) {
870 >    final void findAndRemoveDataNode(Node<E> s) {
871 >        assert s.isData;
872          if (s.tryMatchData()) {
873 <            Node pred = null;
848 <            Node p = head;
849 <            while (p != null) {
873 >            for (Node<E> pred = null, p = head; p != null; ) {
874                  if (p == s) {
875                      unsplice(pred, p);
876                      break;
877                  }
878 <                if (!p.isData && !p.isMatched())
878 >                if (p.isUnmatchedRequest())
879                      break;
880                  pred = p;
881                  if ((p = p.next) == pred) { // stale
# Line 867 | Line 891 | public class LinkedTransferQueue<E> exte
891       */
892      private boolean findAndRemove(Object e) {
893          if (e != null) {
894 <            Node pred = null;
871 <            Node p = head;
872 <            while (p != null) {
894 >            for (Node<E> pred = null, p = head; p != null; ) {
895                  Object item = p.item;
896                  if (p.isData) {
897                      if (item != null && item != p && e.equals(item) &&
# Line 881 | Line 903 | public class LinkedTransferQueue<E> exte
903                  else if (item == null)
904                      break;
905                  pred = p;
906 <                if ((p = p.next) == pred) {
906 >                if ((p = p.next) == pred) { // stale
907                      pred = null;
908                      p = head;
909                  }
# Line 1017 | Line 1039 | public class LinkedTransferQueue<E> exte
1039      }
1040  
1041      public E take() throws InterruptedException {
1042 <        Object e = xfer(null, false, SYNC, 0);
1042 >        E e = xfer(null, false, SYNC, 0);
1043          if (e != null)
1044 <            return (E)e;
1044 >            return e;
1045          Thread.interrupted();
1046          throw new InterruptedException();
1047      }
1048  
1049      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1050 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1050 >        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1051          if (e != null || !Thread.interrupted())
1052 <            return (E)e;
1052 >            return e;
1053          throw new InterruptedException();
1054      }
1055  
1056      public E poll() {
1057 <        return (E)xfer(null, false, NOW, 0);
1057 >        return xfer(null, false, NOW, 0);
1058      }
1059  
1060      /**
# Line 1089 | Line 1111 | public class LinkedTransferQueue<E> exte
1111      }
1112  
1113      public E peek() {
1114 <        return (E) firstDataItem();
1114 >        return firstDataItem();
1115      }
1116  
1117      /**
# Line 1185 | Line 1207 | public class LinkedTransferQueue<E> exte
1207          }
1208      }
1209  
1188
1210      // Unsafe mechanics
1211  
1212      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
# Line 1208 | Line 1229 | public class LinkedTransferQueue<E> exte
1229          }
1230      }
1231  
1232 <    private static sun.misc.Unsafe getUnsafe() {
1232 >    /**
1233 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1234 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1235 >     * into a jdk.
1236 >     *
1237 >     * @return a sun.misc.Unsafe
1238 >     */
1239 >    static sun.misc.Unsafe getUnsafe() {
1240          try {
1241              return sun.misc.Unsafe.getUnsafe();
1242          } catch (SecurityException se) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines