
Diff of /jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java


revision 1.15 (Mon Nov 2 19:06:43 2009 UTC) to revision 1.16 (Sat Nov 14 20:27:25 2009 UTC)
# Line 204  Line 204 
      * additional GC bookkeeping ("write barriers") that are sometimes
      * more costly than the writes themselves because of contention).
      *
-     * Removal of interior nodes (due to timed out or interrupted
-     * waits, or calls to remove(x) or Iterator.remove) can use a
-     * scheme roughly similar to that described in Scherer, Lea, and
-     * Scott's SynchronousQueue. Given a predecessor, we can unsplice
-     * any node except the (actual) tail of the queue. To avoid
-     * build-up of cancelled trailing nodes, upon a request to remove
-     * a trailing node, it is placed in field "cleanMe" to be
-     * unspliced upon the next call to unsplice any other node.
-     * Situations needing such mechanics are not common but do occur
-     * in practice; for example when an unbounded series of short
-     * timed calls to poll repeatedly time out but never otherwise
-     * fall off the list because of an untimed call to take at the
-     * front of the queue. Note that maintaining field cleanMe does
-     * not otherwise much impact garbage retention even if never
-     * cleared by some other call because the held node will
-     * eventually either directly or indirectly lead to a self-link
-     * once off the list.
-     *
      * *** Overview of implementation ***
      *
      * We use a threshold-based approach to updates, with a slack
# Line 237  Line 219 
      * per-thread one available, but even ThreadLocalRandom is too
      * heavy for these purposes.
      *
-     * With such a small slack threshold value, it is rarely
-     * worthwhile to augment this with path short-circuiting; i.e.,
-     * unsplicing nodes between head and the first unmatched node, or
-     * similarly for tail, rather than advancing head or tail
-     * proper. However, it is used (in awaitMatch) immediately before
-     * a waiting thread starts to block, as a final bit of helping at
-     * a point when contention with others is extremely unlikely
-     * (since if other threads that could release it are operating,
-     * then the current thread wouldn't be blocking).
+     * With such a small slack threshold value, it is not worthwhile
+     * to augment this with path short-circuiting (i.e., unsplicing
+     * interior nodes) except in the case of cancellation/removal (see
+     * below).
      *
      * We allow both the head and tail fields to be null before any
      * nodes are enqueued; initializing upon first append.  This
# Line 327  Line 304 
      *    versa) compared to their predecessors receive additional
      *    chained spins, reflecting longer paths typically required to
      *    unblock threads during phase changes.
+     *
+     *
+     * ** Unlinking removed interior nodes **
+     *
+     * In addition to minimizing garbage retention via self-linking
+     * described above, we also unlink removed interior nodes. These
+     * may arise due to timed out or interrupted waits, or calls to
+     * remove(x) or Iterator.remove.  Normally, given a node that was
+     * at one time known to be the predecessor of some node s that is
+     * to be removed, we can unsplice s by CASing the next field of
+     * its predecessor if it still points to s (otherwise s must
+     * already have been removed or is now offlist). But there are two
+     * situations in which we cannot guarantee to make node s
+     * unreachable in this way: (1) If s is the trailing node of list
+     * (i.e., with null next), then it is pinned as the target node
+     * for appends, so can only be removed later when other nodes are
+     * appended. (2) We cannot necessarily unlink s given a
+     * predecessor node that is matched (including the case of being
+     * cancelled): the predecessor may already be unspliced,
+     * in which case some previous reachable node may still point to
+     * s.  (For further explanation see Herlihy & Shavit "The Art of
+     * Multiprocessor Programming" chapter 9).  Although, in both
+     * cases, we can rule out the need for further action if either s
+     * or its predecessor are (or can be made to be) at, or fall off
+     * from, the head of list.
+     *
+     * Without taking these into account, it would be possible for an
+     * unbounded number of supposedly removed nodes to remain
+     * reachable.  Situations leading to such buildup are uncommon but
+     * can occur in practice; for example when a series of short timed
+     * calls to poll repeatedly time out but never otherwise fall off
+     * the list because of an untimed call to take at the front of the
+     * queue.
+     *
+     * When these cases arise, rather than always retraversing the
+     * entire list to find an actual predecessor to unlink (which
+     * won't help for case (1) anyway), we record a conservative
+     * estimate of possible unsplice failures (in "sweepVotes").  We
+     * trigger a full sweep when the estimate exceeds a threshold
+     * indicating the maximum number of estimated removal failures to
+     * tolerate before sweeping through, unlinking cancelled nodes
+     * that were not unlinked upon initial removal. We perform sweeps
+     * by the thread hitting threshold (rather than background threads
+     * or by spreading work to other threads) because in the main
+     * contexts in which removal occurs, the caller is already
+     * timed-out, cancelled, or performing a potentially O(n)
+     * operation (i.e., remove(x)), none of which are time-critical
+     * enough to warrant the overhead that alternatives would impose
+     * on other threads.
+     *
+     * Because the sweepVotes estimate is conservative, and because
+     * nodes become unlinked "naturally" as they fall off the head of
+     * the queue, and because we allow votes to accumulate even while
+     * sweeps are in progress, there are typically significantly fewer
+     * such nodes than estimated.  Choice of a threshold value
+     * balances the likelihood of wasted effort and contention, versus
+     * providing a worst-case bound on retention of interior nodes in
+     * quiescent queues. The value defined below was chosen
+     * empirically to balance these under various timeout scenarios.
+     *
+     * Note that we cannot self-link unlinked interior nodes during
+     * sweeps. However, the associated garbage chains terminate when
+     * some successor ultimately falls off the head of the list and is
+     * self-linked.
      */
 
     /** True if on multiprocessor */
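The vote-then-sweep policy described above can be pictured in isolation. The following standalone sketch uses an AtomicInteger in place of the class's Unsafe-based sweepVotes field; the removalFailed and sweepAll names are illustrative assumptions, not members of LinkedTransferQueue:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative analogue of the sweepVotes policy; not the actual fields or methods.
class SweepVoteSketch {
    static final int SWEEP_THRESHOLD = 32;           // max tolerated unsplice failures
    private final AtomicInteger sweepVotes = new AtomicInteger();

    /** Called when an interior node could not be unlinked immediately. */
    void removalFailed() {
        for (;;) {
            int v = sweepVotes.get();
            if (v < SWEEP_THRESHOLD) {
                if (sweepVotes.compareAndSet(v, v + 1))
                    return;                          // just record the vote
            } else if (sweepVotes.compareAndSet(v, 0)) {
                sweepAll();                          // the voting thread pays for the sweep
                return;
            }
        }
    }

    /** Placeholder: traverse the list, CASing out matched (removed) nodes. */
    void sweepAll() { }
}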
# Line 353  Line 394 
     private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
 
     /**
+     * The maximum number of estimated removal failures (sweepVotes)
+     * to tolerate before sweeping through the queue unlinking
+     * cancelled nodes that were not unlinked upon initial
+     * removal. See above for explanation. The value must be at least
+     * two to avoid useless sweeps when removing trailing nodes.
+     */
+    static final int SWEEP_THRESHOLD = 32;
+
+    /**
      * Queue nodes. Uses Object, not E, for items to allow forgetting
      * them after use.  Relies heavily on Unsafe mechanics to minimize
-     * unnecessary ordering constraints: Writes that intrinsically
-     * precede or follow CASes use simple relaxed forms.  Other
-     * cleanups use releasing/lazy writes.
+     * unnecessary ordering constraints: Writes that are intrinsically
+     * ordered wrt other accesses or CASes use simple relaxed forms.
      */
     static final class Node {
         final boolean isData;   // false if this is a request node
# Line 393  Line 442 
         }
 
         /**
-         * Sets item to self (using a releasing/lazy write) and waiter
-         * to null, to avoid garbage retention after extracting or
-         * cancelling.
+         * Sets item to self and waiter to null, to avoid garbage
+         * retention after matching or cancelling. Uses relaxed writes
+         * because order is already constrained in the only calling
+         * contexts: item is forgotten only after volatile/atomic
+         * mechanics that extract items.  Similarly, clearing waiter
+         * follows either CAS or return from park (if ever parked;
+         * else we don't care).
          */
         final void forgetContents() {
-            UNSAFE.putOrderedObject(this, itemOffset, this);
-            UNSAFE.putOrderedObject(this, waiterOffset, null);
+            UNSAFE.putObject(this, itemOffset, this);
+            UNSAFE.putObject(this, waiterOffset, null);
         }
 
         /**
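The switch above from putOrderedObject to plain putObject relies on ordering already imposed by the surrounding volatile reads and CASes. A minimal sketch of the same distinction using the modern VarHandle API, offered only as an analogue (this 2009-era file itself uses sun.misc.Unsafe):

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Plain vs. release ("ordered") writes, analogous to Unsafe.putObject vs. putOrderedObject.
class WriteModes {
    volatile Object item;                  // the field being forgotten
    private static final VarHandle ITEM;
    static {
        try {
            ITEM = MethodHandles.lookup().findVarHandle(WriteModes.class, "item", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // plain write: no ordering of its own; safe when ordering comes from nearby volatile/CAS
    void forgetRelaxed() { ITEM.set(this, this); }
    // release write: prior loads and stores are ordered before this write
    void forgetOrdered() { ITEM.setRelease(this, this); }
}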
# Line 457  Line 510 
     /** head of the queue; null until first enqueue */
     transient volatile Node head;
 
-    /** predecessor of dangling unspliceable node */
-    private transient volatile Node cleanMe; // decl here reduces contention
-
     /** tail of the queue; null until first append */
     private transient volatile Node tail;
 
+    /** The number of apparent failures to unsplice removed nodes */
+    private transient volatile int sweepVotes;
+
     // CAS methods for fields
     private boolean casTail(Node cmp, Node val) {
         return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
# Line 472  Line 525 
         return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
     }
 
-    private boolean casCleanMe(Node cmp, Node val) {
-        return UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
+    private boolean casSweepVotes(int cmp, int val) {
+        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
     }
 
     /*
# Line 515  Line 568 
                         break;
                     if (p.casItem(item, e)) { // match
                         for (Node q = p; q != h;) {
-                            Node n = q.next;  // update head by 2
-                            if (n != null)    // unless singleton
-                                q = n;
-                            if (head == h && casHead(h, q)) {
+                            Node n = q.next;  // update by 2 unless singleton
+                            if (head == h && casHead(h, n == null? q : n)) {
                                 h.forgetNext();
                                 break;
                             }                 // advance and retry
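The "update by 2 unless singleton" step above is the slack-based head advance. A standalone sketch of that idea, using a simplified node type and AtomicReference in place of the Unsafe-based casHead; the names are illustrative assumptions, not the real internals:

import java.util.concurrent.atomic.AtomicReference;

// Simplified illustration of a slack-of-two head update with self-linking of the old head.
class SlackTwoSketch {
    static final class Node {
        volatile Node next;
    }
    final AtomicReference<Node> head = new AtomicReference<Node>();

    /** After matching node q (a successor of old head h), try to skip head past both. */
    void advanceHead(Node h, Node q) {
        Node n = q.next;                                  // two hops from h, if such a node exists
        if (head.get() == h && head.compareAndSet(h, n == null ? q : n))
            h.next = h;                                   // self-link the old head to aid GC
    }
}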
# Line 618  Line 669 
                     randomYields = ThreadLocalRandom.current();
             }
             else if (spins > 0) {             // spin
-                if (--spins == 0)
-                    shortenHeadPath();        // reduce slack before blocking
-                else if (randomYields.nextInt(CHAINED_SPINS) == 0)
+                --spins;
+                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                     Thread.yield();           // occasionally yield
             }
             else if (s.waiter == null) {
# Line 634  Line 684 
             }
             else {
                 LockSupport.park(this);
-                s.waiter = null;
-                spins = -1;                   // spin if front upon wakeup
             }
         }
     }
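The revised wait loop above simply decrements spins, occasionally yields, and then parks, without the old shortenHeadPath helping step. A standalone sketch of that spin/yield/park pattern, with an assumed AtomicBoolean completion flag rather than the actual awaitMatch state:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Spin briefly, yield occasionally, then park until the condition holds (illustrative only).
class SpinThenPark {
    static final int FRONT_SPINS   = 1 << 7;
    static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    void await(AtomicBoolean done, Object blocker) {
        int spins = FRONT_SPINS;
        ThreadLocalRandom random = ThreadLocalRandom.current();
        while (!done.get()) {
            if (spins > 0) {
                --spins;
                if (random.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield while spinning
            }
            else {
                LockSupport.park(blocker);    // block; loop rechecks after unpark or spurious wakeup
            }
        }
    }
}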
# Line 656  Line 704 
         return 0;
     }
 
-    /**
-     * Tries (once) to unsplice nodes between head and first unmatched
-     * or trailing node; failing on contention.
-     */
-    private void shortenHeadPath() {
-        Node h, hn, p, q;
-        if ((p = h = head) != null && h.isMatched() &&
-            (q = hn = h.next) != null) {
-            Node n;
-            while ((n = q.next) != q) {
-                if (n == null || !q.isMatched()) {
-                    if (hn != q && h.next == hn)
-                        h.casNext(hn, q);
-                    break;
-                }
-                p = q;
-                q = n;
-            }
-        }
-    }
-
     /* -------------- Traversal methods -------------- */
 
     /**
# Line 789  Line 816 
         public final void remove() {
             Node p = lastRet;
             if (p == null) throw new IllegalStateException();
-            findAndRemoveDataNode(lastPred, p);
+            if (p.tryMatchData())
+                unsplice(lastPred, p);
         }
     }
 
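From the caller's side nothing changes: Iterator.remove still removes the last returned element, now by matching the node (tryMatchData) and then unsplicing it with the remembered predecessor. A small usage example of the public API that exercises this path:

import java.util.Iterator;
import java.util.concurrent.LinkedTransferQueue;

public class IteratorRemoveDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
        q.add("a");
        q.add("b");
        q.add("c");
        for (Iterator<String> it = q.iterator(); it.hasNext(); ) {
            if (it.next().equals("b"))
                it.remove();      // internally: tryMatchData() then unsplice(lastPred, node)
        }
        System.out.println(q);    // prints [a, c]
    }
}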
# Line 799  Line 827 
      * Unsplices (now or later) the given deleted/cancelled node with
      * the given predecessor.
      *
-     * @param pred predecessor of node to be unspliced
+     * @param pred a node that was at one time known to be the
+     * predecessor of s, or null or s itself if s is/was at head
      * @param s the node to be unspliced
      */
-    private void unsplice(Node pred, Node s) {
-        s.forgetContents(); // clear unneeded fields
-        /*
-         * At any given time, exactly one node on list cannot be
-         * unlinked -- the last inserted node. To accommodate this, if
-         * we cannot unlink s, we save its predecessor as "cleanMe",
-         * processing the previously saved version first. Because only
-         * one node in the list can have a null next, at least one of
-         * node s or the node previously saved can always be
-         * processed, so this always terminates.
-         */
-        if (pred != null && pred != s) {
-            while (pred.next == s) {
-                Node oldpred = (cleanMe == null) ? null : reclean();
-                Node n = s.next;
-                if (n != null) {
-                    if (n != s)
-                        pred.casNext(s, n);
-                    break;
-                }
-                if (oldpred == pred ||      // Already saved
-                    ((oldpred == null || oldpred.next == s) &&
-                     casCleanMe(oldpred, pred))) {
-                    break;
-                }
-            }
-        }
-    }
-
-    /**
-     * Tries to unsplice the deleted/cancelled node held in cleanMe
-     * that was previously uncleanable because it was at tail.
-     *
-     * @return current cleanMe node (or null)
-     */
-    private Node reclean() {
-        /*
-         * cleanMe is, or at one time was, predecessor of a cancelled
-         * node s that was the tail so could not be unspliced.  If it
-         * is no longer the tail, try to unsplice if necessary and
-         * make cleanMe slot available.  This differs from similar
-         * code in unsplice() because we must check that pred still
-         * points to a matched node that can be unspliced -- if not,
-         * we can (must) clear cleanMe without unsplicing.  This can
-         * loop only due to contention.
-         */
-        Node pred;
-        while ((pred = cleanMe) != null) {
-            Node s = pred.next;
-            Node n;
-            if (s == null || s == pred || !s.isMatched())
-                casCleanMe(pred, null); // already gone
-            else if ((n = s.next) != null) {
-                if (n != s)
-                    pred.casNext(s, n);
-                casCleanMe(pred, null);
-            }
-            else
-                break;
-        }
-        return pred;
-    }
-
-    /**
-     * Main implementation of Iterator.remove(). Finds
-     * and unsplices the given data node.
-     *
-     * @param possiblePred possible predecessor of s
-     * @param s the node to remove
-     */
-    final void findAndRemoveDataNode(Node possiblePred, Node s) {
-        assert s.isData;
-        if (s.tryMatchData()) {
-            if (possiblePred != null && possiblePred.next == s)
-                unsplice(possiblePred, s); // was actual predecessor
-            else {
-                for (Node pred = null, p = head; p != null; ) {
-                    if (p == s) {
-                        unsplice(pred, p);
-                        break;
-                    }
-                    if (p.isUnmatchedRequest())
-                        break;
-                    pred = p;
-                    if ((p = p.next) == pred) { // stale
-                        pred = null;
-                        p = head;
-                    }
-                }
-            }
-        }
-    }
+    final void unsplice(Node pred, Node s) {
+        s.forgetContents(); // forget unneeded fields
+        /*
+         * See above for rationale. Briefly: if pred still points to
+         * s, try to unlink s.  If s cannot be unlinked, because it is
+         * trailing node or pred might be unlinked, and neither pred
+         * nor s are head or offlist, add to sweepVotes, and if enough
+         * votes have accumulated, sweep.
+         */
+        if (pred != null && pred != s && pred.next == s) {
+            Node n = s.next;
+            if (n == null ||
+                (n != s && pred.casNext(s, n) && pred.isMatched())) {
+                for (;;) {               // check if at, or could be, head
+                    Node h = head;
+                    if (h == pred || h == s || h == null)
+                        return;          // at head or list empty
+                    if (!h.isMatched())
+                        break;
+                    Node hn = h.next;
+                    if (hn == null)
+                        return;          // now empty
+                    if (hn != h && casHead(h, hn))
+                        h.forgetNext();  // advance head
+                }
+                if (pred.next != pred && s.next != s) { // recheck if offlist
+                    for (;;) {           // sweep now if enough votes
+                        int v = sweepVotes;
+                        if (v < SWEEP_THRESHOLD) {
+                            if (casSweepVotes(v, v + 1))
+                                break;
+                        }
+                        else if (casSweepVotes(v, 0)) {
+                            sweep();
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Unlink matched nodes encountered in a traversal from head
+     */
+    private void sweep() {
+        Node p = head, s, n;
+        while (p != null && (s = p.next) != null && (n = s.next) != null) {
+            if (p == s || s == n)
+                p = head; // stale
+            else if (s.isMatched())
+                p.casNext(s, n);
+            else
+                p = s;
+        }
+    }
 
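The buildup scenario that motivates the new sweep, repeated timed polls that time out behind an untimed take pinned at the front, can be reproduced with the public API alone. The iteration count and timeouts below are arbitrary illustrative values:

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollDemo {
    public static void main(String[] args) throws InterruptedException {
        final LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();

        Thread taker = new Thread(new Runnable() {
            public void run() {
                try {
                    q.take();      // untimed take parks as a request node at the front
                } catch (InterruptedException ignored) {}
            }
        });
        taker.start();
        Thread.sleep(100);         // give the taker time to enqueue its request node

        // Each timed poll appends a request node, times out, and cancels it.
        // Before this revision such nodes could pile up behind the pinned taker;
        // now enough failed unsplices trip SWEEP_THRESHOLD and trigger sweep().
        for (int i = 0; i < 100; i++)
            q.poll(1, TimeUnit.MILLISECONDS);

        taker.interrupt();
        taker.join();
    }
}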
# Line 1223  Line 1216 
         objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
     private static final long tailOffset =
         objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
-    private static final long cleanMeOffset =
-        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
+    private static final long sweepVotesOffset =
+        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
 
     static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                   String field, Class<?> klazz) {
# Line 1237  Line 1230 
             throw error;
         }
     }
-
 }

Legend: lines prefixed with "-" were removed from v.1.15; lines prefixed with "+" were added in v.1.16; lines with neither prefix are unchanged context.
