[cvs] / jsr166 / src / main / java / util / concurrent / LinkedTransferQueue.java Repository:
ViewVC logotype

Diff of /jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.152, Tue Jan 17 02:44:59 2017 UTC revision 1.153, Wed Jan 18 23:40:58 2017 UTC
# Line 244  Line 244 
244       * method outweighs the code bulk and maintenance problems of       * method outweighs the code bulk and maintenance problems of
245       * using separate methods for each case.       * using separate methods for each case.
246       *       *
247       * Operation consists of up to three phases. The first is       * Operation consists of up to two phases. The first is implemented
248       * implemented within method xfer, the second in tryAppend, and       * in method xfer, the second in method awaitMatch.
      * the third in method awaitMatch.  
      *  
      * 1. Try to match an existing node  
      *  
      *    Starting at head, skip already-matched nodes until finding  
      *    an unmatched node of opposite mode, if one exists, in which  
      *    case matching it and returning, also if necessary updating  
      *    head to one past the matched node (or the node itself if the  
      *    list has no other unmatched nodes). If the CAS misses, then  
      *    a loop retries advancing head by two steps until either  
      *    success or the slack is at most two. By requiring that each  
      *    attempt advances head by two (if applicable), we ensure that  
      *    the slack does not grow without bound. Traversals also check  
      *    if the initial head is now off-list, in which case they  
      *    restart at the new head.  
      *  
      *    If no candidates are found and the call was untimed  
      *    poll/offer (argument "how" is NOW), return.  
      *  
      * 2. Try to append a new node (method tryAppend)  
      *  
      *    Starting at current tail pointer, find the actual last node  
      *    and try to append a new node. Nodes can be appended only if  
      *    their predecessors are either already matched or are of the  
      *    same mode. If we detect otherwise, then a new node with  
      *    opposite mode must have been appended during traversal, so  
      *    we must restart at phase 1. The traversal and update steps  
      *    are otherwise similar to phase 1: Retrying upon CAS misses  
      *    and checking for staleness.  In particular, if a self-link  
      *    is encountered, then we can safely jump to a node on the  
      *    list by continuing the traversal at current head.  
249       *       *
250       *    On successful append, if the call was ASYNC, return.       * 1. Traverse until matching or appending (method xfer)
251       *       *
252       * 3. Await match or cancellation (method awaitMatch)       *    Conceptually, we simply traverse all nodes starting from head.
253         *    If we encounter an unmatched node of opposite mode, we match
254         *    it and return, also updating head (by at least 2 hops) to
255         *    one past the matched node (or the node itself if it's the
256         *    pinned trailing node).  Traversals also check for the
257         *    possibility of falling off-list, in which case they restart.
258         *
259         *    If the trailing node of the list is reached, a match is not
260         *    possible.  If this call was untimed poll or tryTransfer
261         *    (argument "how" is NOW), return empty-handed immediately.
262         *    Else a new node is CAS-appended.  On successful append, if
263         *    this call was ASYNC (e.g. offer), an element was
264         *    successfully added to the end of the queue and we return.
265         *
266         *    Of course, this naive traversal is O(n) when no match is
267         *    possible.  We optimize the traversal by maintaining a tail
268         *    pointer, which is expected to be "near" the end of the list.
269         *    It is only safe to fast-forward to tail (in the presence of
270         *    arbitrary concurrent changes) if it is pointing to a node of
271         *    the same mode, even if it is dead (in this case no preceding
272         *    node could still be matchable by this traversal).  If we
273         *    need to restart due to falling off-list, we can again
274         *    fast-forward to tail, but only if it has changed since the
275         *    last traversal (else we might loop forever).  If tail cannot
276         *    be used, traversal starts at head (but in this case we
277         *    expect to be able to match near head).  As with head, we
278         *    CAS-advance the tail pointer by at least two hops.
279         *
280         * 2. Await match or cancellation (method awaitMatch)
281       *       *
282       *    Wait for another thread to match node; instead cancelling if       *    Wait for another thread to match node; instead cancelling if
283       *    the current thread was interrupted or the wait timed out. On       *    the current thread was interrupted or the wait timed out. On
# Line 579  Line 576 
576       * @param c the first dead node       * @param c the first dead node
577       * @param p the last dead node       * @param p the last dead node
578       * @param q p.next: the next live node, or null if at end       * @param q p.next: the next live node, or null if at end
579       * @return either old pred or p if pred dead or CAS failed       * @return pred if pred still alive and CAS succeeded; else p
580       */       */
581      private Node skipDeadNodes(Node pred, Node c, Node p, Node q) {      private Node skipDeadNodes(Node pred, Node c, Node p, Node q) {
582          // assert pred != c;          // assert pred != c;
# Line 597  Line 594 
594      }      }
595    
596      /**      /**
597       * Collapses dead (matched) nodes between h and p.       * Collapses dead (matched) nodes from h (which was once head) to p.
598       * h was once head, and all nodes between h and p are dead.       * Caller ensures all nodes from h up to and including p are dead.
599       */       */
600      private void skipDeadNodesNearHead(Node h, Node p) {      private void skipDeadNodesNearHead(Node h, Node p) {
601            // assert h != null;
602          // assert h != p;          // assert h != p;
603          // assert p.isMatched();          // assert p.isMatched();
604          // find live or trailing node, starting at p          for (;;) {
605          for (Node q; (q = p.next) != null; ) {              final Node q;
606              if (!q.isMatched()) {              if ((q = p.next) == null) break;
607                  p = q;              else if (!q.isMatched()) { p = q; break; }
608                  break;              else if (p == (p = q)) return;
             }  
             if (p == (p = q))  
                 return;  
609          }          }
610          if (h == HEAD.getAcquire(this) && casHead(h, p))          if (casHead(h, p))
611              h.selfLink();              h.selfLink();
612      }      }
613    
# Line 633  Line 628 
628       * @return an item if matched, else e       * @return an item if matched, else e
629       * @throws NullPointerException if haveData mode but e is null       * @throws NullPointerException if haveData mode but e is null
630       */       */
631        @SuppressWarnings("unchecked")
632      private E xfer(E e, boolean haveData, int how, long nanos) {      private E xfer(E e, boolean haveData, int how, long nanos) {
633          if (haveData && (e == null))          if (haveData && (e == null))
634              throw new NullPointerException();              throw new NullPointerException();
         Node s = null;                        // the node to append, if needed  
635    
636          restartFromHead: for (;;) {          restart: for (Node s = null, t = null, h = null;;) {
637              for (Node h = head, p = h; p != null;) { // find & match first node              for (Node p = (t != (t = tail) && t.isData == haveData) ? t
638                  final boolean isData;                       : (h = head);; ) {
639                  final Object item;                  final Node q; final Object item;
640                  if (((item = p.item) != null) == (isData = p.isData)) {                  if (p.isData != haveData
641                      // unmatched                      && haveData == ((item = p.item) == null)) {
642                      if (isData == haveData)   // can't match                      if (h == null) h = head;
                         break;  
643                      if (p.tryMatch(item, e)) {                      if (p.tryMatch(item, e)) {
                         // collapse at least 2  
644                          if (h != p) skipDeadNodesNearHead(h, p);                          if (h != p) skipDeadNodesNearHead(h, p);
645                          @SuppressWarnings("unchecked") E itemE = (E) item;                          return (E) item;
                         return itemE;  
                     }  
                 }  
                 if (p == (p = p.next))  
                     continue restartFromHead;  
646              }              }
   
             if (how != NOW) {                 // No matches available  
                 if (s == null)  
                     s = new Node(e);  
                 Node pred = tryAppend(s, haveData);  
                 if (pred == null)  
                     continue restartFromHead; // lost race vs opposite mode  
                 if (how != ASYNC)  
                     return awaitMatch(s, pred, e, (how == TIMED), nanos);  
             }  
             return e; // not waiting  
647          }          }
648                    if ((q = p.next) == null) {
649                        if (how == NOW) return e;
650                        if (s == null) s = new Node(e);
651                        if (!p.casNext(null, s)) continue;
652                        if (p != t) casTail(t, s);
653                        if (how == ASYNC) return e;
654                        return awaitMatch(s, p, e, (how == TIMED), nanos);
655      }      }
656                    if (p == (p = q)) continue restart;
     /**  
      * Tries to append node s as tail.  
      *  
      * @param s the node to append  
      * @param haveData true if appending in data mode  
      * @return null on failure due to losing race with append in  
      * different mode, else s's predecessor  
      */  
     private Node tryAppend(Node s, boolean haveData) {  
         // assert head != null;  
         // assert tail != null;  
         // assert s.isData == haveData;  
         for (Node t = tail, p = t;;) {        // move p to last node and append  
             Node n;  
             if (p.cannotPrecede(haveData))  
                 return null;                  // lost race vs opposite mode  
             else if ((n = p.next) != null)    // not last; keep traversing  
                 p = (p != t && t != (t = tail)) ? t : // stale tail  
                     (p != n) ? n : head;      // restart if off list  
             else if (!p.casNext(null, s))  
                 p = p.next;                   // re-read on CAS failure  
             else {  
                 if (p != t) {                 // update if slack now >= 2  
                     while ((tail != t || !casTail(t, s)) &&  
                            (t = tail)   != null &&  
                            (s = t.next) != null && // advance and retry  
                            (s = s.next) != null && s != t);  
                 }  
                 return p;  
657              }              }
658          }          }
659      }      }

Legend:
Removed from v.1.152  
changed lines
  Added in v.1.153

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8