244 |
|
* method outweighs the code bulk and maintenance problems of |
245 |
|
* using separate methods for each case. |
246 |
|
* |
247 |
< |
* Operation consists of up to three phases. The first is |
248 |
< |
* implemented within method xfer, the second in tryAppend, and |
249 |
< |
* the third in method awaitMatch. |
250 |
< |
* |
251 |
< |
* 1. Try to match an existing node |
252 |
< |
* |
253 |
< |
* Starting at head, skip already-matched nodes until finding |
254 |
< |
* an unmatched node of opposite mode, if one exists, in which |
255 |
< |
* case matching it and returning, also if necessary updating |
256 |
< |
* head to one past the matched node (or the node itself if the |
257 |
< |
* list has no other unmatched nodes). If the CAS misses, then |
258 |
< |
* a loop retries advancing head by two steps until either |
259 |
< |
* success or the slack is at most two. By requiring that each |
260 |
< |
* attempt advances head by two (if applicable), we ensure that |
261 |
< |
* the slack does not grow without bound. Traversals also check |
262 |
< |
* if the initial head is now off-list, in which case they |
263 |
< |
* restart at the new head. |
264 |
< |
* |
265 |
< |
* If no candidates are found and the call was untimed |
266 |
< |
* poll/offer (argument "how" is NOW), return. |
267 |
< |
* |
268 |
< |
* 2. Try to append a new node (method tryAppend) |
269 |
< |
* |
270 |
< |
* Starting at current tail pointer, find the actual last node |
271 |
< |
* and try to append a new node. Nodes can be appended only if |
272 |
< |
* their predecessors are either already matched or are of the |
273 |
< |
* same mode. If we detect otherwise, then a new node with |
274 |
< |
* opposite mode must have been appended during traversal, so |
275 |
< |
* we must restart at phase 1. The traversal and update steps |
276 |
< |
* are otherwise similar to phase 1: Retrying upon CAS misses |
277 |
< |
* and checking for staleness. In particular, if a self-link |
278 |
< |
* is encountered, then we can safely jump to a node on the |
279 |
< |
* list by continuing the traversal at current head. |
247 |
> |
* Operation consists of up to two phases. The first is implemented |
248 |
> |
* in method xfer, the second in method awaitMatch. |
249 |
|
* |
250 |
< |
* On successful append, if the call was ASYNC, return. |
250 |
> |
* 1. Traverse until matching or appending (method xfer) |
251 |
|
* |
252 |
< |
* 3. Await match or cancellation (method awaitMatch) |
252 |
> |
* Conceptually, we simply traverse all nodes starting from head. |
253 |
> |
* If we encounter an unmatched node of opposite mode, we match |
254 |
> |
* it and return, also updating head (by at least 2 hops) to |
255 |
> |
* one past the matched node (or the node itself if it's the |
256 |
> |
* pinned trailing node). Traversals also check for the |
257 |
> |
* possibility of falling off-list, in which case they restart. |
258 |
> |
* |
259 |
> |
* If the trailing node of the list is reached, a match is not |
260 |
> |
* possible. If this call was untimed poll or tryTransfer |
261 |
> |
* (argument "how" is NOW), return empty-handed immediately. |
262 |
> |
* Else a new node is CAS-appended. On successful append, if |
263 |
> |
* this call was ASYNC (e.g. offer), an element was |
264 |
> |
* successfully added to the end of the queue and we return. |
265 |
> |
* |
266 |
> |
* Of course, this naive traversal is O(n) when no match is |
267 |
> |
* possible. We optimize the traversal by maintaining a tail |
268 |
> |
* pointer, which is expected to be "near" the end of the list. |
269 |
> |
* It is only safe to fast-forward to tail (in the presence of |
270 |
> |
* arbitrary concurrent changes) if it is pointing to a node of |
271 |
> |
* the same mode, even if it is dead (in this case no preceding |
272 |
> |
* node could still be matchable by this traversal). If we |
273 |
> |
* need to restart due to falling off-list, we can again |
274 |
> |
* fast-forward to tail, but only if it has changed since the |
275 |
> |
* last traversal (else we might loop forever). If tail cannot |
276 |
> |
* be used, traversal starts at head (but in this case we |
277 |
> |
* expect to be able to match near head). As with head, we |
278 |
> |
* CAS-advance the tail pointer by at least two hops. |
279 |
> |
* |
280 |
> |
* 2. Await match or cancellation (method awaitMatch) |
281 |
|
* |
282 |
|
* Wait for another thread to match node; instead cancelling if |
283 |
|
* the current thread was interrupted or the wait timed out. On |
576 |
|
* @param c the first dead node |
577 |
|
* @param p the last dead node |
578 |
|
* @param q p.next: the next live node, or null if at end |
579 |
< |
* @return either old pred or p if pred dead or CAS failed |
579 |
> |
* @return pred if pred still alive and CAS succeeded; else p |
580 |
|
*/ |
581 |
|
private Node skipDeadNodes(Node pred, Node c, Node p, Node q) { |
582 |
|
// assert pred != c; |
594 |
|
} |
595 |
|
|
596 |
|
/** |
597 |
< |
* Collapses dead (matched) nodes between h and p. |
598 |
< |
* h was once head, and all nodes between h and p are dead. |
597 |
> |
* Collapses dead (matched) nodes from h (which was once head) to p. |
598 |
> |
* Caller ensures all nodes from h up to and including p are dead. |
599 |
|
*/ |
600 |
|
private void skipDeadNodesNearHead(Node h, Node p) { |
601 |
+ |
// assert h != null; |
602 |
|
// assert h != p; |
603 |
|
// assert p.isMatched(); |
604 |
< |
// find live or trailing node, starting at p |
605 |
< |
for (Node q; (q = p.next) != null; ) { |
606 |
< |
if (!q.isMatched()) { |
607 |
< |
p = q; |
608 |
< |
break; |
611 |
< |
} |
612 |
< |
if (p == (p = q)) |
613 |
< |
return; |
604 |
> |
for (;;) { |
605 |
> |
final Node q; |
606 |
> |
if ((q = p.next) == null) break; |
607 |
> |
else if (!q.isMatched()) { p = q; break; } |
608 |
> |
else if (p == (p = q)) return; |
609 |
|
} |
610 |
< |
if (h == HEAD.getAcquire(this) && casHead(h, p)) |
610 |
> |
if (casHead(h, p)) |
611 |
|
h.selfLink(); |
612 |
|
} |
613 |
|
|
628 |
|
* @return an item if matched, else e |
629 |
|
* @throws NullPointerException if haveData mode but e is null |
630 |
|
*/ |
631 |
+ |
@SuppressWarnings("unchecked") |
632 |
|
private E xfer(E e, boolean haveData, int how, long nanos) { |
633 |
|
if (haveData && (e == null)) |
634 |
|
throw new NullPointerException(); |
639 |
– |
Node s = null; // the node to append, if needed |
635 |
|
|
636 |
< |
restartFromHead: for (;;) { |
637 |
< |
for (Node h = head, p = h; p != null;) { // find & match first node |
638 |
< |
final boolean isData; |
639 |
< |
final Object item; |
640 |
< |
if (((item = p.item) != null) == (isData = p.isData)) { |
641 |
< |
// unmatched |
642 |
< |
if (isData == haveData) // can't match |
648 |
< |
break; |
636 |
> |
restart: for (Node s = null, t = null, h = null;;) { |
637 |
> |
for (Node p = (t != (t = tail) && t.isData == haveData) ? t |
638 |
> |
: (h = head);; ) { |
639 |
> |
final Node q; final Object item; |
640 |
> |
if (p.isData != haveData |
641 |
> |
&& haveData == ((item = p.item) == null)) { |
642 |
> |
if (h == null) h = head; |
643 |
|
if (p.tryMatch(item, e)) { |
650 |
– |
// collapse at least 2 |
644 |
|
if (h != p) skipDeadNodesNearHead(h, p); |
645 |
< |
@SuppressWarnings("unchecked") E itemE = (E) item; |
653 |
< |
return itemE; |
645 |
> |
return (E) item; |
646 |
|
} |
647 |
|
} |
648 |
< |
if (p == (p = p.next)) |
649 |
< |
continue restartFromHead; |
650 |
< |
} |
651 |
< |
|
652 |
< |
if (how != NOW) { // No matches available |
653 |
< |
if (s == null) |
654 |
< |
s = new Node(e); |
663 |
< |
Node pred = tryAppend(s, haveData); |
664 |
< |
if (pred == null) |
665 |
< |
continue restartFromHead; // lost race vs opposite mode |
666 |
< |
if (how != ASYNC) |
667 |
< |
return awaitMatch(s, pred, e, (how == TIMED), nanos); |
668 |
< |
} |
669 |
< |
return e; // not waiting |
670 |
< |
} |
671 |
< |
} |
672 |
< |
|
673 |
< |
/** |
674 |
< |
* Tries to append node s as tail. |
675 |
< |
* |
676 |
< |
* @param s the node to append |
677 |
< |
* @param haveData true if appending in data mode |
678 |
< |
* @return null on failure due to losing race with append in |
679 |
< |
* different mode, else s's predecessor |
680 |
< |
*/ |
681 |
< |
private Node tryAppend(Node s, boolean haveData) { |
682 |
< |
// assert head != null; |
683 |
< |
// assert tail != null; |
684 |
< |
// assert s.isData == haveData; |
685 |
< |
for (Node t = tail, p = t;;) { // move p to last node and append |
686 |
< |
Node n; |
687 |
< |
if (p.cannotPrecede(haveData)) |
688 |
< |
return null; // lost race vs opposite mode |
689 |
< |
else if ((n = p.next) != null) // not last; keep traversing |
690 |
< |
p = (p != t && t != (t = tail)) ? t : // stale tail |
691 |
< |
(p != n) ? n : head; // restart if off list |
692 |
< |
else if (!p.casNext(null, s)) |
693 |
< |
p = p.next; // re-read on CAS failure |
694 |
< |
else { |
695 |
< |
if (p != t) { // update if slack now >= 2 |
696 |
< |
while ((tail != t || !casTail(t, s)) && |
697 |
< |
(t = tail) != null && |
698 |
< |
(s = t.next) != null && // advance and retry |
699 |
< |
(s = s.next) != null && s != t); |
648 |
> |
if ((q = p.next) == null) { |
649 |
> |
if (how == NOW) return e; |
650 |
> |
if (s == null) s = new Node(e); |
651 |
> |
if (!p.casNext(null, s)) continue; |
652 |
> |
if (p != t) casTail(t, s); |
653 |
> |
if (how == ASYNC) return e; |
654 |
> |
return awaitMatch(s, p, e, (how == TIMED), nanos); |
655 |
|
} |
656 |
< |
return p; |
656 |
> |
if (p == (p = q)) continue restart; |
657 |
|
} |
658 |
|
} |
659 |
|
} |