ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.152 by jsr166, Tue Jan 17 02:44:59 2017 UTC vs.
Revision 1.153 by jsr166, Wed Jan 18 23:40:58 2017 UTC

# Line 244 | Line 244 | public class LinkedTransferQueue<E> exte
244       * method outweighs the code bulk and maintenance problems of
245       * using separate methods for each case.
246       *
247 <     * Operation consists of up to three phases. The first is
248 <     * implemented within method xfer, the second in tryAppend, and
249 <     * the third in method awaitMatch.
250 <     *
251 <     * 1. Try to match an existing node
252 <     *
253 <     *    Starting at head, skip already-matched nodes until finding
254 <     *    an unmatched node of opposite mode, if one exists, in which
255 <     *    case matching it and returning, also if necessary updating
256 <     *    head to one past the matched node (or the node itself if the
257 <     *    list has no other unmatched nodes). If the CAS misses, then
258 <     *    a loop retries advancing head by two steps until either
259 <     *    success or the slack is at most two. By requiring that each
260 <     *    attempt advances head by two (if applicable), we ensure that
261 <     *    the slack does not grow without bound. Traversals also check
262 <     *    if the initial head is now off-list, in which case they
263 <     *    restart at the new head.
264 <     *
265 <     *    If no candidates are found and the call was untimed
266 <     *    poll/offer (argument "how" is NOW), return.
267 <     *
268 <     * 2. Try to append a new node (method tryAppend)
269 <     *
270 <     *    Starting at current tail pointer, find the actual last node
271 <     *    and try to append a new node. Nodes can be appended only if
272 <     *    their predecessors are either already matched or are of the
273 <     *    same mode. If we detect otherwise, then a new node with
274 <     *    opposite mode must have been appended during traversal, so
275 <     *    we must restart at phase 1. The traversal and update steps
276 <     *    are otherwise similar to phase 1: Retrying upon CAS misses
277 <     *    and checking for staleness.  In particular, if a self-link
278 <     *    is encountered, then we can safely jump to a node on the
279 <     *    list by continuing the traversal at current head.
247 >     * Operation consists of up to two phases. The first is implemented
248 >     * in method xfer, the second in method awaitMatch.
249       *
250 <     *    On successful append, if the call was ASYNC, return.
250 >     * 1. Traverse until matching or appending (method xfer)
251       *
252 <     * 3. Await match or cancellation (method awaitMatch)
252 >     *    Conceptually, we simply traverse all nodes starting from head.
253 >     *    If we encounter an unmatched node of opposite mode, we match
254 >     *    it and return, also updating head (by at least 2 hops) to
255 >     *    one past the matched node (or the node itself if it's the
256 >     *    pinned trailing node).  Traversals also check for the
257 >     *    possibility of falling off-list, in which case they restart.
258 >     *
259 >     *    If the trailing node of the list is reached, a match is not
260 >     *    possible.  If this call was untimed poll or tryTransfer
261 >     *    (argument "how" is NOW), return empty-handed immediately.
262 >     *    Else a new node is CAS-appended.  On successful append, if
263 >     *    this call was ASYNC (e.g. offer), an element was
264 >     *    successfully added to the end of the queue and we return.
265 >     *
266 >     *    Of course, this naive traversal is O(n) when no match is
267 >     *    possible.  We optimize the traversal by maintaining a tail
268 >     *    pointer, which is expected to be "near" the end of the list.
269 >     *    It is only safe to fast-forward to tail (in the presence of
270 >     *    arbitrary concurrent changes) if it is pointing to a node of
271 >     *    the same mode, even if it is dead (in this case no preceding
272 >     *    node could still be matchable by this traversal).  If we
273 >     *    need to restart due to falling off-list, we can again
274 >     *    fast-forward to tail, but only if it has changed since the
275 >     *    last traversal (else we might loop forever).  If tail cannot
276 >     *    be used, traversal starts at head (but in this case we
277 >     *    expect to be able to match near head).  As with head, we
278 >     *    CAS-advance the tail pointer by at least two hops.
279 >     *
280 >     * 2. Await match or cancellation (method awaitMatch)
281       *
282       *    Wait for another thread to match node; instead cancelling if
283       *    the current thread was interrupted or the wait timed out. On
# Line 579 | Line 576 | public class LinkedTransferQueue<E> exte
576       * @param c the first dead node
577       * @param p the last dead node
578       * @param q p.next: the next live node, or null if at end
579 <     * @return either old pred or p if pred dead or CAS failed
579 >     * @return pred if pred still alive and CAS succeeded; else p
580       */
581      private Node skipDeadNodes(Node pred, Node c, Node p, Node q) {
582          // assert pred != c;
# Line 597 | Line 594 | public class LinkedTransferQueue<E> exte
594      }
595  
596      /**
597 <     * Collapses dead (matched) nodes between h and p.
598 <     * h was once head, and all nodes between h and p are dead.
597 >     * Collapses dead (matched) nodes from h (which was once head) to p.
598 >     * Caller ensures all nodes from h up to and including p are dead.
599       */
600      private void skipDeadNodesNearHead(Node h, Node p) {
601 +        // assert h != null;
602          // assert h != p;
603          // assert p.isMatched();
604 <        // find live or trailing node, starting at p
605 <        for (Node q; (q = p.next) != null; ) {
606 <            if (!q.isMatched()) {
607 <                p = q;
608 <                break;
611 <            }
612 <            if (p == (p = q))
613 <                return;
604 >        for (;;) {
605 >            final Node q;
606 >            if ((q = p.next) == null) break;
607 >            else if (!q.isMatched()) { p = q; break; }
608 >            else if (p == (p = q)) return;
609          }
610 <        if (h == HEAD.getAcquire(this) && casHead(h, p))
610 >        if (casHead(h, p))
611              h.selfLink();
612      }
613  
# Line 633 | Line 628 | public class LinkedTransferQueue<E> exte
628       * @return an item if matched, else e
629       * @throws NullPointerException if haveData mode but e is null
630       */
631 +    @SuppressWarnings("unchecked")
632      private E xfer(E e, boolean haveData, int how, long nanos) {
633          if (haveData && (e == null))
634              throw new NullPointerException();
639        Node s = null;                        // the node to append, if needed
635  
636 <        restartFromHead: for (;;) {
637 <            for (Node h = head, p = h; p != null;) { // find & match first node
638 <                final boolean isData;
639 <                final Object item;
640 <                if (((item = p.item) != null) == (isData = p.isData)) {
641 <                    // unmatched
642 <                    if (isData == haveData)   // can't match
648 <                        break;
636 >        restart: for (Node s = null, t = null, h = null;;) {
637 >            for (Node p = (t != (t = tail) && t.isData == haveData) ? t
638 >                     : (h = head);; ) {
639 >                final Node q; final Object item;
640 >                if (p.isData != haveData
641 >                    && haveData == ((item = p.item) == null)) {
642 >                    if (h == null) h = head;
643                      if (p.tryMatch(item, e)) {
650                        // collapse at least 2
644                          if (h != p) skipDeadNodesNearHead(h, p);
645 <                        @SuppressWarnings("unchecked") E itemE = (E) item;
653 <                        return itemE;
645 >                        return (E) item;
646                      }
647                  }
648 <                if (p == (p = p.next))
649 <                    continue restartFromHead;
650 <            }
651 <
652 <            if (how != NOW) {                 // No matches available
653 <                if (s == null)
654 <                    s = new Node(e);
663 <                Node pred = tryAppend(s, haveData);
664 <                if (pred == null)
665 <                    continue restartFromHead; // lost race vs opposite mode
666 <                if (how != ASYNC)
667 <                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
668 <            }
669 <            return e; // not waiting
670 <        }
671 <    }
672 <
673 <    /**
674 <     * Tries to append node s as tail.
675 <     *
676 <     * @param s the node to append
677 <     * @param haveData true if appending in data mode
678 <     * @return null on failure due to losing race with append in
679 <     * different mode, else s's predecessor
680 <     */
681 <    private Node tryAppend(Node s, boolean haveData) {
682 <        // assert head != null;
683 <        // assert tail != null;
684 <        // assert s.isData == haveData;
685 <        for (Node t = tail, p = t;;) {        // move p to last node and append
686 <            Node n;
687 <            if (p.cannotPrecede(haveData))
688 <                return null;                  // lost race vs opposite mode
689 <            else if ((n = p.next) != null)    // not last; keep traversing
690 <                p = (p != t && t != (t = tail)) ? t : // stale tail
691 <                    (p != n) ? n : head;      // restart if off list
692 <            else if (!p.casNext(null, s))
693 <                p = p.next;                   // re-read on CAS failure
694 <            else {
695 <                if (p != t) {                 // update if slack now >= 2
696 <                    while ((tail != t || !casTail(t, s)) &&
697 <                           (t = tail)   != null &&
698 <                           (s = t.next) != null && // advance and retry
699 <                           (s = s.next) != null && s != t);
648 >                if ((q = p.next) == null) {
649 >                    if (how == NOW) return e;
650 >                    if (s == null) s = new Node(e);
651 >                    if (!p.casNext(null, s)) continue;
652 >                    if (p != t) casTail(t, s);
653 >                    if (how == ASYNC) return e;
654 >                    return awaitMatch(s, p, e, (how == TIMED), nanos);
655                  }
656 <                return p;
656 >                if (p == (p = q)) continue restart;
657              }
658          }
659      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines