root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.15 by jsr166, Mon Nov 2 19:06:43 2009 UTC vs.
Revision 1.16 by dl, Sat Nov 14 20:27:25 2009 UTC

# Line 204 | Line 204 | public class LinkedTransferQueue<E> exte
204       * additional GC bookkeeping ("write barriers") that are sometimes
205       * more costly than the writes themselves because of contention).
206       *
207     * Removal of interior nodes (due to timed out or interrupted
208     * waits, or calls to remove(x) or Iterator.remove) can use a
209     * scheme roughly similar to that described in Scherer, Lea, and
210     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
211     * any node except the (actual) tail of the queue. To avoid
212     * build-up of cancelled trailing nodes, upon a request to remove
213     * a trailing node, it is placed in field "cleanMe" to be
214     * unspliced upon the next call to unsplice any other node.
215     * Situations needing such mechanics are not common but do occur
216     * in practice; for example when an unbounded series of short
217     * timed calls to poll repeatedly time out but never otherwise
218     * fall off the list because of an untimed call to take at the
219     * front of the queue. Note that maintaining field cleanMe does
220     * not otherwise much impact garbage retention even if never
221     * cleared by some other call because the held node will
222     * eventually either directly or indirectly lead to a self-link
223     * once off the list.
224     *
207       * *** Overview of implementation ***
208       *
209       * We use a threshold-based approach to updates, with a slack
# Line 237 | Line 219 | public class LinkedTransferQueue<E> exte
219       * per-thread one available, but even ThreadLocalRandom is too
220       * heavy for these purposes.
221       *
222 <     * With such a small slack threshold value, it is rarely
223 <     * worthwhile to augment this with path short-circuiting; i.e.,
224 <     * unsplicing nodes between head and the first unmatched node, or
225 <     * similarly for tail, rather than advancing head or tail
244 <     * proper. However, it is used (in awaitMatch) immediately before
245 <     * a waiting thread starts to block, as a final bit of helping at
246 <     * a point when contention with others is extremely unlikely
247 <     * (since if other threads that could release it are operating,
248 <     * then the current thread wouldn't be blocking).
222 >     * With such a small slack threshold value, it is not worthwhile
223 >     * to augment this with path short-circuiting (i.e., unsplicing
224 >     * interior nodes) except in the case of cancellation/removal (see
225 >     * below).
226       *
227       * We allow both the head and tail fields to be null before any
228       * nodes are enqueued; initializing upon first append.  This
# Line 327 | Line 304 | public class LinkedTransferQueue<E> exte
304       *    versa) compared to their predecessors receive additional
305       *    chained spins, reflecting longer paths typically required to
306       *    unblock threads during phase changes.
307 +     *
308 +     *
309 +     * ** Unlinking removed interior nodes **
310 +     *
311 +     * In addition to minimizing garbage retention via self-linking
312 +     * described above, we also unlink removed interior nodes. These
313 +     * may arise due to timed out or interrupted waits, or calls to
314 +     * remove(x) or Iterator.remove.  Normally, given a node that was
315 +     * at one time known to be the predecessor of some node s that is
316 +     * to be removed, we can unsplice s by CASing the next field of
317 +     * its predecessor if it still points to s (otherwise s must
318 +     * already have been removed or is now offlist). But there are two
319 +     * situations in which we cannot guarantee to make node s
320 +     * unreachable in this way: (1) If s is the trailing node of the
321 +     * (i.e., with null next), then it is pinned as the target node
322 +     * for appends, so can only be removed later when other nodes are
323 +     * appended. (2) We cannot necessarily unlink s given a
324 +     * predecessor node that is matched (including the case of being
325 +     * cancelled): the predecessor may itself already be unspliced,
326 +     * in which case some previous reachable node may still point to
327 +     * s.  (For further explanation see Herlihy & Shavit "The Art of
328 +     * Multiprocessor Programming" chapter 9).  However, in both
329 +     * cases, we can rule out the need for further action if either s
330 +     * or its predecessor is (or can be made to be) at, or falls off
331 +     * from, the head of the list.
332 +     *
333 +     * Without taking these into account, it would be possible for an
334 +     * unbounded number of supposedly removed nodes to remain
335 +     * reachable.  Situations leading to such buildup are uncommon but
336 +     * can occur in practice; for example when a series of short timed
337 +     * calls to poll repeatedly time out but never otherwise fall off
338 +     * the list because of an untimed call to take at the front of the
339 +     * queue.
340 +     *
341 +     * When these cases arise, rather than always retraversing the
342 +     * entire list to find an actual predecessor to unlink (which
343 +     * won't help for case (1) anyway), we record a conservative
344 +     * estimate of possible unsplice failures (in "sweepVotes").  We
345 +     * trigger a full sweep when the estimate exceeds a threshold
346 +     * indicating the maximum number of estimated removal failures to
347 +     * tolerate before sweeping through, unlinking cancelled nodes
348 +     * that were not unlinked upon initial removal. We perform sweeps
349 +     * by the thread hitting threshold (rather than background threads
350 +     * or by spreading work to other threads) because in the main
351 +     * contexts in which removal occurs, the caller is already
352 +     * timed-out, cancelled, or performing a potentially O(n)
353 +     * operation (i.e., remove(x)), none of which are time-critical
354 +     * enough to warrant the overhead that alternatives would impose
355 +     * on other threads.
356 +     *
357 +     * Because the sweepVotes estimate is conservative, and because
358 +     * nodes become unlinked "naturally" as they fall off the head of
359 +     * the queue, and because we allow votes to accumulate even while
360 +     * sweeps are in progress, there are typically significantly fewer
361 +     * such nodes than estimated.  Choice of a threshold value
362 +     * balances the likelihood of wasted effort and contention, versus
363 +     * providing a worst-case bound on retention of interior nodes in
364 +     * quiescent queues. The value defined below was chosen
365 +     * empirically to balance these under various timeout scenarios.
366 +     *
367 +     * Note that we cannot self-link unlinked interior nodes during
368 +     * sweeps. However, the associated garbage chains terminate when
369 +     * some successor ultimately falls off the head of the list and is
370 +     * self-linked.
371       */
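The buildup scenario described in the new comment (short timed polls stranded behind a blocked take) can be exercised directly against the public API. A minimal sketch, assuming only java.util.concurrent.LinkedTransferQueue; the class name TimedPollBuildup is hypothetical:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollBuildup {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        // A consumer parked in an untimed take() keeps the queue's head
        // from advancing past its request node.
        Thread taker = new Thread(() -> {
            try {
                System.out.println(q.take());
            } catch (InterruptedException ignored) {}
        });
        taker.start();
        // Each short timed poll appends a request node behind the parked
        // take(), times out, and cancels it. Without the interior unlinking
        // and sweeping described above, these cancelled nodes would remain
        // reachable because the head never falls past the blocked take().
        for (int i = 0; i < 1000; i++)
            q.poll(1, TimeUnit.MICROSECONDS);
        q.put(42);                 // matches the parked take()
        taker.join();
        System.out.println(q.isEmpty());
    }
}
```

The taker prints 42, and the queue reports empty afterwards: the cancelled poll nodes do not count as contents, and the sweep mechanism bounds how long they stay linked.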
372  
373      /** True if on multiprocessor */
# Line 353 | Line 394 | public class LinkedTransferQueue<E> exte
394      private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
395  
396      /**
397 +     * The maximum number of estimated removal failures (sweepVotes)
398 +     * to tolerate before sweeping through the queue unlinking
399 +     * cancelled nodes that were not unlinked upon initial
400 +     * removal. See above for explanation. The value must be at least
401 +     * two to avoid useless sweeps when removing trailing nodes.
402 +     */
403 +    static final int SWEEP_THRESHOLD = 32;
404 +
405 +    /**
406       * Queue nodes. Uses Object, not E, for items to allow forgetting
407       * them after use.  Relies heavily on Unsafe mechanics to minimize
408 <     * unnecessary ordering constraints: Writes that intrinsically
409 <     * precede or follow CASes use simple relaxed forms.  Other
360 <     * cleanups use releasing/lazy writes.
408 >     * unnecessary ordering constraints: Writes that are intrinsically
409 >     * ordered wrt other accesses or CASes use simple relaxed forms.
410       */
411      static final class Node {
412          final boolean isData;   // false if this is a request node
# Line 393 | Line 442 | public class LinkedTransferQueue<E> exte
442          }
443  
444          /**
445 <         * Sets item to self (using a releasing/lazy write) and waiter
446 <         * to null, to avoid garbage retention after extracting or
447 <         * cancelling.
445 >         * Sets item to self and waiter to null, to avoid garbage
446 >         * retention after matching or cancelling. Uses relaxed writes
447 >         * because order is already constrained in the only calling
448 >         * contexts: item is forgotten only after volatile/atomic
449 >         * mechanics that extract items.  Similarly, clearing waiter
450 >         * follows either CAS or return from park (if ever parked;
451 >         * else we don't care).
452           */
453          final void forgetContents() {
454 <            UNSAFE.putOrderedObject(this, itemOffset, this);
455 <            UNSAFE.putOrderedObject(this, waiterOffset, null);
454 >            UNSAFE.putObject(this, itemOffset, this);
455 >            UNSAFE.putObject(this, waiterOffset, null);
456          }
457  
458          /**
# Line 457 | Line 510 | public class LinkedTransferQueue<E> exte
510      /** head of the queue; null until first enqueue */
511      transient volatile Node head;
512  
460    /** predecessor of dangling unspliceable node */
461    private transient volatile Node cleanMe; // decl here reduces contention
462
513      /** tail of the queue; null until first append */
514      private transient volatile Node tail;
515  
516 +    /** The number of apparent failures to unsplice removed nodes */
517 +    private transient volatile int sweepVotes;
518 +
519      // CAS methods for fields
520      private boolean casTail(Node cmp, Node val) {
521          return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 472 | Line 525 | public class LinkedTransferQueue<E> exte
525          return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
526      }
527  
528 <    private boolean casCleanMe(Node cmp, Node val) {
529 <        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
528 >    private boolean casSweepVotes(int cmp, int val) {
529 >        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
530      }
531  
532      /*
# Line 515 | Line 568 | public class LinkedTransferQueue<E> exte
568                          break;
569                      if (p.casItem(item, e)) { // match
570                          for (Node q = p; q != h;) {
571 <                            Node n = q.next;  // update head by 2
572 <                            if (n != null)    // unless singleton
520 <                                q = n;
521 <                            if (head == h && casHead(h, q)) {
571 >                            Node n = q.next;  // update by 2 unless singleton
572 >                             if (head == h && casHead(h, n == null ? q : n)) {
573                                  h.forgetNext();
574                                  break;
575                              }                 // advance and retry
# Line 608 | Line 659 | public class LinkedTransferQueue<E> exte
659                  return this.<E>cast(item);
660              }
661              if ((w.isInterrupted() || (timed && nanos <= 0)) &&
662 <                    s.casItem(e, s)) {       // cancel
662 >                    s.casItem(e, s)) {        // cancel
663                  unsplice(pred, s);
664                  return e;
665              }
# Line 618 | Line 669 | public class LinkedTransferQueue<E> exte
669                      randomYields = ThreadLocalRandom.current();
670              }
671              else if (spins > 0) {             // spin
672 <                if (--spins == 0)
673 <                    shortenHeadPath();        // reduce slack before blocking
623 <                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
672 >                --spins;
673 >                if (randomYields.nextInt(CHAINED_SPINS) == 0)
674                      Thread.yield();           // occasionally yield
675              }
676              else if (s.waiter == null) {
# Line 634 | Line 684 | public class LinkedTransferQueue<E> exte
684              }
685              else {
686                  LockSupport.park(this);
637                s.waiter = null;
638                spins = -1;                   // spin if front upon wakeup
687              }
688          }
689      }
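The spin/yield/park progression in awaitMatch can be illustrated in isolation. A hedged sketch, not the class's actual code: the names SpinThenPark and the spin bound 256 are made up for illustration, standing in for FRONT_SPINS/CHAINED_SPINS:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;

public class SpinThenPark {
    static volatile Object item;           // stand-in for Node.item

    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            int spins = 256;               // hypothetical FRONT_SPINS-like bound
            while (item == null) {
                if (spins > 0) {
                    --spins;
                    if (ThreadLocalRandom.current().nextInt(64) == 0)
                        Thread.yield();    // occasionally yield while spinning
                } else
                    LockSupport.park();    // block until a matching op unparks us
            }
        });
        waiter.start();
        Thread.sleep(50);                  // let the waiter spin out and park
        item = "matched";                  // publish the match...
        LockSupport.unpark(waiter);        // ...then wake the waiter
        waiter.join();
        System.out.println(item);
    }
}
```

Because park may return spuriously, the wait condition is rechecked in a loop, just as awaitMatch rechecks the item on each iteration.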
# Line 656 | Line 704 | public class LinkedTransferQueue<E> exte
704          return 0;
705      }
706  
659    /**
660     * Tries (once) to unsplice nodes between head and first unmatched
661     * or trailing node; failing on contention.
662     */
663    private void shortenHeadPath() {
664        Node h, hn, p, q;
665        if ((p = h = head) != null && h.isMatched() &&
666            (q = hn = h.next) != null) {
667            Node n;
668            while ((n = q.next) != q) {
669                if (n == null || !q.isMatched()) {
670                    if (hn != q && h.next == hn)
671                        h.casNext(hn, q);
672                    break;
673                }
674                p = q;
675                q = n;
676            }
677        }
678    }
679
707      /* -------------- Traversal methods -------------- */
708  
709      /**
# Line 789 | Line 816 | public class LinkedTransferQueue<E> exte
816          public final void remove() {
817              Node p = lastRet;
818              if (p == null) throw new IllegalStateException();
819 <            findAndRemoveDataNode(lastPred, p);
819 >            if (p.tryMatchData())
820 >                unsplice(lastPred, p);
821          }
822      }
823  
# Line 799 | Line 827 | public class LinkedTransferQueue<E> exte
827       * Unsplices (now or later) the given deleted/cancelled node with
828       * the given predecessor.
829       *
830 <     * @param pred predecessor of node to be unspliced
830 >     * @param pred a node that was at one time known to be the
831 >     * predecessor of s, or null or s itself if s is/was at head
832       * @param s the node to be unspliced
833       */
834 <    private void unsplice(Node pred, Node s) {
835 <        s.forgetContents(); // clear unneeded fields
834 >    final void unsplice(Node pred, Node s) {
835 >        s.forgetContents(); // forget unneeded fields
836          /*
837 <         * At any given time, exactly one node on list cannot be
838 <         * unlinked -- the last inserted node. To accommodate this, if
839 <         * we cannot unlink s, we save its predecessor as "cleanMe",
840 <         * processing the previously saved version first. Because only
841 <         * one node in the list can have a null next, at least one of
813 <         * node s or the node previously saved can always be
814 <         * processed, so this always terminates.
837 >         * See above for rationale. Briefly: if pred still points to
838 >         * s, try to unlink s.  If s cannot be unlinked (because it is
839 >         * the trailing node or pred might be unlinked), and neither
840 >         * pred nor s is head or offlist, add to sweepVotes, and if enough
841 >         * votes have accumulated, sweep.
842           */
843 <        if (pred != null && pred != s) {
844 <            while (pred.next == s) {
845 <                Node oldpred = (cleanMe == null) ? null : reclean();
846 <                Node n = s.next;
847 <                if (n != null) {
848 <                    if (n != s)
849 <                        pred.casNext(s, n);
850 <                    break;
843 >        if (pred != null && pred != s && pred.next == s) {
844 >            Node n = s.next;
845 >            if (n == null ||
846 >                (n != s && pred.casNext(s, n) && pred.isMatched())) {
847 >                for (;;) {               // check if at, or could be, head
848 >                    Node h = head;
849 >                    if (h == pred || h == s || h == null)
850 >                        return;          // at head or list empty
851 >                    if (!h.isMatched())
852 >                        break;
853 >                    Node hn = h.next;
854 >                    if (hn == null)
855 >                        return;          // now empty
856 >                    if (hn != h && casHead(h, hn))
857 >                        h.forgetNext();  // advance head
858                  }
859 <                if (oldpred == pred ||      // Already saved
860 <                    ((oldpred == null || oldpred.next == s) &&
861 <                     casCleanMe(oldpred, pred))) {
862 <                    break;
859 >                if (pred.next != pred && s.next != s) { // recheck if offlist
860 >                    for (;;) {           // sweep now if enough votes
861 >                        int v = sweepVotes;
862 >                        if (v < SWEEP_THRESHOLD) {
863 >                            if (casSweepVotes(v, v + 1))
864 >                                break;
865 >                        }
866 >                        else if (casSweepVotes(v, 0)) {
867 >                            sweep();
868 >                            break;
869 >                        }
870 >                    }
871                  }
872              }
873          }
874      }
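The voting protocol at the end of unsplice can be read in isolation: failed unsplices accumulate votes, and the thread whose CAS finds the threshold reached resets the counter and does the sweep itself. A minimal sketch with hypothetical names (SweepVotes, recordFailureAndCheck), using AtomicInteger in place of the Unsafe-based casSweepVotes:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SweepVotes {
    static final int SWEEP_THRESHOLD = 32;       // same value as the constant above
    private final AtomicInteger votes = new AtomicInteger();

    /** Records one unsplice failure; returns true if the caller should sweep. */
    boolean recordFailureAndCheck() {
        for (;;) {
            int v = votes.get();
            if (v < SWEEP_THRESHOLD) {
                if (votes.compareAndSet(v, v + 1))
                    return false;                // vote recorded, no sweep yet
            } else if (votes.compareAndSet(v, 0))
                return true;                     // we won the right to sweep
        }
    }

    public static void main(String[] args) {
        SweepVotes sv = new SweepVotes();
        int sweeps = 0;
        for (int i = 0; i < 100; i++)
            if (sv.recordFailureAndCheck())
                ++sweeps;
        System.out.println(sweeps);              // one sweep per 33rd failure
    }
}
```

As in the real code, losing a CAS just retries, so every failure contributes exactly one vote and at most one thread wins each sweep.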
875  
876      /**
877 <     * Tries to unsplice the deleted/cancelled node held in cleanMe
836 <     * that was previously uncleanable because it was at tail.
837 <     *
838 <     * @return current cleanMe node (or null)
877 >     * Unlink matched nodes encountered in a traversal from head.
878       */
879 <    private Node reclean() {
880 <        /*
881 <         * cleanMe is, or at one time was, predecessor of a cancelled
882 <         * node s that was the tail so could not be unspliced.  If it
883 <         * is no longer the tail, try to unsplice if necessary and
884 <         * make cleanMe slot available.  This differs from similar
885 <         * code in unsplice() because we must check that pred still
847 <         * points to a matched node that can be unspliced -- if not,
848 <         * we can (must) clear cleanMe without unsplicing.  This can
849 <         * loop only due to contention.
850 <         */
851 <        Node pred;
852 <        while ((pred = cleanMe) != null) {
853 <            Node s = pred.next;
854 <            Node n;
855 <            if (s == null || s == pred || !s.isMatched())
856 <                casCleanMe(pred, null); // already gone
857 <            else if ((n = s.next) != null) {
858 <                if (n != s)
859 <                    pred.casNext(s, n);
860 <                casCleanMe(pred, null);
861 <            }
879 >    private void sweep() {
880 >        Node p = head, s, n;
881 >        while (p != null && (s = p.next) != null && (n = s.next) != null) {
882 >            if (p == s || s == n)
883 >                p = head; // stale
884 >            else if (s.isMatched())
885 >                p.casNext(s, n);
886              else
887 <                break;
864 <        }
865 <        return pred;
866 <    }
867 <
868 <    /**
869 <     * Main implementation of Iterator.remove(). Finds
870 <     * and unsplices the given data node.
871 <     *
872 <     * @param possiblePred possible predecessor of s
873 <     * @param s the node to remove
874 <     */
875 <    final void findAndRemoveDataNode(Node possiblePred, Node s) {
876 <        assert s.isData;
877 <        if (s.tryMatchData()) {
878 <            if (possiblePred != null && possiblePred.next == s)
879 <                unsplice(possiblePred, s); // was actual predecessor
880 <            else {
881 <                for (Node pred = null, p = head; p != null; ) {
882 <                    if (p == s) {
883 <                        unsplice(pred, p);
884 <                        break;
885 <                    }
886 <                    if (p.isUnmatchedRequest())
887 <                        break;
888 <                    pred = p;
889 <                    if ((p = p.next) == pred) { // stale
890 <                        pred = null;
891 <                        p = head;
892 <                    }
893 <                }
894 <            }
887 >                p = s;
888          }
889      }
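The sweep traversal can be mirrored on a plain singly-linked list. A hedged sketch, not the queue's own Node type: here a null item marks a matched/cancelled node, and a self-link (p == s or s == n) signals a stale node that forces a restart from head, as in sweep() above:

```java
import java.util.concurrent.atomic.AtomicReference;

public class SweepSketch {
    static final class Node {
        final Object item;                  // null means matched/cancelled
        final AtomicReference<Node> next = new AtomicReference<>();
        Node(Object item) { this.item = item; }
    }

    static void sweep(Node head) {
        Node p = head, s, n;
        while (p != null && (s = p.next.get()) != null
               && (n = s.next.get()) != null) {
            if (p == s || s == n)
                p = head;                   // stale: hit a self-linked node
            else if (s.item == null)
                p.next.compareAndSet(s, n); // unlink matched interior node
            else
                p = s;                      // s is live; keep traversing
        }
    }

    public static void main(String[] args) {
        Node head = new Node("a");
        Node b = new Node(null);            // cancelled interior node
        Node c = new Node(null);            // cancelled interior node
        Node d = new Node("d");
        head.next.set(b); b.next.set(c); c.next.set(d);
        sweep(head);
        System.out.println(head.next.get() == d);
    }
}
```

Note that, like the real sweep, this never unlinks a trailing matched node (one with null next); such a node stays pinned until something is appended after it.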
890  
# Line 1223 | Line 1216 | public class LinkedTransferQueue<E> exte
1216          objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1217      private static final long tailOffset =
1218          objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1219 <    private static final long cleanMeOffset =
1220 <        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
1219 >    private static final long sweepVotesOffset =
1220 >        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1221  
1222      static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1223                                    String field, Class<?> klazz) {
# Line 1237 | Line 1230 | public class LinkedTransferQueue<E> exte
1230              throw error;
1231          }
1232      }
1240
1233   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines