ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
Revision: 1.8
Committed: Fri Oct 3 00:39:48 2008 UTC (15 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.7: +4 -2 lines
Log Message:
Fix serialization

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.util.concurrent.*;
9 import java.util.concurrent.locks.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.*;
12 import java.io.*;
13 import sun.misc.Unsafe;
14 import java.lang.reflect.*;
15
/**
 * An unbounded {@linkplain TransferQueue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out) with respect
 * to any given producer.  The <em>head</em> of the queue is that
 * element that has been on the queue the longest time for some
 * producer.  The <em>tail</em> of the queue is that element that has
 * been on the queue the shortest time for some producer.
 *
 * <p>Beware that, unlike in most collections, the <tt>size</tt>
 * method is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code LinkedTransferQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code LinkedTransferQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.7
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This is still a work in progress...
     *
     * This class extends the approach used in FIFO-mode
     * SynchronousQueues. See the internal documentation, as well as
     * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
     * Lea & Scott
     * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
     *
     * The main extension is to provide different Wait modes
     * for the main "xfer" method that puts or takes items.
     * These don't impact the basic dual-queue logic, but instead
     * control whether or how threads block upon insertion
     * of request or data nodes into the dual queue.
     *
     * Invariant relied on by all traversals: advanceHead() self-links
     * the "next" field of every node it retires from the head
     * position. A traversal that observes p.next == p is therefore
     * standing on a node that is no longer on the list and must
     * restart from the current head rather than keep following next.
     */

    // Wait modes for xfer method
    static final int NOWAIT  = 0;
    static final int TIMEOUT = 1;
    static final int WAIT    = 2;

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Node class for LinkedTransferQueue. Opportunistically subclasses from
     * AtomicReference to represent item. Uses Object, not E, to allow
     * setting item to "this" after use, to avoid garbage
     * retention. Similarly, setting the next field to this is used as
     * sentinel that node is off list.
     */
    static final class QNode extends AtomicReference<Object> {
        volatile QNode next;
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;

        QNode(Object item, boolean isData) {
            super(item);
            this.isData = isData;
        }

        static final AtomicReferenceFieldUpdater<QNode, QNode>
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
            (QNode.class, QNode.class, "next");

        /** Atomically replaces {@code next} with val if it currently equals cmp. */
        boolean casNext(QNode cmp, QNode val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }
    }

    /**
     * Padded version of AtomicReference used for head, tail and
     * cleanMe, to alleviate contention across threads CASing one vs
     * the other.
     */
    static final class PaddedAtomicReference<T> extends AtomicReference<T> {
        // enough padding for 64bytes with 4byte refs
        Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
        PaddedAtomicReference(T r) { super(r); }
    }


    /** head of the queue */
    private transient final PaddedAtomicReference<QNode> head;
    /** tail of the queue */
    private transient final PaddedAtomicReference<QNode> tail;

    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it cancelled.
     */
    private transient final PaddedAtomicReference<QNode> cleanMe;

    /**
     * Narrows an internally stored item to the caller's element type.
     * Items are stored as Object so that a node can hold itself as a
     * cancellation marker; every non-marker item was inserted through
     * a method taking an E, so the cast is safe.
     */
    @SuppressWarnings("unchecked")
    private static <T> T cast(Object item) {
        return (T) item;
    }

    /**
     * Tries to cas nh as new head; if successful, unlink
     * old head's next node to avoid garbage retention.
     *
     * @param h the expected current head
     * @param nh the node to install as head
     * @return true if this call installed nh
     */
    private boolean advanceHead(QNode h, QNode nh) {
        if (h == head.get() && head.compareAndSet(h, nh)) {
            h.next = h; // forget old next (also marks h as off list)
            return true;
        }
        return false;
    }

    /**
     * Puts or takes an item. Used for most queue operations (except
     * poll() and tryTransfer())
     * @param e the item or if null, signifies that this is a take
     * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
     * @return an item, or null on failure
     */
    private Object xfer(Object e, int mode, long nanos) {
        boolean isData = (e != null);
        QNode s = null;
        final PaddedAtomicReference<QNode> head = this.head;
        final PaddedAtomicReference<QNode> tail = this.tail;

        for (;;) {
            QNode t = tail.get();
            QNode h = head.get();

            if (t != null && (t == h || t.isData == isData)) {
                // Queue is empty or holds nodes of our own mode: append.
                if (s == null) {
                    s = new QNode(e, isData);
                }
                QNode last = t.next;
                if (last != null) {
                    // Tail is lagging; help swing it forward and retry.
                    if (t == tail.get()) {
                        tail.compareAndSet(t, last);
                    }
                } else if (t.casNext(null, s)) {
                    tail.compareAndSet(t, s);
                    return awaitFulfill(t, s, e, mode, nanos);
                }
            } else if (h != null) {
                // Queue holds complementary nodes: try to match the first.
                QNode first = h.next;
                if (t == tail.get() && first != null &&
                    advanceHead(h, first)) {
                    Object x = first.get();
                    if (x != first && first.compareAndSet(x, e)) {
                        LockSupport.unpark(first.waiter);
                        return isData ? e : x;
                    }
                }
            }
        }
    }


    /**
     * Version of xfer for poll() and tryTransfer, which
     * simplifies control paths both here and in xfer.
     * Never enqueues a node.
     *
     * @param e the item to hand over, or null to take
     * @return the exchanged item, or null if no complementary node
     *         was available
     */
    private Object fulfill(Object e) {
        boolean isData = (e != null);
        final PaddedAtomicReference<QNode> head = this.head;
        final PaddedAtomicReference<QNode> tail = this.tail;

        for (;;) {
            QNode t = tail.get();
            QNode h = head.get();

            if (t != null && (t == h || t.isData == isData)) {
                QNode last = t.next;
                if (t == tail.get()) {
                    if (last != null) {
                        tail.compareAndSet(t, last);
                    } else {
                        return null;
                    }
                }
            } else if (h != null) {
                QNode first = h.next;
                if (t == tail.get() &&
                    first != null &&
                    advanceHead(h, first)) {
                    Object x = first.get();
                    if (x != first && first.compareAndSet(x, e)) {
                        LockSupport.unpark(first.waiter);
                        return isData ? e : x;
                    }
                }
            }
        }
    }

    /**
     * Spins/blocks until node s is fulfilled or caller gives up,
     * depending on wait mode.
     *
     * @param pred the predecessor of waiting node
     * @param s the waiting node
     * @param e the comparison value for checking match
     * @param mode mode
     * @param nanos timeout value
     * @return the matched item (for a data node, e itself) once s has
     *         been fulfilled; null if mode is NOWAIT, or if the wait
     *         was cancelled by interrupt or timeout
     */
    private Object awaitFulfill(QNode pred, QNode s, Object e,
                                int mode, long nanos) {
        if (mode == NOWAIT) {
            return null;
        }

        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
        Thread w = Thread.currentThread();
        int spins = -1; // set to desired spin count below
        for (;;) {
            if (w.isInterrupted()) {
                s.compareAndSet(e, s);      // cancel: item becomes the node itself
            }
            Object x = s.get();
            if (x != e) {                   // Node was matched or cancelled
                advanceHead(pred, s);       // unlink if head
                if (x == s) {               // was cancelled
                    return clean(pred, s);
                } else if (x != null) {
                    s.set(s);               // avoid garbage retention
                    return x;
                } else {
                    return e;
                }
            }

            if (mode == TIMEOUT) {
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0) {
                    s.compareAndSet(e, s);  // try to cancel
                    continue;
                }
            }
            if (spins < 0) {
                QNode h = head.get();       // only spin if at head
                spins = ((h != null && h.next == s) ?
                         (mode == TIMEOUT ?
                          maxTimedSpins : maxUntimedSpins) : 0);
            }
            if (spins > 0) {
                --spins;
            } else if (s.waiter == null) {
                s.waiter = w;
            } else if (mode != TIMEOUT) {
                //                LockSupport.park(this);
                LockSupport.park(); // allows run on java5
                s.waiter = null;
                spins = -1;
            } else if (nanos > spinForTimeoutThreshold) {
                //                LockSupport.parkNanos(this, nanos);
                LockSupport.parkNanos(nanos);
                s.waiter = null;
                spins = -1;
            }
        }
    }

    /**
     * Gets rid of cancelled node s with original predecessor pred.
     * @return null (to simplify use by callers)
     */
    private Object clean(QNode pred, QNode s) {
        Thread w = s.waiter;
        if (w != null) {             // Wake up thread
            s.waiter = null;
            if (w != Thread.currentThread()) {
                LockSupport.unpark(w);
            }
        }

        for (;;) {
            if (pred.next != s) {    // already cleaned
                return null;
            }
            QNode h = head.get();
            QNode hn = h.next;       // Absorb cancelled first node as head
            if (hn != null && hn.next == hn) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail.get();    // Ensure consistent read for tail
            if (t == h) {
                return null;
            }
            QNode tn = t.next;
            if (t != tail.get()) {
                continue;
            }
            if (tn != null) {        // Help advance tail
                tail.compareAndSet(t, tn);
                continue;
            }
            if (s != t) {            // If not tail, try to unsplice
                QNode sn = s.next;
                if (sn == s || pred.casNext(s, sn)) {
                    return null;
                }
            }
            QNode dp = cleanMe.get();
            if (dp != null) {        // Try unlinking previous cancelled node
                QNode d = dp.next;
                QNode dn;
                if (d == null ||                  // d is gone or
                    d == dp ||                    // d is off list or
                    d.get() != d ||               // d not cancelled or
                    (d != t &&                    // d not tail and
                     (dn = d.next) != null &&     //   has successor
                     dn != d &&                   //   that is on list
                     dp.casNext(d, dn))) {        // d unspliced
                    cleanMe.compareAndSet(dp, null);
                }
                if (dp == pred) {
                    return null;     // s is already saved node
                }
            } else if (cleanMe.compareAndSet(null, pred)) {
                return null;         // Postpone cleaning s
            }
        }
    }

    /**
     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
     */
    public LinkedTransferQueue() {
        QNode dummy = new QNode(null, false);
        head = new PaddedAtomicReference<QNode>(dummy);
        tail = new PaddedAtomicReference<QNode>(dummy);
        cleanMe = new PaddedAtomicReference<QNode>(null);
    }

    /**
     * Creates a <tt>LinkedTransferQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

    /**
     * Inserts the element without waiting for a consumer.
     *
     * @throws NullPointerException if e is null
     * @throws InterruptedException if the calling thread is already
     *         interrupted on entry
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (Thread.interrupted()) throw new InterruptedException();
        xfer(e, NOWAIT, 0);
    }

    /**
     * Inserts the element without waiting; the timeout arguments are
     * not consulted because the queue is unbounded.
     *
     * @return true always
     * @throws NullPointerException if e is null
     * @throws InterruptedException if the calling thread is already
     *         interrupted on entry
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (Thread.interrupted()) throw new InterruptedException();
        xfer(e, NOWAIT, 0);
        return true;
    }

    /**
     * Inserts the element without waiting.
     *
     * @return true always
     * @throws NullPointerException if e is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        xfer(e, NOWAIT, 0);
        return true;
    }

    /**
     * Inserts the element and waits until a consumer receives it.
     *
     * @throws NullPointerException if e is null
     * @throws InterruptedException if interrupted while waiting
     */
    public void transfer(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (xfer(e, WAIT, 0) == null) {
            // An untimed wait only fails by interrupt; clear the flag.
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the element and waits up to the given time for a
     * consumer to receive it.
     *
     * @return true if a consumer received the element, false if the
     *         wait timed out
     * @throws NullPointerException if e is null
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) {
            return true;
        }
        if (!Thread.interrupted()) {
            return false;
        }
        throw new InterruptedException();
    }

    /**
     * Hands the element to a consumer only if one is already waiting;
     * otherwise the element is not enqueued.
     *
     * @return true if the element was handed over
     * @throws NullPointerException if e is null
     */
    public boolean tryTransfer(E e) {
        if (e == null) throw new NullPointerException();
        return fulfill(e) != null;
    }

    /**
     * Removes and returns the first element, waiting if necessary.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        Object e = xfer(null, WAIT, 0);
        if (e != null) {
            return LinkedTransferQueue.<E>cast(e);
        }
        // An untimed wait only fails by interrupt; clear the flag.
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Removes and returns the first element, waiting up to the given
     * time if necessary.
     *
     * @return the element, or null if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted()) {
            return LinkedTransferQueue.<E>cast(e);
        }
        throw new InterruptedException();
    }

    /**
     * Removes and returns the first element, or returns null if none
     * is immediately available.
     */
    public E poll() {
        return LinkedTransferQueue.<E>cast(fulfill(null));
    }

    /**
     * Moves all immediately available elements into c.
     *
     * @return the number of elements moved
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Moves at most maxElements immediately available elements into c.
     *
     * @return the number of elements moved
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    // Traversal-based methods

    /**
     * Return head after performing any outstanding helping steps
     * (swinging a lagging tail, absorbing a cancelled first node).
     */
    private QNode traversalHead() {
        for (;;) {
            QNode t = tail.get();
            QNode h = head.get();
            if (h != null && t != null) {
                QNode last = t.next;
                QNode first = h.next;
                if (t == tail.get()) {
                    if (last != null) {
                        tail.compareAndSet(t, last);
                    } else if (first != null) {
                        Object x = first.get();
                        if (x == first) {
                            advanceHead(h, first);
                        } else {
                            return h;
                        }
                    } else {
                        return h;
                    }
                }
            }
        }
    }


    public Iterator<E> iterator() {
        return new Itr();
    }

    /**
     * Iterators. Basic strategy is to traverse list, treating
     * non-data (i.e., request) nodes as terminating list.
     * Once a valid data node is found, the item is cached
     * so that the next call to next() will return it even
     * if subsequently removed.
     */
    class Itr implements Iterator<E> {
        QNode nextNode;    // node holding nextItem, or null when exhausted
        QNode nextPrev;    // node visited immediately before nextNode
        E nextItem;        // cache of next item, once committed to
        QNode currentNode; // last returned node, for remove(); null if none
        QNode prevNode;    // predecessor observed for currentNode

        Itr() {
            findNext(traversalHead());
        }

        /**
         * Positions nextNode/nextItem/nextPrev on the first live data
         * node after {@code from}, or clears them if there is none.
         */
        private void findNext(QNode from) {
            QNode pred = from;
            for (;;) {
                QNode p = pred.next;
                if (p == pred) {
                    // pred was retired from the head position and
                    // self-linked; everything still on the list lies
                    // at or after the current head, so restart there.
                    pred = traversalHead();
                    continue;
                }
                if (p == null || !p.isData) {
                    nextNode = null;
                    nextPrev = null;
                    nextItem = null;
                    return;
                }
                Object item = p.get();
                if (item != p && item != null) {
                    nextNode = p;
                    nextPrev = pred;
                    nextItem = LinkedTransferQueue.<E>cast(item);
                    return;
                }
                pred = p;   // skip cancelled or already-taken node
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            QNode node = nextNode;
            if (node == null) {
                throw new NoSuchElementException();
            }
            E item = nextItem;
            currentNode = node;
            prevNode = nextPrev;
            findNext(node);
            return item;
        }

        /**
         * Removes the element last returned by next(), if it is still
         * in the queue.
         *
         * @throws IllegalStateException if next() has not been called,
         *         or remove() was already called since the last next()
         */
        public void remove() {
            QNode p = currentNode;
            if (p == null) {
                throw new IllegalStateException();
            }
            currentNode = null;     // at most one remove() per next()
            Object x = p.get();
            if (x != null && x != p && p.compareAndSet(x, p)) {
                clean(prevNode, p);
            }
        }
    }

    public E peek() {
        for (;;) {
            QNode h = traversalHead();
            QNode p = h.next;
            if (p == null) {
                return null;
            }
            Object x = p.get();
            if (p != x) {
                if (!p.isData) {
                    return null;
                }
                if (x != null) {
                    return LinkedTransferQueue.<E>cast(x);
                }
            }
        }
    }

    public boolean isEmpty() {
        for (;;) {
            QNode h = traversalHead();
            QNode p = h.next;
            if (p == null) {
                return true;
            }
            Object x = p.get();
            if (p != x) {
                if (!p.isData) {
                    return true;
                }
                if (x != null) {
                    return false;
                }
            }
        }
    }

    public boolean hasWaitingConsumer() {
        for (;;) {
            QNode h = traversalHead();
            QNode p = h.next;
            if (p == null) {
                return false;
            }
            Object x = p.get();
            if (p != x) {
                return !p.isData;
            }
        }
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        restart:
        for (;;) {
            int count = 0;
            QNode pred = traversalHead();
            for (;;) {
                QNode p = pred.next;
                if (p == pred) {
                    continue restart;   // pred fell off the list; recount
                }
                if (p == null || !p.isData) {
                    return count;
                }
                Object x = p.get();
                if (x != null && x != p) {
                    if (++count == Integer.MAX_VALUE) { // saturated
                        return count;
                    }
                }
                pred = p;
            }
        }
    }

    /**
     * Returns the number of request nodes at the front of the queue
     * that have not yet been given an item, saturating at
     * <tt>Integer.MAX_VALUE</tt>. Requires a traversal.
     */
    public int getWaitingConsumerCount() {
        restart:
        for (;;) {
            int count = 0;
            QNode pred = traversalHead();
            for (;;) {
                QNode p = pred.next;
                if (p == pred) {
                    continue restart;   // pred fell off the list; recount
                }
                if (p == null || p.isData) {
                    return count;
                }
                if (p.get() == null) {
                    if (++count == Integer.MAX_VALUE) {
                        return count;
                    }
                }
                pred = p;
            }
        }
    }

    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an <tt>E</tt>) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        s.defaultWriteObject();
        for (Iterator<E> it = iterator(); it.hasNext(); ) {
            s.writeObject(it.next());
        }
        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        // The constructor does not run during deserialization, so the
        // final transient head/tail/cleanMe fields must be installed here.
        resetHeadAndTail();
        for (;;) {
            E item = LinkedTransferQueue.<E>cast(s.readObject());
            if (item == null) {
                break;
            } else {
                offer(item);
            }
        }
    }


    // Support for resetting head/tail while deserializing

    // Temporary Unsafe mechanics for preliminary release
    private static final Unsafe _unsafe;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long cleanMeOffset;
    static {
        try {
            if (LinkedTransferQueue.class.getClassLoader() != null) {
                // Not on the boot class path: obtain the singleton reflectively.
                Field f = Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                _unsafe = (Unsafe)f.get(null);
            } else {
                _unsafe = Unsafe.getUnsafe();
            }
            headOffset = _unsafe.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("head"));
            tailOffset = _unsafe.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("tail"));
            cleanMeOffset = _unsafe.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
        } catch (Exception e) {
            throw new RuntimeException("Could not initialize intrinsics", e);
        }
    }

    /**
     * Installs fresh head, tail and cleanMe references sharing one new
     * dummy node, writing the final fields through Unsafe.
     */
    private void resetHeadAndTail() {
        QNode dummy = new QNode(null, false);
        _unsafe.putObjectVolatile(this, headOffset,
                                  new PaddedAtomicReference<QNode>(dummy));
        _unsafe.putObjectVolatile(this, tailOffset,
                                  new PaddedAtomicReference<QNode>(dummy));
        _unsafe.putObjectVolatile(this, cleanMeOffset,
                                  new PaddedAtomicReference<QNode>(null));

    }

}