Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.45 by dl, Wed Oct 21 16:30:40 2009 UTC vs.
Revision 1.57 by jsr166, Wed Oct 28 09:28:30 2009 UTC

# Line 105 | Line 105 | public class LinkedTransferQueue<E> exte
105       * successful atomic operation per enq/deq pair. But it also
106       * enables lower cost variants of queue maintenance mechanics. (A
107       * variation of this idea applies even for non-dual queues that
108 <     * support deletion of embedded elements, such as
108 >     * support deletion of interior elements, such as
109       * j.u.c.ConcurrentLinkedQueue.)
110       *
111 <     * Once a node is matched, its item can never again change.  We
112 <     * may thus arrange that the linked list of them contains a prefix
113 <     * of zero or more matched nodes, followed by a suffix of zero or
114 <     * more unmatched nodes. (Note that we allow both the prefix and
115 <     * suffix to be zero length, which in turn means that we do not
116 <     * use a dummy header.)  If we were not concerned with either time
117 <     * or space efficiency, we could correctly perform enqueue and
118 <     * dequeue operations by traversing from a pointer to the initial
119 <     * node; CASing the item of the first unmatched node on match and
120 <     * CASing the next field of the trailing node on appends.  While
121 <     * this would be a terrible idea in itself, it does have the
122 <     * benefit of not requiring ANY atomic updates on head/tail
123 <     * fields.
111 >     * Once a node is matched, its match status can never again
112 >     * change.  We may thus arrange that the linked list of them
113 >     * contain a prefix of zero or more matched nodes, followed by a
114 >     * suffix of zero or more unmatched nodes. (Note that we allow
115 >     * both the prefix and suffix to be zero length, which in turn
116 >     * means that we do not use a dummy header.)  If we were not
117 >     * concerned with either time or space efficiency, we could
118 >     * correctly perform enqueue and dequeue operations by traversing
119 >     * from a pointer to the initial node; CASing the item of the
120 >     * first unmatched node on match and CASing the next field of the
121 >     * trailing node on appends. (Plus some special-casing when
122 >     * initially empty).  While this would be a terrible idea in
123 >     * itself, it does have the benefit of not requiring ANY atomic
124 >     * updates on head/tail fields.
125       *
126       * We introduce here an approach that lies between the extremes of
127 <     * never versus always updating queue (head and tail) pointers
128 <     * that reflects the tradeoff of sometimes require extra traversal
129 <     * steps to locate the first and/or last unmatched nodes, versus
130 <     * the reduced overhead and contention of fewer updates to queue
131 <     * pointers. For example, a possible snapshot of a queue is:
127 >     * never versus always updating queue (head and tail) pointers.
128 >     * This offers a tradeoff between sometimes requiring extra
129 >     * traversal steps to locate the first and/or last unmatched
130 >     * nodes, versus the reduced overhead and contention of fewer
131 >     * updates to queue pointers. For example, a possible snapshot of
132 >     * a queue is:
133       *
134       *  head           tail
135       *    |              |
# Line 139 | Line 141 | public class LinkedTransferQueue<E> exte
141       * similarly for "tail") is an empirical matter. We have found
142       * that using very small constants in the range of 1-3 works best
143       * over a range of platforms. Larger values introduce increasing
144 <     * costs of cache misses and risks of long traversal chains.
144 >     * costs of cache misses and risks of long traversal chains, while
145 >     * smaller values increase CAS contention and overhead.
146       *
147       * Dual queues with slack differ from plain M&S dual queues by
148       * virtue of only sometimes updating head or tail pointers when
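
The slack idea in the comment above can be illustrated with a minimal sketch. This is not the code under review: the names `SlackDemo`, `N`, `matchFirst`, and `headCasCount` are hypothetical, matching uses a plain volatile flag rather than a CAS on an item field, and the counter is for demonstration only and is not thread-safe. The point is that head is CASed only when the matched node is at least one step past it, and then moves one node past the match when a successor exists.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified illustration of slack in head updates. */
final class SlackDemo {
    static final class N {
        final int id;
        volatile boolean matched;
        volatile N next;
        N(int id) { this.id = id; }
    }

    final AtomicReference<N> head = new AtomicReference<N>();
    int headCasCount;   // demo bookkeeping only; not thread-safe

    /** Builds a list of n unmatched nodes with head at the first one. */
    SlackDemo(int n) {
        N first = null, last = null;
        for (int i = 0; i < n; i++) {
            N x = new N(i);
            if (first == null) first = x; else last.next = x;
            last = x;
        }
        head.set(first);
    }

    /** Matches the first unmatched node; returns its id, or -1 if none. */
    int matchFirst() {
        N h = head.get();
        for (N p = h; p != null; p = p.next) {
            if (!p.matched) {
                p.matched = true;            // stand-in for a CAS on the item
                if (p != h) {                // head lags: update it now
                    N target = (p.next != null) ? p.next : p;
                    if (head.compareAndSet(h, target))
                        headCasCount++;
                }
                return p.id;
            }
        }
        return -1;
    }
}
```

In this sketch, six consecutive matches on a six-node list perform three head CASes instead of six, at the price of an occasional extra traversal step.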
# Line 158 | Line 161 | public class LinkedTransferQueue<E> exte
161       * targets.  Even when using very small slack values, this
162       * approach works well for dual queues because it allows all
163       * operations up to the point of matching or appending an item
164 <     * (hence potentially releasing another thread) to be read-only,
165 <     * thus not introducing any further contention. As described
166 <     * below, we implement this by performing slack maintenance
167 <     * retries only after these points.
164 >     * (hence potentially allowing progress by another thread) to be
165 >     * read-only, thus not introducing any further contention. As
166 >     * described below, we implement this by performing slack
167 >     * maintenance retries only after these points.
168       *
169       * As an accompaniment to such techniques, traversal overhead can
170       * be further reduced without increasing contention of head
171 <     * pointer updates.  During traversals, threads may sometimes
172 <     * shortcut the "next" link path from the current "head" node to
173 <     * be closer to the currently known first unmatched node. Again,
174 <     * this may be triggered with using thresholds or randomization.
171 >     * pointer updates: Threads may sometimes shortcut the "next" link
172 >     * path from the current "head" node to be closer to the currently
173 >     * known first unmatched node, and similarly for tail. Again, this
174 >     * may be triggered with using thresholds or randomization.
175       *
176       * These ideas must be further extended to avoid unbounded amounts
177       * of costly-to-reclaim garbage caused by the sequential "next"
# Line 180 | Line 183 | public class LinkedTransferQueue<E> exte
183       * (Similar issues arise in non-GC environments.)  To cope with
184       * this in our implementation, upon CASing to advance the head
185       * pointer, we set the "next" link of the previous head to point
186 <     * only to itself; thus limiting the length connected dead lists.
186 >     * only to itself; thus limiting the length of connected dead lists.
187       * (We also take similar care to wipe out possibly garbage
188       * retaining values held in other Node fields.)  However, doing so
189       * adds some further complexity to traversal: If any "next"
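
The self-linking described in this hunk can be sketched roughly as follows. It is a simplified illustration under stated assumptions, not the class's actual code: `SelfLinkDemo`, `N`, `advanceHead`, and `succ` are hypothetical names, and `AtomicReference` stands in for the Unsafe-based field CAS. The old head's `next` is pointed at itself after a successful head CAS, and a traversal that sees `p.next == p` treats `p` as off-list and resumes from the current head.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of self-linking a retired head node. */
final class SelfLinkDemo {
    static final class N {
        final String label;
        volatile N next;
        N(String label) { this.label = label; }
    }

    final AtomicReference<N> head = new AtomicReference<N>();

    /** CASes head from h to newHead; on success, self-links the old head. */
    boolean advanceHead(N h, N newHead) {
        if (h != newHead && head.compareAndSet(h, newHead)) {
            h.next = h;   // off-list marker; also cuts the chain of dead nodes
            return true;
        }
        return false;
    }

    /** Returns p's successor, or the current head if p is self-linked. */
    N succ(N p) {
        N n = p.next;
        return (p != n) ? n : head.get();
    }
}
```

Because a retired node no longer points at its successor, a reference held to it cannot keep an arbitrarily long chain of dead nodes reachable.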
# Line 196 | Line 199 | public class LinkedTransferQueue<E> exte
199       * mechanics because an update may leave head at a detached node.
200       * And while direct writes are possible for tail updates, they
201       * increase the risk of long retraversals, and hence long garbage
202 <     * chains which can be much more costly than is worthwhile
202 >     * chains, which can be much more costly than is worthwhile
203       * considering that the cost difference of performing a CAS vs
204       * write is smaller when they are not triggered on each operation
205       * (especially considering that writes and CASes equally require
206       * additional GC bookkeeping ("write barriers") that are sometimes
207       * more costly than the writes themselves because of contention).
208       *
209 <     * Removal of internal nodes (due to timed out or interrupted
210 <     * waits, or calls to remove or Iterator.remove) uses a scheme
211 <     * roughly similar to that in Scherer, Lea, and Scott
212 <     * SynchronousQueue. Given a predecessor, we can unsplice any node
213 <     * except the (actual) tail of the queue. To avoid build-up of
214 <     * cancelled trailing nodes, upon a request to remove a trailing
215 <     * node, it is placed in field "cleanMe" to be unspliced later.
209 >     * Removal of interior nodes (due to timed out or interrupted
210 >     * waits, or calls to remove(x) or Iterator.remove) can use a
211 >     * scheme roughly similar to that described in Scherer, Lea, and
212 >     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
213 >     * any node except the (actual) tail of the queue. To avoid
214 >     * build-up of cancelled trailing nodes, upon a request to remove
215 >     * a trailing node, it is placed in field "cleanMe" to be
216 >     * unspliced upon the next call to unsplice any other node.
217 >     * Situations needing such mechanics are not common but do occur
218 >     * in practice; for example when an unbounded series of short
219 >     * timed calls to poll repeatedly time out but never otherwise
220 >     * fall off the list because of an untimed call to take at the
221 >     * front of the queue. Note that maintaining field cleanMe does
222 >     * not otherwise much impact garbage retention even if never
223 >     * cleared by some other call because the held node will
224 >     * eventually either directly or indirectly lead to a self-link
225 >     * once off the list.
226       *
227       * *** Overview of implementation ***
228       *
229 <     * We use a threshold-based approach to updates, with a target
230 <     * slack of two.  The slack value is hard-wired: a path greater
229 >     * We use a threshold-based approach to updates, with a slack
230 >     * threshold of two -- that is, we update head/tail when the
231 >     * current pointer appears to be two or more steps away from the
232 >     * first/last node. The slack value is hard-wired: a path greater
233       * than one is naturally implemented by checking equality of
234       * traversal pointers except when the list has only one element,
235 <     * in which case we keep max slack at one. Avoiding tracking
236 <     * explicit counts across situations slightly simplifies an
235 >     * in which case we keep slack threshold at one. Avoiding tracking
236 >     * explicit counts across method calls slightly simplifies an
237       * already-messy implementation. Using randomization would
238       * probably work better if there were a low-quality dirt-cheap
239       * per-thread one available, but even ThreadLocalRandom is too
240       * heavy for these purposes.
241       *
242 <     * With such a small slack value, path short-circuiting is rarely
243 <     * worthwhile. However, it is used (in awaitMatch) immediately
244 <     * before a waiting thread starts to block, as a final bit of
245 <     * helping at a point when contention with others is extremely
246 <     * unlikely (since if other threads that could release it are
247 <     * operating, then the current thread wouldn't be blocking).
242 >     * With such a small slack threshold value, it is rarely
243 >     * worthwhile to augment this with path short-circuiting; i.e.,
244 >     * unsplicing nodes between head and the first unmatched node, or
245 >     * similarly for tail, rather than advancing head or tail
246 >     * proper. However, it is used (in awaitMatch) immediately before
247 >     * a waiting thread starts to block, as a final bit of helping at
248 >     * a point when contention with others is extremely unlikely
249 >     * (since if other threads that could release it are operating,
250 >     * then the current thread wouldn't be blocking).
251 >     *
252 >     * We allow both the head and tail fields to be null before any
253 >     * nodes are enqueued; initializing upon first append.  This
254 >     * simplifies some other logic, as well as providing more
255 >     * efficient explicit control paths instead of letting JVMs insert
256 >     * implicit NullPointerExceptions when they are null.  While not
257 >     * currently fully implemented, we also leave open the possibility
258 >     * of re-nulling these fields when empty (which is complicated to
259 >     * arrange, for little benefit.)
260       *
261       * All enqueue/dequeue operations are handled by the single method
262       * "xfer" with parameters indicating whether to act as some form
263       * of offer, put, poll, take, or transfer (each possibly with
264       * timeout). The relative complexity of using one monolithic
265       * method outweighs the code bulk and maintenance problems of
266 <     * using nine separate methods.
266 >     * using separate methods for each case.
267       *
268       * Operation consists of up to three phases. The first is
269       * implemented within method xfer, the second in tryAppend, and
# Line 249 | Line 276 | public class LinkedTransferQueue<E> exte
276       *    case matching it and returning, also if necessary updating
277       *    head to one past the matched node (or the node itself if the
278       *    list has no other unmatched nodes). If the CAS misses, then
279 <     *    a retry loops until the slack is at most two. Traversals
280 <     *    also check if the initial head is now off-list, in which
281 <     *    case they start at the new head.
279 >     *    a loop retries advancing head by two steps until either
280 >     *    success or the slack is at most two. By requiring that each
281 >     *    attempt advances head by two (if applicable), we ensure that
282 >     *    the slack does not grow without bound. Traversals also check
283 >     *    if the initial head is now off-list, in which case they
284 >     *    start at the new head.
285       *
286       *    If no candidates are found and the call was untimed
287       *    poll/offer, (argument "how" is NOW) return.
288       *
289       * 2. Try to append a new node (method tryAppend)
290       *
291 <     *    Starting at current tail pointer, try to append a new node
292 <     *    to the list (or if head was null, establish the first
293 <     *    node). Nodes can be appended only if their predecessors are
294 <     *    either already matched or are of the same mode. If we detect
295 <     *    otherwise, then a new node with opposite mode must have been
296 <     *    appended during traversal, so must restart at phase 1. The
297 <     *    traversal and update steps are otherwise similar to phase 1:
298 <     *    Retrying upon CAS misses and checking for staleness.  In
299 <     *    particular, if a self-link is encountered, then we can
300 <     *    safely jump to a node on the list by continuing the
301 <     *    traversal at current head.
291 >     *    Starting at current tail pointer, find the actual last node
292 >     *    and try to append a new node (or if head was null, establish
293 >     *    the first node). Nodes can be appended only if their
294 >     *    predecessors are either already matched or are of the same
295 >     *    mode. If we detect otherwise, then a new node with opposite
296 >     *    mode must have been appended during traversal, so we must
297 >     *    restart at phase 1. The traversal and update steps are
298 >     *    otherwise similar to phase 1: Retrying upon CAS misses and
299 >     *    checking for staleness.  In particular, if a self-link is
300 >     *    encountered, then we can safely jump to a node on the list
301 >     *    by continuing the traversal at current head.
302       *
303 <     *    On successful append, if the call was ASYNC, return
303 >     *    On successful append, if the call was ASYNC, return.
304       *
305       * 3. Await match or cancellation (method awaitMatch)
306       *
307       *    Wait for another thread to match node; instead cancelling if
308 <     *    current thread was interrupted or the wait timed out. On
308 >     *    the current thread was interrupted or the wait timed out. On
309       *    multiprocessors, we use front-of-queue spinning: If a node
310       *    appears to be the first unmatched node in the queue, it
311       *    spins a bit before blocking. In either case, before blocking
# Line 290 | Line 320 | public class LinkedTransferQueue<E> exte
320       *    to decide to occasionally perform a Thread.yield. While
321       *    yield has underdefined specs, we assume that it might help,
322       *    and will not hurt in limiting impact of spinning on busy
323 <     *    systems.  We also use much smaller (1/4) spins for nodes
324 <     *    that are not known to be front but whose predecessors have
325 <     *    not blocked -- these "chained" spins avoid artifacts of
323 >     *    systems.  We also use smaller (1/2) spins for nodes that are
324 >     *    not known to be front but whose predecessors have not
325 >     *    blocked -- these "chained" spins avoid artifacts of
326       *    front-of-queue rules which otherwise lead to alternating
327       *    nodes spinning vs blocking. Further, front threads that
328       *    represent phase changes (from data to request node or vice
329       *    versa) compared to their predecessors receive additional
330 <     *    spins, reflecting the longer code path lengths necessary to
331 <     *    release them under contention.
330 >     *    chained spins, reflecting longer paths typically required to
331 >     *    unblock threads during phase changes.
332       */
333  
334      /** True if on multiprocessor */
# Line 306 | Line 336 | public class LinkedTransferQueue<E> exte
336          Runtime.getRuntime().availableProcessors() > 1;
337  
338      /**
339 <     * The number of times to spin (with on average one randomly
340 <     * interspersed call to Thread.yield) on multiprocessor before
341 <     * blocking when a node is apparently the first waiter in the
342 <     * queue.  See above for explanation. Must be a power of two. The
343 <     * value is empirically derived -- it works pretty well across a
344 <     * variety of processors, numbers of CPUs, and OSes.
339 >     * The number of times to spin (with randomly interspersed calls
340 >     * to Thread.yield) on multiprocessor before blocking when a node
341 >     * is apparently the first waiter in the queue.  See above for
342 >     * explanation. Must be a power of two. The value is empirically
343 >     * derived -- it works pretty well across a variety of processors,
344 >     * numbers of CPUs, and OSes.
345       */
346      private static final int FRONT_SPINS   = 1 << 7;
347  
348      /**
349       * The number of times to spin before blocking when a node is
350 <     * preceded by another node that is apparently spinning.
350 >     * preceded by another node that is apparently spinning.  Also
351 >     * serves as an increment to FRONT_SPINS on phase changes, and as
352 >     * base average frequency for yielding during spins. Must be a
353 >     * power of two.
354       */
355 <    private static final int CHAINED_SPINS = FRONT_SPINS >>> 2;
355 >    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
356  
357      /**
358 <     * Queue nodes. Uses Object, not E for items to allow forgetting
358 >     * Queue nodes. Uses Object, not E, for items to allow forgetting
359       * them after use.  Relies heavily on Unsafe mechanics to minimize
360 <     * unecessary ordering constraints: Writes that intrinsically
360 >     * unnecessary ordering constraints: Writes that intrinsically
361       * precede or follow CASes use simple relaxed forms.  Other
362       * cleanups use releasing/lazy writes.
363       */
364 <    static final class Node {
364 >    static final class Node<E> {
365          final boolean isData;   // false if this is a request node
366 <        volatile Object item;   // initially nonnull if isData; CASed to match
367 <        volatile Node next;
366 >        volatile Object item;   // initially non-null if isData; CASed to match
367 >        volatile Node<E> next;
368          volatile Thread waiter; // null until waiting
369  
370          // CAS methods for fields
371 <        final boolean casNext(Node cmp, Node val) {
371 >        final boolean casNext(Node<E> cmp, Node<E> val) {
372              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
373          }
374  
375          final boolean casItem(Object cmp, Object val) {
376 +            assert cmp == null || cmp.getClass() != Node.class;
377              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
378          }
379  
380          /**
381 <         * Create a new node. Uses relaxed write because item can only
382 <         * be seen if followed by CAS
381 >         * Creates a new node. Uses relaxed write because item can only
382 >         * be seen if followed by CAS.
383           */
384 <        Node(Object item, boolean isData) {
384 >        Node(E item, boolean isData) {
385              UNSAFE.putObject(this, itemOffset, item); // relaxed write
386              this.isData = isData;
387          }
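
The field comments in this hunk imply an encoding of node state in the `item` field: a data node starts with a non-null item, a request node starts with null, and a match flips that. The sketch below illustrates the encoding under assumptions. `MatchStateDemo`, `tryTake`, `tryFulfill`, and `tryCancel` are hypothetical names, `AtomicReference` replaces the Unsafe CAS, and cancellation by pointing `item` at the node itself follows the `s.casItem(e, s)` call that appears in `awaitMatch`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the item-field state encoding. */
final class MatchStateDemo {
    final boolean isData;                 // false for a request node
    final AtomicReference<Object> item;   // stand-in for the CASed item field

    MatchStateDemo(Object initial, boolean isData) {
        this.item = new AtomicReference<Object>(initial);
        this.isData = isData;
    }

    /** Matched if cancelled (self-item) or if item no longer fits the mode. */
    boolean isMatched() {
        Object x = item.get();
        return (x == this) || ((x == null) == isData);
    }

    /** Consumer side: takes the item of an unmatched data node, else null. */
    Object tryTake() {
        Object x = item.get();
        if (isData && x != null && x != this && item.compareAndSet(x, null))
            return x;
        return null;
    }

    /** Producer side: hands e to an unmatched request node. */
    boolean tryFulfill(Object e) {
        return !isData && e != null && item.compareAndSet(null, e);
    }

    /** Cancels an unmatched node by making its item refer to itself. */
    boolean tryCancel() {
        Object x = item.get();
        return x != this && (x != null) == isData && item.compareAndSet(x, this);
    }
}
```

Each transition is a single CAS on one field, so at most one competing thread can match or cancel a given node.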
# Line 376 | Line 410 | public class LinkedTransferQueue<E> exte
410           */
411          final boolean isMatched() {
412              Object x = item;
413 <            return x == this || (x != null) != isData;
413 >            return (x == this) || ((x == null) == isData);
414          }
415  
416          /**
# Line 391 | Line 425 | public class LinkedTransferQueue<E> exte
425          }
426  
427          /**
428 <         * Tries to artifically match a data node -- used by remove.
428 >         * Tries to artificially match a data node -- used by remove.
429           */
430          final boolean tryMatchData() {
431              Object x = item;
# Line 415 | Line 449 | public class LinkedTransferQueue<E> exte
449      }
450  
451      /** head of the queue; null until first enqueue */
452 <    private transient volatile Node head;
452 >    transient volatile Node<E> head;
453  
454      /** predecessor of dangling unspliceable node */
455 <    private transient volatile Node cleanMe; // decl here to reduce contention
455 >    private transient volatile Node<E> cleanMe; // decl here reduces contention
456  
457      /** tail of the queue; null until first append */
458 <    private transient volatile Node tail;
458 >    private transient volatile Node<E> tail;
459  
460      // CAS methods for fields
461 <    private boolean casTail(Node cmp, Node val) {
461 >    private boolean casTail(Node<E> cmp, Node<E> val) {
462          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
463      }
464  
465 <    private boolean casHead(Node cmp, Node val) {
465 >    private boolean casHead(Node<E> cmp, Node<E> val) {
466          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
467      }
468  
469 <    private boolean casCleanMe(Node cmp, Node val) {
469 >    private boolean casCleanMe(Node<E> cmp, Node<E> val) {
470          return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
471      }
472  
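
For readers who want to experiment without `sun.misc.Unsafe`, the same shape of per-field CAS helper can be written with `AtomicReferenceFieldUpdater`. This is a substitute technique shown for illustration, not what the revision does, and `FieldCasDemo` with its non-generic `Node` is a hypothetical stand-in for the real class.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical sketch: field CAS helpers without Unsafe. */
final class FieldCasDemo {
    static final class Node {
        volatile Node next;
    }

    // Updater targets must be volatile instance fields.
    volatile Node head;
    volatile Node tail;

    private static final AtomicReferenceFieldUpdater<FieldCasDemo, Node> HEAD =
        AtomicReferenceFieldUpdater.newUpdater(FieldCasDemo.class, Node.class, "head");
    private static final AtomicReferenceFieldUpdater<FieldCasDemo, Node> TAIL =
        AtomicReferenceFieldUpdater.newUpdater(FieldCasDemo.class, Node.class, "tail");

    boolean casHead(Node cmp, Node val) {
        return HEAD.compareAndSet(this, cmp, val);
    }

    boolean casTail(Node cmp, Node val) {
        return TAIL.compareAndSet(this, cmp, val);
    }
}
```

The updater performs extra type and access checks on each call, which is one plausible reason a performance-sensitive class would prefer the Unsafe offsets used in the diff.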
# Line 445 | Line 479 | public class LinkedTransferQueue<E> exte
479      private static final int SYNC    = 2; // for transfer, take
480      private static final int TIMEOUT = 3; // for timed poll, tryTransfer
481  
482 +    @SuppressWarnings("unchecked")
483 +    static <E> E cast(Object item) {
484 +        assert item == null || item.getClass() != Node.class;
485 +        return (E) item;
486 +    }
487 +
488      /**
489       * Implements all queuing methods. See above for explanation.
490       *
491       * @param e the item or null for take
492 <     * @param haveData true if this is a put else a take
492 >     * @param haveData true if this is a put, else a take
493       * @param how NOW, ASYNC, SYNC, or TIMEOUT
494       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
495 <     * @return an item if matched, else e;
495 >     * @return an item if matched, else e
496       * @throws NullPointerException if haveData mode but e is null
497       */
498 <    private Object xfer(Object e, boolean haveData, int how, long nanos) {
498 >    private E xfer(E e, boolean haveData, int how, long nanos) {
499          if (haveData && (e == null))
500              throw new NullPointerException();
501 <        Node s = null;                        // the node to append, if needed
501 >        Node<E> s = null;                     // the node to append, if needed
502  
503          retry: for (;;) {                     // restart on append race
504  
505 <            for (Node h = head, p = h; p != null;) { // find & match first node
505 >            for (Node<E> h = head, p = h; p != null;) {
506 >                // find & match first node
507                  boolean isData = p.isData;
508                  Object item = p.item;
509                  if (item != p && (item != null) == isData) { // unmatched
510                      if (isData == haveData)   // can't match
511                          break;
512                      if (p.casItem(item, e)) { // match
513 <                        Thread w = p.waiter;
514 <                        while (p != h) {      // update head
515 <                            Node n = p.next;  // by 2 unless singleton
516 <                            if (n != null)
517 <                                p = n;
477 <                            if (head == h && casHead(h, p)) {
513 >                        for (Node<E> q = p; q != h;) {
514 >                            Node<E> n = q.next; // update head by 2
515 >                            if (n != null)    // unless singleton
516 >                                q = n;
517 >                            if (head == h && casHead(h, q)) {
518                                  h.forgetNext();
519                                  break;
520                              }                 // advance and retry
521                              if ((h = head)   == null ||
522 <                                (p = h.next) == null || !p.isMatched())
522 >                                (q = h.next) == null || !q.isMatched())
523                                  break;        // unless slack < 2
524                          }
525 <                        LockSupport.unpark(w);
526 <                        return item;
525 >                        LockSupport.unpark(p.waiter);
526 >                        return this.<E>cast(item);
527                      }
528                  }
529 <                Node n = p.next;
530 <                p = p != n ? n : (h = head);  // Use head if p offlist
529 >                Node<E> n = p.next;
530 >                p = (p != n) ? n : (h = head); // Use head if p offlist
531              }
532  
533              if (how >= ASYNC) {               // No matches available
534                  if (s == null)
535 <                    s = new Node(e, haveData);
536 <                Node pred = tryAppend(s, haveData);
535 >                    s = new Node<E>(e, haveData);
536 >                Node<E> pred = tryAppend(s, haveData);
537                  if (pred == null)
538                      continue retry;           // lost race vs opposite mode
539                  if (how >= SYNC)
540 <                    return awaitMatch(pred, s, e, how, nanos);
540 >                    return awaitMatch(s, pred, e, how, nanos);
541              }
542              return e; // not waiting
543          }
544      }
545  
546      /**
547 <     * Tries to append node s as tail
548 <     * @param haveData true if appending in data mode
547 >     * Tries to append node s as tail.
548 >     *
549       * @param s the node to append
550 +     * @param haveData true if appending in data mode
551       * @return null on failure due to losing race with append in
552       * different mode, else s's predecessor, or s itself if no
553       * predecessor
554       */
555 <    private Node tryAppend(Node s, boolean haveData) {
556 <        for (Node t = tail, p = t;;) { // move p to actual tail and append
557 <            Node n, u;                        // temps for reads of next & tail
555 >    private Node<E> tryAppend(Node<E> s, boolean haveData) {
556 >        for (Node<E> t = tail, p = t;;) { // move p to last node and append
557 >            Node<E> n, u;                     // temps for reads of next & tail
558              if (p == null && (p = head) == null) {
559                  if (casHead(null, s))
560                      return s;                 // initialize
561              }
562              else if (p.cannotPrecede(haveData))
563                  return null;                  // lost race vs opposite mode
564 <            else if ((n = p.next) != null)    // Not tail; keep traversing
564 >            else if ((n = p.next) != null)    // not last; keep traversing
565                  p = p != t && t != (u = tail) ? (t = u) : // stale tail
566 <                    p != n ? n : null;        // restart if off list
566 >                    (p != n) ? n : null;      // restart if off list
567              else if (!p.casNext(null, s))
568                  p = p.next;                   // re-read on CAS failure
569              else {
570 <                if (p != t) {                 // Update if slack now >= 2
570 >                if (p != t) {                 // update if slack now >= 2
571                      while ((tail != t || !casTail(t, s)) &&
572                             (t = tail)   != null &&
573                             (s = t.next) != null && // advance and retry
# Line 540 | Line 581 | public class LinkedTransferQueue<E> exte
581      /**
582       * Spins/yields/blocks until node s is matched or caller gives up.
583       *
543     * @param pred the predecessor of s or s or null if none
584       * @param s the waiting node
585 +     * @param pred the predecessor of s, or s itself if it has no
586 +     * predecessor, or null if unknown (the null case does not occur
587 +     * in any current calls but may in possible future extensions)
588       * @param e the comparison value for checking match
589       * @param how either SYNC or TIMEOUT
590       * @param nanos timeout value
591       * @return matched item, or e if unmatched on interrupt or timeout
592       */
593 <    private Object awaitMatch(Node pred, Node s, Object e,
551 <                              int how, long nanos) {
593 >    private E awaitMatch(Node<E> s, Node<E> pred, E e, int how, long nanos) {
594          long lastTime = (how == TIMEOUT) ? System.nanoTime() : 0L;
595          Thread w = Thread.currentThread();
596          int spins = -1; // initialized after first item and cancel checks
# Line 557 | Line 599 | public class LinkedTransferQueue<E> exte
599          for (;;) {
600              Object item = s.item;
601              if (item != e) {                  // matched
602 +                assert item != s;
603                  s.forgetContents();           // avoid garbage
604 <                return item;
604 >                return this.<E>cast(item);
605              }
606              if ((w.isInterrupted() || (how == TIMEOUT && nanos <= 0)) &&
607 <                     s.casItem(e, s)) {       // cancel
607 >                    s.casItem(e, s)) {       // cancel
608                  unsplice(pred, s);
609                  return e;
610              }
# Line 570 | Line 613 | public class LinkedTransferQueue<E> exte
613                  if ((spins = spinsFor(pred, s.isData)) > 0)
614                      randomYields = ThreadLocalRandom.current();
615              }
616 <            else if (spins > 0) {             // spin, occasionally yield
617 <                if (randomYields.nextInt(FRONT_SPINS) == 0)
618 <                    Thread.yield();
619 <                --spins;
616 >            else if (spins > 0) {             // spin
617 >                if (--spins == 0)
618 >                    shortenHeadPath();        // reduce slack before blocking
619 >                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
620 >                    Thread.yield();           // occasionally yield
621              }
622              else if (s.waiter == null) {
623 <                shortenHeadPath();            // reduce slack before blocking
580 <                s.waiter = w;                 // request unpark
623 >                s.waiter = w;                 // request unpark then recheck
624              }
625              else if (how == TIMEOUT) {
626                  long now = System.nanoTime();
# Line 587 | Line 630 | public class LinkedTransferQueue<E> exte
630              }
631              else {
632                  LockSupport.park(this);
633 +                s.waiter = null;
634                  spins = -1;                   // spin if front upon wakeup
635              }
636          }
637      }
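
Note on the awaitMatch loop above: it spins a bounded number of times, then publishes the waiting thread in s.waiter and rechecks the item once more before parking. The following is a minimal sketch of that spin-then-park shape, not the queue's actual code. The names SpinThenParkSketch, Slot, await, fulfill, and the constant SPINS are hypothetical; cancellation, timeouts, and interrupt handling are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class SpinThenParkSketch {
    // Hypothetical spin budget; the real code derives it from spinsFor().
    static final int SPINS = 64;

    // One hand-off cell: a value to be supplied later, plus the thread waiting for it.
    static final class Slot<T> {
        final AtomicReference<T> item = new AtomicReference<>();
        volatile Thread waiter;
    }

    // Waits until a non-null item appears: spin first, then register, then park.
    static <T> T await(Slot<T> s) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            T item = s.item.get();
            if (item != null)
                return item;              // fulfilled
            if (spins > 0)
                --spins;                  // still spinning
            else if (s.waiter == null)
                s.waiter = w;             // request unpark, then loop to recheck
            else
                LockSupport.park(s);      // may return spuriously; the loop rechecks
        }
    }

    // Supplies the value and wakes the waiter if one has registered.
    static <T> void fulfill(Slot<T> s, T value) {
        s.item.set(value);
        LockSupport.unpark(s.waiter);     // unpark(null) has no effect
    }
}
```

Registering the waiter and then rechecking the item before parking is what closes the window in which a fulfilling thread could set the item and find no one to unpark.
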
638  
639      /**
640 <     * Return spin/yield value for a node with given predecessor and
640 >     * Returns spin/yield value for a node with given predecessor and
641       * data mode. See above for explanation.
642       */
643 <    private static int spinsFor(Node pred, boolean haveData) {
643 >    private static int spinsFor(Node<?> pred, boolean haveData) {
644          if (MP && pred != null) {
645 <            boolean predData = pred.isData;
646 <            if (predData != haveData)         // front and phase change
647 <                return FRONT_SPINS + (FRONT_SPINS >>> 1);
604 <            if (predData != (pred.item != null)) // probably at front
645 >            if (pred.isData != haveData)      // phase change
646 >                return FRONT_SPINS + CHAINED_SPINS;
647 >            if (pred.isMatched())             // probably at front
648                  return FRONT_SPINS;
649              if (pred.waiter == null)          // pred apparently spinning
650                  return CHAINED_SPINS;
# Line 614 | Line 657 | public class LinkedTransferQueue<E> exte
657       * or trailing node; failing on contention.
658       */
659      private void shortenHeadPath() {
660 <        Node h, hn, p, q;
660 >        Node<E> h, hn, p, q;
661          if ((p = h = head) != null && h.isMatched() &&
662              (q = hn = h.next) != null) {
663 <            Node n;
663 >            Node<E> n;
664              while ((n = q.next) != q) {
665                  if (n == null || !q.isMatched()) {
666                      if (hn != q && h.next == hn)
# Line 633 | Line 676 | public class LinkedTransferQueue<E> exte
676      /* -------------- Traversal methods -------------- */
677  
678      /**
679 <     * Return the first unmatched node of the given mode, or null if
679 >     * Returns the first unmatched node of the given mode, or null if
680       * none.  Used by methods isEmpty, hasWaitingConsumer.
681       */
682 <    private Node firstOfMode(boolean data) {
683 <        for (Node p = head; p != null; ) {
682 >    private Node<E> firstOfMode(boolean data) {
683 >        for (Node<E> p = head; p != null; ) {
684              if (!p.isMatched())
685 <                return p.isData == data? p : null;
686 <            Node n = p.next;
687 <            p = n != p ? n : head;
685 >                return (p.isData == data) ? p : null;
686 >            Node<E> n = p.next;
687 >            p = (n != p) ? n : head;
688          }
689          return null;
690      }
691  
692      /**
693       * Returns the item in the first unmatched node with isData; or
694 <     * null if none. Used by peek.
694 >     * null if none.  Used by peek.
695       */
696 <    private Object firstDataItem() {
697 <        for (Node p = head; p != null; ) {
696 >    private E firstDataItem() {
697 >        for (Node<E> p = head; p != null; ) {
698              boolean isData = p.isData;
699              Object item = p.item;
700              if (item != p && (item != null) == isData)
701 <                return isData ? item : null;
702 <            Node n = p.next;
703 <            p = n != p ? n : head;
701 >                return isData ? this.<E>cast(item) : null;
702 >            Node<E> n = p.next;
703 >            p = (n != p) ? n : head;
704          }
705          return null;
706      }
707  
708      /**
709 <     * Traverse and count nodes of the given mode.
710 <     * Used by methds size and getWaitingConsumerCount.
709 >     * Traverses and counts unmatched nodes of the given mode.
710 >     * Used by methods size and getWaitingConsumerCount.
711       */
712      private int countOfMode(boolean data) {
713          int count = 0;
714 <        for (Node p = head; p != null; ) {
714 >        for (Node<E> p = head; p != null; ) {
715              if (!p.isMatched()) {
716                  if (p.isData != data)
717                      return 0;
718                  if (++count == Integer.MAX_VALUE) // saturated
719                      break;
720              }
721 <            Node n = p.next;
721 >            Node<E> n = p.next;
722              if (n != p)
723                  p = n;
724              else {
# Line 687 | Line 730 | public class LinkedTransferQueue<E> exte
730      }
731  
732      final class Itr implements Iterator<E> {
733 <        private Node nextNode;   // next node to return item for
734 <        private Object nextItem; // the corresponding item
735 <        private Node lastRet;    // last returned node, to support remove
733 >        private Node<E> nextNode;   // next node to return item for
734 >        private E nextItem;         // the corresponding item
735 >        private Node<E> lastRet;    // last returned node, to support remove
736  
737          /**
738           * Moves to next node after prev, or first node if prev null.
739           */
740 <        private void advance(Node prev) {
740 >        private void advance(Node<E> prev) {
741              lastRet = prev;
742 <            Node p;
742 >            Node<E> p;
743              if (prev == null || (p = prev.next) == prev)
744                  p = head;
745              while (p != null) {
746                  Object item = p.item;
747                  if (p.isData) {
748                      if (item != null && item != p) {
749 <                        nextItem = item;
749 >                        nextItem = LinkedTransferQueue.this.<E>cast(item);
750                          nextNode = p;
751                          return;
752                      }
753                  }
754                  else if (item == null)
755                      break;
756 <                Node n = p.next;
757 <                p = n != p ? n : head;
756 >                Node<E> n = p.next;
757 >                p = (n != p) ? n : head;
758              }
759              nextNode = null;
760          }
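
Note on the traversal idiom used above in firstOfMode, firstDataItem, and advance: the expression p = (n != p) ? n : head restarts from head whenever a node's next field points back at the node itself, which marks a node that has already been taken off the list. The sketch below shows the idea on a plain single-threaded list. SelfLinkSketch and its members are hypothetical names, and none of the CAS logic of the real queue is reproduced.

```java
class SelfLinkSketch {
    static final class Node {
        final String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    Node head;

    // Appends at the tail (single-threaded, for illustration only).
    void append(String item) {
        Node node = new Node(item);
        if (head == null) {
            head = node;
            return;
        }
        Node p = head;
        while (p.next != null)
            p = p.next;
        p.next = node;
    }

    // Removes the first node and self-links it so stale references can detect removal.
    String removeFirst() {
        Node h = head;
        if (h == null)
            return null;
        head = h.next;
        h.next = h;                 // self-link marks "off list"
        return h.item;
    }

    // Successor of p, restarting from head if p has been taken off the list.
    Node successor(Node p) {
        Node n = p.next;
        return (n != p) ? n : head;
    }
}
```

A traverser that still holds a reference to a removed node therefore never follows a dangling chain; it simply starts over from the current head.
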
# Line 725 | Line 768 | public class LinkedTransferQueue<E> exte
768          }
769  
770          public final E next() {
771 <            Node p = nextNode;
771 >            Node<E> p = nextNode;
772              if (p == null) throw new NoSuchElementException();
773 <            Object e = nextItem;
773 >            E e = nextItem;
774              advance(p);
775 <            return (E) e;
775 >            return e;
776          }
777  
778          public final void remove() {
779 <            Node p = lastRet;
779 >            Node<E> p = lastRet;
780              if (p == null) throw new IllegalStateException();
781              lastRet = null;
782              findAndRemoveNode(p);
# Line 749 | Line 792 | public class LinkedTransferQueue<E> exte
792       * @param pred predecessor of node to be unspliced
793       * @param s the node to be unspliced
794       */
795 <    private void unsplice(Node pred, Node s) {
795 >    private void unsplice(Node<E> pred, Node<E> s) {
796          s.forgetContents(); // clear unneeded fields
797          /*
798           * At any given time, exactly one node on list cannot be
799 <         * deleted -- the last inserted node. To accommodate this, if
800 <         * we cannot delete s, we save its predecessor as "cleanMe",
799 >         * unlinked -- the last inserted node. To accommodate this, if
800 >         * we cannot unlink s, we save its predecessor as "cleanMe",
801           * processing the previously saved version first. Because only
802           * one node in the list can have a null next, at least one of
803           * node s or the node previously saved can always be
# Line 762 | Line 805 | public class LinkedTransferQueue<E> exte
805           */
806          if (pred != null && pred != s) {
807              while (pred.next == s) {
808 <                Node oldpred = cleanMe == null? null : reclean();
809 <                Node n = s.next;
808 >                Node<E> oldpred = (cleanMe == null) ? null : reclean();
809 >                Node<E> n = s.next;
810                  if (n != null) {
811                      if (n != s)
812                          pred.casNext(s, n);
# Line 782 | Line 825 | public class LinkedTransferQueue<E> exte
825       *
826       * @return current cleanMe node (or null)
827       */
828 <    private Node reclean() {
828 >    private Node<E> reclean() {
829          /*
830           * cleanMe is, or at one time was, predecessor of a cancelled
831           * node s that was the tail so could not be unspliced.  If it
# Line 793 | Line 836 | public class LinkedTransferQueue<E> exte
836           * we can (must) clear cleanMe without unsplicing.  This can
837           * loop only due to contention.
838           */
839 <        Node pred;
839 >        Node<E> pred;
840          while ((pred = cleanMe) != null) {
841 <            Node s = pred.next;
842 <            Node n;
841 >            Node<E> s = pred.next;
842 >            Node<E> n;
843              if (s == null || s == pred || !s.isMatched())
844                  casCleanMe(pred, null); // already gone
845              else if ((n = s.next) != null) {
# Line 814 | Line 857 | public class LinkedTransferQueue<E> exte
857       * Main implementation of Iterator.remove(). Find
858       * and unsplice the given node.
859       */
860 <    final void findAndRemoveNode(Node s) {
860 >    final void findAndRemoveNode(Node<E> s) {
861          if (s.tryMatchData()) {
862 <            Node pred = null;
863 <            Node p = head;
862 >            Node<E> pred = null;
863 >            Node<E> p = head;
864              while (p != null) {
865                  if (p == s) {
866                      unsplice(pred, p);
# Line 839 | Line 882 | public class LinkedTransferQueue<E> exte
882       */
883      private boolean findAndRemove(Object e) {
884          if (e != null) {
885 <            Node pred = null;
886 <            Node p = head;
885 >            Node<E> pred = null;
886 >            Node<E> p = head;
887              while (p != null) {
888                  Object item = p.item;
889                  if (p.isData) {
# Line 989 | Line 1032 | public class LinkedTransferQueue<E> exte
1032      }
1033  
1034      public E take() throws InterruptedException {
1035 <        Object e = xfer(null, false, SYNC, 0);
1035 >        E e = xfer(null, false, SYNC, 0);
1036          if (e != null)
1037 <            return (E)e;
1037 >            return e;
1038          Thread.interrupted();
1039          throw new InterruptedException();
1040      }
1041  
1042      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1043 <        Object e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1043 >        E e = xfer(null, false, TIMEOUT, unit.toNanos(timeout));
1044          if (e != null || !Thread.interrupted())
1045 <            return (E)e;
1045 >            return e;
1046          throw new InterruptedException();
1047      }
1048  
1049      public E poll() {
1050 <        return (E)xfer(null, false, NOW, 0);
1050 >        return xfer(null, false, NOW, 0);
1051      }
1052  
1053      /**
# Line 1061 | Line 1104 | public class LinkedTransferQueue<E> exte
1104      }
1105  
1106      public E peek() {
1107 <        return (E) firstDataItem();
1107 >        return firstDataItem();
1108      }
1109  
1110      /**
# Line 1124 | Line 1167 | public class LinkedTransferQueue<E> exte
1167      }
1168  
1169      /**
1170 <     * Save the state to a stream (that is, serialize it).
1170 >     * Saves the state to a stream (that is, serializes it).
1171       *
1172       * @serialData All of the elements (each an {@code E}) in
1173       * the proper order, followed by a null
# Line 1140 | Line 1183 | public class LinkedTransferQueue<E> exte
1183      }
1184  
1185      /**
1186 <     * Reconstitute the Queue instance from a stream (that is,
1187 <     * deserialize it).
1186 >     * Reconstitutes the Queue instance from a stream (that is,
1187 >     * deserializes it).
1188       *
1189       * @param s the stream
1190       */
# Line 1157 | Line 1200 | public class LinkedTransferQueue<E> exte
1200          }
1201      }
1202  
1160
1203      // Unsafe mechanics
1204  
1205      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
# Line 1180 | Line 1222 | public class LinkedTransferQueue<E> exte
1222          }
1223      }
1224  
1225 <    private static sun.misc.Unsafe getUnsafe() {
1225 >    /**
1226 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1227 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1228 >     * into a jdk.
1229 >     *
1230 >     * @return a sun.misc.Unsafe
1231 >     */
1232 >    static sun.misc.Unsafe getUnsafe() {
1233          try {
1234              return sun.misc.Unsafe.getUnsafe();
1235          } catch (SecurityException se) {
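
Note on the casts in this diff: several changes above replace raw Node and Object uses with Node<E> and E, and route the remaining Object-to-E conversions through calls such as this.<E>cast(item). The definition of cast is not part of the lines shown here, so the following is only a sketch of what such a helper commonly looks like, under the assumption that it is a plain unchecked cast confined to one method. CastSketch is a hypothetical class name.

```java
class CastSketch {
    // Single place where the unchecked Object-to-E conversion happens,
    // so the warning is suppressed once instead of at every call site.
    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        return (E) item;
    }

    public static void main(String[] args) {
        Object stored = "payload";                    // item held as Object
        String s = CastSketch.<String>cast(stored);   // explicit type argument, as in the diff
        System.out.println(s);
    }
}
```

Because of erasure the cast performs no check inside the helper; a wrong type argument surfaces later as a ClassCastException at the call site that uses the result.
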

Diff Legend

(no marker) Removed lines
+ Added lines
< Changed lines (revision 1.45)
> Changed lines (revision 1.57)