10 |
|
import java.util.concurrent.atomic.*; |
11 |
|
import java.util.*; |
12 |
|
import java.io.*; |
13 |
+ |
import sun.misc.Unsafe; |
14 |
+ |
import java.lang.reflect.*; |
15 |
|
|
16 |
|
/** |
17 |
|
* An unbounded {@linkplain TransferQueue} based on linked nodes. |
41 |
|
* <a href="{@docRoot}/../technotes/guides/collections/index.html"> |
42 |
|
* Java Collections Framework</a>. |
43 |
|
* |
44 |
< |
* @since 1.5 |
44 |
> |
* @since 1.7 |
45 |
|
* @author Doug Lea |
46 |
|
* @param <E> the type of elements held in this collection |
47 |
|
* |
51 |
|
private static final long serialVersionUID = -3223113410248163686L; |
52 |
|
|
53 |
|
/* |
54 |
< |
* This is still a work in prgress... |
54 |
> |
* This is still a work in progress... |
55 |
|
* |
56 |
|
* This class extends the approach used in FIFO-mode |
57 |
|
* SynchronousQueues. See the internal documentation, as well as |
81 |
|
* seems not to vary with number of CPUs (beyond 2) so is just |
82 |
|
* a constant. |
83 |
|
*/ |
84 |
< |
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
84 |
> |
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
85 |
|
|
86 |
|
/** |
87 |
|
* The number of times to spin before blocking in untimed waits. |
96 |
|
*/ |
97 |
|
static final long spinForTimeoutThreshold = 1000L; |
98 |
|
|
99 |
< |
/** |
99 |
> |
/** |
100 |
|
* Node class for LinkedTransferQueue. Opportunistically subclasses from |
101 |
|
* AtomicReference to represent item. Uses Object, not E, to allow |
102 |
|
* setting item to "this" after use, to avoid garbage |
133 |
|
} |
134 |
|
|
135 |
|
|
136 |
< |
private final QNode dummy = new QNode(null, false); |
137 |
< |
private final PaddedAtomicReference<QNode> head = |
138 |
< |
new PaddedAtomicReference<QNode>(dummy); |
139 |
< |
private final PaddedAtomicReference<QNode> tail = |
138 |
< |
new PaddedAtomicReference<QNode>(dummy); |
136 |
> |
/** head of the queue */ |
137 |
> |
private transient final PaddedAtomicReference<QNode> head; |
138 |
> |
/** tail of the queue */ |
139 |
> |
private transient final PaddedAtomicReference<QNode> tail; |
140 |
|
|
141 |
|
/** |
142 |
|
* Reference to a cancelled node that might not yet have been |
143 |
|
* unlinked from queue because it was the last inserted node |
144 |
|
* when it cancelled. |
145 |
|
*/ |
146 |
< |
private final PaddedAtomicReference<QNode> cleanMe = |
146 |
< |
new PaddedAtomicReference<QNode>(null); |
146 |
> |
private transient final PaddedAtomicReference<QNode> cleanMe; |
147 |
|
|
148 |
|
/** |
149 |
|
* Tries to cas nh as new head; if successful, unlink |
156 |
|
} |
157 |
|
return false; |
158 |
|
} |
159 |
< |
|
159 |
> |
|
160 |
|
/** |
161 |
|
* Puts or takes an item. Used for most queue operations (except |
162 |
|
* poll() and tryTransfer()) |
163 |
< |
* @param e the item or if null, signfies that this is a take |
163 |
> |
* @param e the item or if null, signifies that this is a take |
164 |
|
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT |
165 |
|
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT |
166 |
|
* @return an item, or null on failure |
188 |
|
return awaitFulfill(t, s, e, mode, nanos); |
189 |
|
} |
190 |
|
} |
191 |
< |
|
191 |
> |
|
192 |
|
else if (h != null) { |
193 |
|
QNode first = h.next; |
194 |
< |
if (t == tail.get() && first != null && |
194 |
> |
if (t == tail.get() && first != null && |
195 |
|
advanceHead(h, first)) { |
196 |
|
Object x = first.get(); |
197 |
|
if (x != first && first.compareAndSet(x, e)) { |
206 |
|
|
207 |
|
/** |
208 |
|
* Version of xfer for poll() and tryTransfer, which |
209 |
< |
* simpifies control paths both here and in xfer |
209 |
> |
* simplifies control paths both here and in xfer |
210 |
|
*/ |
211 |
|
private Object fulfill(Object e) { |
212 |
|
boolean isData = (e != null); |
228 |
|
} |
229 |
|
else if (h != null) { |
230 |
|
QNode first = h.next; |
231 |
< |
if (t == tail.get() && |
231 |
> |
if (t == tail.get() && |
232 |
|
first != null && |
233 |
|
advanceHead(h, first)) { |
234 |
|
Object x = first.get(); |
252 |
|
* @param nanos timeout value |
253 |
|
* @return matched item, or s if cancelled |
254 |
|
*/ |
255 |
< |
private Object awaitFulfill(QNode pred, QNode s, Object e, |
255 |
> |
private Object awaitFulfill(QNode pred, QNode s, Object e, |
256 |
|
int mode, long nanos) { |
257 |
|
if (mode == NOWAIT) |
258 |
|
return null; |
268 |
|
advanceHead(pred, s); // unlink if head |
269 |
|
if (x == s) // was cancelled |
270 |
|
return clean(pred, s); |
271 |
< |
else if (x != null) { |
271 |
> |
else if (x != null) { |
272 |
|
s.set(s); // avoid garbage retention |
273 |
|
return x; |
274 |
|
} |
288 |
|
if (spins < 0) { |
289 |
|
QNode h = head.get(); // only spin if at head |
290 |
|
spins = ((h != null && h.next == s) ? |
291 |
< |
(mode == TIMEOUT? |
291 |
> |
(mode == TIMEOUT? |
292 |
|
maxTimedSpins : maxUntimedSpins) : 0); |
293 |
|
} |
294 |
|
if (spins > 0) |
321 |
|
if (w != Thread.currentThread()) |
322 |
|
LockSupport.unpark(w); |
323 |
|
} |
324 |
< |
|
324 |
> |
|
325 |
|
for (;;) { |
326 |
|
if (pred.next != s) // already cleaned |
327 |
< |
return null; |
327 |
> |
return null; |
328 |
|
QNode h = head.get(); |
329 |
|
QNode hn = h.next; // Absorb cancelled first node as head |
330 |
|
if (hn != null && hn.next == hn) { |
360 |
|
cleanMe.compareAndSet(dp, null); |
361 |
|
if (dp == pred) |
362 |
|
return null; // s is already saved node |
363 |
< |
} |
363 |
> |
} |
364 |
|
else if (cleanMe.compareAndSet(null, pred)) |
365 |
|
return null; // Postpone cleaning s |
366 |
|
} |
367 |
|
} |
368 |
< |
|
368 |
> |
|
369 |
|
/** |
370 |
|
* Creates an initially empty <tt>LinkedTransferQueue</tt>. |
371 |
|
*/ |
372 |
|
public LinkedTransferQueue() { |
373 |
+ |
QNode dummy = new QNode(null, false); |
374 |
+ |
head = new PaddedAtomicReference<QNode>(dummy); |
375 |
+ |
tail = new PaddedAtomicReference<QNode>(dummy); |
376 |
+ |
cleanMe = new PaddedAtomicReference<QNode>(null); |
377 |
|
} |
378 |
|
|
379 |
|
/** |
385 |
|
* of its elements are null |
386 |
|
*/ |
387 |
|
public LinkedTransferQueue(Collection<? extends E> c) { |
388 |
+ |
this(); |
389 |
|
addAll(c); |
390 |
|
} |
391 |
|
|
395 |
|
xfer(e, NOWAIT, 0); |
396 |
|
} |
397 |
|
|
398 |
< |
public boolean offer(E e, long timeout, TimeUnit unit) |
398 |
> |
public boolean offer(E e, long timeout, TimeUnit unit) |
399 |
|
throws InterruptedException { |
400 |
|
if (e == null) throw new NullPointerException(); |
401 |
|
if (Thread.interrupted()) throw new InterruptedException(); |
412 |
|
public void transfer(E e) throws InterruptedException { |
413 |
|
if (e == null) throw new NullPointerException(); |
414 |
|
if (xfer(e, WAIT, 0) == null) { |
415 |
< |
Thread.interrupted(); |
415 |
> |
Thread.interrupted(); |
416 |
|
throw new InterruptedException(); |
417 |
< |
} |
417 |
> |
} |
418 |
|
} |
419 |
|
|
420 |
|
public boolean tryTransfer(E e, long timeout, TimeUnit unit) |
436 |
|
Object e = xfer(null, WAIT, 0); |
437 |
|
if (e != null) |
438 |
|
return (E)e; |
439 |
< |
Thread.interrupted(); |
439 |
> |
Thread.interrupted(); |
440 |
|
throw new InterruptedException(); |
441 |
|
} |
442 |
|
|
492 |
|
QNode last = t.next; |
493 |
|
QNode first = h.next; |
494 |
|
if (t == tail.get()) { |
495 |
< |
if (last != null) |
495 |
> |
if (last != null) |
496 |
|
tail.compareAndSet(t, last); |
497 |
|
else if (first != null) { |
498 |
|
Object x = first.get(); |
499 |
< |
if (x == first) |
500 |
< |
advanceHead(h, first); |
499 |
> |
if (x == first) |
500 |
> |
advanceHead(h, first); |
501 |
|
else |
502 |
|
return h; |
503 |
|
} |
514 |
|
} |
515 |
|
|
516 |
|
/** |
517 |
< |
* Iterators. Basic strategy os to travers list, treating |
517 |
> |
* Iterators. Basic strategy is to traverse list, treating |
518 |
|
* non-data (i.e., request) nodes as terminating list. |
519 |
|
* Once a valid data node is found, the item is cached |
520 |
|
* so that the next call to next() will return it even |
525 |
|
QNode currentNode; // last returned node, for remove() |
526 |
|
QNode prevNode; // predecessor of last returned node |
527 |
|
E nextItem; // Cache of next item, once commited to in next |
528 |
< |
|
528 |
> |
|
529 |
|
Itr() { |
530 |
|
nextNode = traversalHead(); |
531 |
|
advance(); |
532 |
|
} |
533 |
< |
|
533 |
> |
|
534 |
|
E advance() { |
535 |
|
prevNode = currentNode; |
536 |
|
currentNode = nextNode; |
537 |
|
E x = nextItem; |
538 |
< |
|
538 |
> |
|
539 |
|
QNode p = nextNode.next; |
540 |
|
for (;;) { |
541 |
|
if (p == null || !p.isData) { |
548 |
|
nextNode = p; |
549 |
|
nextItem = (E)item; |
550 |
|
return x; |
551 |
< |
} |
551 |
> |
} |
552 |
|
prevNode = p; |
553 |
|
p = p.next; |
554 |
|
} |
555 |
|
} |
556 |
< |
|
556 |
> |
|
557 |
|
public boolean hasNext() { |
558 |
|
return nextNode != null; |
559 |
|
} |
560 |
< |
|
560 |
> |
|
561 |
|
public E next() { |
562 |
|
if (nextNode == null) throw new NoSuchElementException(); |
563 |
|
return advance(); |
564 |
|
} |
565 |
< |
|
565 |
> |
|
566 |
|
public void remove() { |
567 |
|
QNode p = currentNode; |
568 |
|
QNode prev = prevNode; |
569 |
< |
if (prev == null || p == null) |
569 |
> |
if (prev == null || p == null) |
570 |
|
throw new IllegalStateException(); |
571 |
|
Object x = p.get(); |
572 |
|
if (x != null && x != p && p.compareAndSet(x, p)) |
613 |
|
if (p == null) |
614 |
|
return false; |
615 |
|
Object x = p.get(); |
616 |
< |
if (p != x) |
616 |
> |
if (p != x) |
617 |
|
return !p.isData; |
618 |
|
} |
619 |
|
} |
620 |
< |
|
620 |
> |
|
621 |
|
/** |
622 |
|
* Returns the number of elements in this queue. If this queue |
623 |
|
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns |
635 |
|
QNode h = traversalHead(); |
636 |
|
for (QNode p = h.next; p != null && p.isData; p = p.next) { |
637 |
|
Object x = p.get(); |
638 |
< |
if (x != null && x != p) { |
638 |
> |
if (x != null && x != p) { |
639 |
|
if (++count == Integer.MAX_VALUE) // saturated |
640 |
|
break; |
641 |
|
} |
683 |
|
private void readObject(java.io.ObjectInputStream s) |
684 |
|
throws java.io.IOException, ClassNotFoundException { |
685 |
|
s.defaultReadObject(); |
686 |
+ |
resetHeadAndTail(); |
687 |
|
for (;;) { |
688 |
|
E item = (E)s.readObject(); |
689 |
|
if (item == null) |
692 |
|
offer(item); |
693 |
|
} |
694 |
|
} |
695 |
+ |
|
696 |
+ |
|
697 |
+ |
// Support for resetting head/tail while deserializing |
698 |
+ |
|
699 |
+ |
// Temporary Unsafe mechanics for preliminary release |
700 |
+ |
private static final Unsafe _unsafe; |
701 |
+ |
private static final long headOffset; |
702 |
+ |
private static final long tailOffset; |
703 |
+ |
private static final long cleanMeOffset; |
704 |
+ |
static { |
705 |
+ |
try { |
706 |
+ |
if (LinkedTransferQueue.class.getClassLoader() != null) { |
707 |
+ |
Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
708 |
+ |
f.setAccessible(true); |
709 |
+ |
_unsafe = (Unsafe)f.get(null); |
710 |
+ |
} |
711 |
+ |
else |
712 |
+ |
_unsafe = Unsafe.getUnsafe(); |
713 |
+ |
headOffset = _unsafe.objectFieldOffset |
714 |
+ |
(LinkedTransferQueue.class.getDeclaredField("head")); |
715 |
+ |
tailOffset = _unsafe.objectFieldOffset |
716 |
+ |
(LinkedTransferQueue.class.getDeclaredField("tail")); |
717 |
+ |
cleanMeOffset = _unsafe.objectFieldOffset |
718 |
+ |
(LinkedTransferQueue.class.getDeclaredField("cleanMe")); |
719 |
+ |
} catch (Exception e) { |
720 |
+ |
throw new RuntimeException("Could not initialize intrinsics", e); |
721 |
+ |
} |
722 |
+ |
} |
723 |
+ |
|
724 |
+ |
private void resetHeadAndTail() { |
725 |
+ |
QNode dummy = new QNode(null, false); |
726 |
+ |
_unsafe.putObjectVolatile(this, headOffset, dummy); |
727 |
+ |
_unsafe.putObjectVolatile(this, tailOffset, dummy); |
728 |
+ |
_unsafe.putObjectVolatile(this, cleanMeOffset, |
729 |
+ |
new PaddedAtomicReference<QNode>(null)); |
730 |
+ |
|
731 |
+ |
} |
732 |
+ |
|
733 |
|
} |