130 |
|
* correctly perform enqueue and dequeue operations by traversing |
131 |
|
* from a pointer to the initial node; CASing the item of the |
132 |
|
* first unmatched node on match and CASing the next field of the |
133 |
< |
* trailing node on appends. (Plus some special-casing when |
134 |
< |
* initially empty). While this would be a terrible idea in |
135 |
< |
* itself, it does have the benefit of not requiring ANY atomic |
133 |
> |
* trailing node on appends. While this would be a terrible idea |
134 |
> |
* in itself, it does have the benefit of not requiring ANY atomic |
135 |
|
* updates on head/tail fields. |
136 |
|
* |
137 |
|
* We introduce here an approach that lies between the extremes of |
237 |
|
* interior nodes) except in the case of cancellation/removal (see |
238 |
|
* below). |
239 |
|
* |
241 |
– |
* We allow both the head and tail fields to be null before any |
242 |
– |
* nodes are enqueued; initializing upon first append. This |
243 |
– |
* simplifies some other logic, as well as providing more |
244 |
– |
* efficient explicit control paths instead of letting JVMs insert |
245 |
– |
* implicit NullPointerExceptions when they are null. While not |
246 |
– |
* currently fully implemented, we also leave open the possibility |
247 |
– |
* of re-nulling these fields when empty (which is complicated to |
248 |
– |
* arrange, for little benefit.) |
249 |
– |
* |
240 |
|
* All enqueue/dequeue operations are handled by the single method |
241 |
|
* "xfer" with parameters indicating whether to act as some form |
242 |
|
* of offer, put, poll, take, or transfer (each possibly with |
268 |
|
* 2. Try to append a new node (method tryAppend) |
269 |
|
* |
270 |
|
* Starting at current tail pointer, find the actual last node |
271 |
< |
* and try to append a new node (or if head was null, establish |
272 |
< |
* the first node). Nodes can be appended only if their |
273 |
< |
* predecessors are either already matched or are of the same |
274 |
< |
* mode. If we detect otherwise, then a new node with opposite |
275 |
< |
* mode must have been appended during traversal, so we must |
276 |
< |
* restart at phase 1. The traversal and update steps are |
277 |
< |
* otherwise similar to phase 1: Retrying upon CAS misses and |
278 |
< |
* checking for staleness. In particular, if a self-link is |
279 |
< |
* encountered, then we can safely jump to a node on the list |
290 |
< |
* by continuing the traversal at current head. |
271 |
> |
* and try to append a new node. Nodes can be appended only if |
272 |
> |
* their predecessors are either already matched or are of the |
273 |
> |
* same mode. If we detect otherwise, then a new node with |
274 |
> |
* opposite mode must have been appended during traversal, so |
275 |
> |
* we must restart at phase 1. The traversal and update steps |
276 |
> |
* are otherwise similar to phase 1: Retrying upon CAS misses |
277 |
> |
* and checking for staleness. In particular, if a self-link |
278 |
> |
* is encountered, then we can safely jump to a node on the |
279 |
> |
* list by continuing the traversal at current head. |
280 |
|
* |
281 |
|
* On successful append, if the call was ASYNC, return. |
282 |
|
* |
429 |
|
} |
430 |
|
|
431 |
|
/** |
432 |
< |
* Constructs a new node. Uses relaxed write because item can |
433 |
< |
* only be seen after publication via casNext. |
432 |
> |
* Constructs a data node holding item if item is non-null, |
433 |
> |
* else a request node. Uses relaxed write because item can |
434 |
> |
* only be seen after piggy-backing publication via CAS. |
435 |
|
*/ |
436 |
|
Node(Object item) { |
437 |
|
ITEM.set(this, item); |
438 |
|
isData = (item != null); |
439 |
|
} |
440 |
|
|
441 |
+ |
/** Constructs a dead (matched data) dummy node. */ |
442 |
+ |
Node() { |
443 |
+ |
isData = true; |
444 |
+ |
} |
445 |
+ |
|
446 |
|
/** |
447 |
|
* Links node to itself to avoid garbage retention. Called |
448 |
|
* only after CASing head field, so uses relaxed write. |
451 |
|
NEXT.setRelease(this, this); |
452 |
|
} |
453 |
|
|
454 |
+ |
final void appendRelaxed(Node next) { |
455 |
+ |
// assert next != null; |
456 |
+ |
// assert this.next == null; |
457 |
+ |
NEXT.set(this, next); |
458 |
+ |
} |
459 |
+ |
|
460 |
|
/** |
461 |
|
* Sets item (of a request node) to self and waiter to null, |
462 |
|
* to avoid garbage retention after matching or cancelling. |
506 |
|
} |
507 |
|
|
508 |
|
private static final long serialVersionUID = -3375979862319811754L; |
508 |
– |
|
509 |
– |
// VarHandle mechanics |
510 |
– |
private static final VarHandle ITEM; |
511 |
– |
private static final VarHandle NEXT; |
512 |
– |
private static final VarHandle WAITER; |
513 |
– |
static { |
514 |
– |
try { |
515 |
– |
MethodHandles.Lookup l = MethodHandles.lookup(); |
516 |
– |
ITEM = l.findVarHandle(Node.class, "item", Object.class); |
517 |
– |
NEXT = l.findVarHandle(Node.class, "next", Node.class); |
518 |
– |
WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); |
519 |
– |
} catch (ReflectiveOperationException e) { |
520 |
– |
throw new Error(e); |
521 |
– |
} |
522 |
– |
} |
509 |
|
} |
510 |
|
|
511 |
< |
/** head of the queue; null until first enqueue */ |
511 |
> |
/** |
512 |
> |
* A node from which the first live (non-matched) node (if any) |
513 |
> |
* can be reached in O(1) time. |
514 |
> |
* Invariants: |
515 |
> |
* - all live nodes are reachable from head via .next |
516 |
> |
* - head != null |
517 |
> |
* - (tmp = head).next != tmp || tmp != head |
518 |
> |
* Non-invariants: |
519 |
> |
* - head may or may not be live |
520 |
> |
* - it is permitted for tail to lag behind head, that is, for tail |
521 |
> |
* to not be reachable from head! |
522 |
> |
*/ |
523 |
|
transient volatile Node head; |
524 |
|
|
525 |
< |
/** tail of the queue; null until first append */ |
525 |
> |
/** |
526 |
> |
* A node from which the last node on list (that is, the unique |
527 |
> |
* node with node.next == null) can be reached in O(1) time. |
528 |
> |
* Invariants: |
529 |
> |
* - the last node is always reachable from tail via .next |
530 |
> |
* - tail != null |
531 |
> |
* Non-invariants: |
532 |
> |
* - tail may or may not be live |
533 |
> |
* - it is permitted for tail to lag behind head, that is, for tail |
534 |
> |
* to not be reachable from head! |
535 |
> |
* - tail.next may or may not be self-linked. |
536 |
> |
*/ |
537 |
|
private transient volatile Node tail; |
538 |
|
|
539 |
|
/** The number of apparent failures to unsplice removed nodes */ |
540 |
|
private transient volatile int sweepVotes; |
541 |
|
|
542 |
|
private boolean casTail(Node cmp, Node val) { |
543 |
+ |
// assert cmp != null; |
544 |
+ |
// assert val != null; |
545 |
|
return TAIL.compareAndSet(this, cmp, val); |
546 |
|
} |
547 |
|
|
665 |
|
* predecessor |
666 |
|
*/ |
667 |
|
private Node tryAppend(Node s, boolean haveData) { |
668 |
+ |
// assert head != null; |
669 |
+ |
// assert tail != null; |
670 |
|
for (Node t = tail, p = t;;) { // move p to last node and append |
671 |
|
Node n, u; // temps for reads of next & tail |
672 |
< |
if (p == null && (p = head) == null) { |
673 |
< |
if (casHead(null, s)) |
662 |
< |
return s; // initialize |
663 |
< |
} |
672 |
> |
if (p == null) |
673 |
> |
p = head; |
674 |
|
else if (p.cannotPrecede(haveData)) |
675 |
|
return null; // lost race vs opposite mode |
676 |
|
else if ((n = p.next) != null) // not last; keep traversing |
1233 |
|
* Creates an initially empty {@code LinkedTransferQueue}. |
1234 |
|
*/ |
1235 |
|
public LinkedTransferQueue() { |
1236 |
+ |
head = tail = new Node(); |
1237 |
|
} |
1238 |
|
|
1239 |
|
/** |
1246 |
|
* of its elements are null |
1247 |
|
*/ |
1248 |
|
public LinkedTransferQueue(Collection<? extends E> c) { |
1249 |
< |
this(); |
1250 |
< |
addAll(c); |
1249 |
> |
Node h = null, t = null; |
1250 |
> |
for (E e : c) { |
1251 |
> |
Node newNode = new Node(Objects.requireNonNull(e)); |
1252 |
> |
if (h == null) |
1253 |
> |
h = t = newNode; |
1254 |
> |
else |
1255 |
> |
t.appendRelaxed(t = newNode); |
1256 |
> |
} |
1257 |
> |
if (h == null) |
1258 |
> |
h = t = new Node(); |
1259 |
> |
head = h; |
1260 |
> |
tail = t; |
1261 |
|
} |
1262 |
|
|
1263 |
|
/** |
1601 |
|
*/ |
1602 |
|
private void readObject(java.io.ObjectInputStream s) |
1603 |
|
throws java.io.IOException, ClassNotFoundException { |
1604 |
< |
s.defaultReadObject(); |
1605 |
< |
for (;;) { |
1604 |
> |
|
1605 |
> |
// Read in elements until trailing null sentinel found |
1606 |
> |
Node h = null, t = null; |
1607 |
> |
for (Object item; (item = s.readObject()) != null; ) { |
1608 |
|
@SuppressWarnings("unchecked") |
1609 |
< |
E item = (E) s.readObject(); |
1610 |
< |
if (item == null) |
1611 |
< |
break; |
1609 |
> |
Node newNode = new Node((E) item); |
1610 |
> |
if (h == null) |
1611 |
> |
h = t = newNode; |
1612 |
|
else |
1613 |
< |
offer(item); |
1613 |
> |
t.appendRelaxed(t = newNode); |
1614 |
|
} |
1615 |
+ |
if (h == null) |
1616 |
+ |
h = t = new Node(); |
1617 |
+ |
head = h; |
1618 |
+ |
tail = t; |
1619 |
|
} |
1620 |
|
|
1621 |
|
/** |
1728 |
|
private static final VarHandle HEAD; |
1729 |
|
private static final VarHandle TAIL; |
1730 |
|
private static final VarHandle SWEEPVOTES; |
1731 |
+ |
static final VarHandle ITEM; |
1732 |
+ |
static final VarHandle NEXT; |
1733 |
+ |
static final VarHandle WAITER; |
1734 |
|
static { |
1735 |
|
try { |
1736 |
|
MethodHandles.Lookup l = MethodHandles.lookup(); |
1740 |
|
Node.class); |
1741 |
|
SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes", |
1742 |
|
int.class); |
1743 |
+ |
ITEM = l.findVarHandle(Node.class, "item", Object.class); |
1744 |
+ |
NEXT = l.findVarHandle(Node.class, "next", Node.class); |
1745 |
+ |
WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); |
1746 |
|
} catch (ReflectiveOperationException e) { |
1747 |
|
throw new Error(e); |
1748 |
|
} |