ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
Revision: 1.3
Committed: Mon Aug 27 19:48:36 2007 UTC (16 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.2: +1 -1 lines
Log Message:
Overhaul ParallelArray APIs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.util.concurrent.*;
9 import java.util.concurrent.locks.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.*;
12 import java.io.*;
13
14 /**
15 * An unbounded {@linkplain TransferQueue} based on linked nodes.
16 * This queue orders elements FIFO (first-in-first-out) with respect
17 * to any given producer. The <em>head</em> of the queue is that
18 * element that has been on the queue the longest time for some
19 * producer. The <em>tail</em> of the queue is that element that has
20 * been on the queue the shortest time for some producer.
21 *
22 * <p>Beware that, unlike in most collections, the <tt>size</tt>
23 * method is <em>NOT</em> a constant-time operation. Because of the
24 * asynchronous nature of these queues, determining the current number
25 * of elements requires a traversal of the elements.
26 *
27 * <p>This class and its iterator implement all of the
28 * <em>optional</em> methods of the {@link Collection} and {@link
29 * Iterator} interfaces.
30 *
31 * <p>Memory consistency effects: As with other concurrent
32 * collections, actions in a thread prior to placing an object into a
33 * {@code LinkedTransferQueue}
34 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
35 * actions subsequent to the access or removal of that element from
36 * the {@code LinkedTransferQueue} in another thread.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.7
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 *
46 */
47 public class LinkedTransferQueue<E> extends AbstractQueue<E>
48 implements TransferQueue<E>, java.io.Serializable {
49 private static final long serialVersionUID = -3223113410248163686L;
50
51 /*
52 * This is still a work in prgress...
53 *
54 * This class extends the approach used in FIFO-mode
55 * SynchronousQueues. See the internal documentation, as well as
56 * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57 * Lea & Scott
58 * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59 *
60 * The main extension is to provide different Wait modes
61 * for the main "xfer" method that puts or takes items.
62 * These don't impact the basic dual-queue logic, but instead
63 * control whether or how threads block upon insertion
64 * of request or data nodes into the dual queue.
65 */
66
67 // Wait modes for xfer method
68 static final int NOWAIT = 0;
69 static final int TIMEOUT = 1;
70 static final int WAIT = 2;
71
72 /** The number of CPUs, for spin control */
73 static final int NCPUS = Runtime.getRuntime().availableProcessors();
74
75 /**
76 * The number of times to spin before blocking in timed waits.
77 * The value is empirically derived -- it works well across a
78 * variety of processors and OSes. Empirically, the best value
79 * seems not to vary with number of CPUs (beyond 2) so is just
80 * a constant.
81 */
82 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
83
84 /**
85 * The number of times to spin before blocking in untimed waits.
86 * This is greater than timed value because untimed waits spin
87 * faster since they don't need to check times on each spin.
88 */
89 static final int maxUntimedSpins = maxTimedSpins * 16;
90
91 /**
92 * The number of nanoseconds for which it is faster to spin
93 * rather than to use timed park. A rough estimate suffices.
94 */
95 static final long spinForTimeoutThreshold = 1000L;
96
97 /**
98 * Node class for LinkedTransferQueue. Opportunistically subclasses from
99 * AtomicReference to represent item. Uses Object, not E, to allow
100 * setting item to "this" after use, to avoid garbage
101 * retention. Similarly, setting the next field to this is used as
102 * sentinel that node is off list.
103 */
104 static final class QNode extends AtomicReference<Object> {
105 volatile QNode next;
106 volatile Thread waiter; // to control park/unpark
107 final boolean isData;
108 QNode(Object item, boolean isData) {
109 super(item);
110 this.isData = isData;
111 }
112
113 static final AtomicReferenceFieldUpdater<QNode, QNode>
114 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
115 (QNode.class, QNode.class, "next");
116
117 boolean casNext(QNode cmp, QNode val) {
118 return nextUpdater.compareAndSet(this, cmp, val);
119 }
120 }
121
122 /**
123 * Padded version of AtomicReference used for head, tail and
124 * cleanMe, to alleviate contention across threads CASing one vs
125 * the other.
126 */
127 static final class PaddedAtomicReference<T> extends AtomicReference<T> {
128 // enough padding for 64bytes with 4byte refs
129 Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
130 PaddedAtomicReference(T r) { super(r); }
131 }
132
133
134 private final QNode dummy = new QNode(null, false);
135 private final PaddedAtomicReference<QNode> head =
136 new PaddedAtomicReference<QNode>(dummy);
137 private final PaddedAtomicReference<QNode> tail =
138 new PaddedAtomicReference<QNode>(dummy);
139
140 /**
141 * Reference to a cancelled node that might not yet have been
142 * unlinked from queue because it was the last inserted node
143 * when it cancelled.
144 */
145 private final PaddedAtomicReference<QNode> cleanMe =
146 new PaddedAtomicReference<QNode>(null);
147
148 /**
149 * Tries to cas nh as new head; if successful, unlink
150 * old head's next node to avoid garbage retention.
151 */
152 private boolean advanceHead(QNode h, QNode nh) {
153 if (h == head.get() && head.compareAndSet(h, nh)) {
154 h.next = h; // forget old next
155 return true;
156 }
157 return false;
158 }
159
160 /**
161 * Puts or takes an item. Used for most queue operations (except
162 * poll() and tryTransfer())
163 * @param e the item or if null, signfies that this is a take
164 * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
165 * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
166 * @return an item, or null on failure
167 */
168 private Object xfer(Object e, int mode, long nanos) {
169 boolean isData = (e != null);
170 QNode s = null;
171 final PaddedAtomicReference<QNode> head = this.head;
172 final PaddedAtomicReference<QNode> tail = this.tail;
173
174 for (;;) {
175 QNode t = tail.get();
176 QNode h = head.get();
177
178 if (t != null && (t == h || t.isData == isData)) {
179 if (s == null)
180 s = new QNode(e, isData);
181 QNode last = t.next;
182 if (last != null) {
183 if (t == tail.get())
184 tail.compareAndSet(t, last);
185 }
186 else if (t.casNext(null, s)) {
187 tail.compareAndSet(t, s);
188 return awaitFulfill(t, s, e, mode, nanos);
189 }
190 }
191
192 else if (h != null) {
193 QNode first = h.next;
194 if (t == tail.get() && first != null &&
195 advanceHead(h, first)) {
196 Object x = first.get();
197 if (x != first && first.compareAndSet(x, e)) {
198 LockSupport.unpark(first.waiter);
199 return isData? e : x;
200 }
201 }
202 }
203 }
204 }
205
206
207 /**
208 * Version of xfer for poll() and tryTransfer, which
209 * simpifies control paths both here and in xfer
210 */
211 private Object fulfill(Object e) {
212 boolean isData = (e != null);
213 final PaddedAtomicReference<QNode> head = this.head;
214 final PaddedAtomicReference<QNode> tail = this.tail;
215
216 for (;;) {
217 QNode t = tail.get();
218 QNode h = head.get();
219
220 if (t != null && (t == h || t.isData == isData)) {
221 QNode last = t.next;
222 if (t == tail.get()) {
223 if (last != null)
224 tail.compareAndSet(t, last);
225 else
226 return null;
227 }
228 }
229 else if (h != null) {
230 QNode first = h.next;
231 if (t == tail.get() &&
232 first != null &&
233 advanceHead(h, first)) {
234 Object x = first.get();
235 if (x != first && first.compareAndSet(x, e)) {
236 LockSupport.unpark(first.waiter);
237 return isData? e : x;
238 }
239 }
240 }
241 }
242 }
243
244 /**
245 * Spins/blocks until node s is fulfilled or caller gives up,
246 * depending on wait mode.
247 *
248 * @param pred the predecessor of waiting node
249 * @param s the waiting node
250 * @param e the comparison value for checking match
251 * @param mode mode
252 * @param nanos timeout value
253 * @return matched item, or s if cancelled
254 */
255 private Object awaitFulfill(QNode pred, QNode s, Object e,
256 int mode, long nanos) {
257 if (mode == NOWAIT)
258 return null;
259
260 long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
261 Thread w = Thread.currentThread();
262 int spins = -1; // set to desired spin count below
263 for (;;) {
264 if (w.isInterrupted())
265 s.compareAndSet(e, s);
266 Object x = s.get();
267 if (x != e) { // Node was matched or cancelled
268 advanceHead(pred, s); // unlink if head
269 if (x == s) // was cancelled
270 return clean(pred, s);
271 else if (x != null) {
272 s.set(s); // avoid garbage retention
273 return x;
274 }
275 else
276 return e;
277 }
278
279 if (mode == TIMEOUT) {
280 long now = System.nanoTime();
281 nanos -= now - lastTime;
282 lastTime = now;
283 if (nanos <= 0) {
284 s.compareAndSet(e, s); // try to cancel
285 continue;
286 }
287 }
288 if (spins < 0) {
289 QNode h = head.get(); // only spin if at head
290 spins = ((h != null && h.next == s) ?
291 (mode == TIMEOUT?
292 maxTimedSpins : maxUntimedSpins) : 0);
293 }
294 if (spins > 0)
295 --spins;
296 else if (s.waiter == null)
297 s.waiter = w;
298 else if (mode != TIMEOUT) {
299 // LockSupport.park(this);
300 LockSupport.park(); // allows run on java5
301 s.waiter = null;
302 spins = -1;
303 }
304 else if (nanos > spinForTimeoutThreshold) {
305 // LockSupport.parkNanos(this, nanos);
306 LockSupport.parkNanos(nanos);
307 s.waiter = null;
308 spins = -1;
309 }
310 }
311 }
312
313 /**
314 * Gets rid of cancelled node s with original predecessor pred.
315 * @return null (to simplify use by callers)
316 */
317 private Object clean(QNode pred, QNode s) {
318 Thread w = s.waiter;
319 if (w != null) { // Wake up thread
320 s.waiter = null;
321 if (w != Thread.currentThread())
322 LockSupport.unpark(w);
323 }
324
325 for (;;) {
326 if (pred.next != s) // already cleaned
327 return null;
328 QNode h = head.get();
329 QNode hn = h.next; // Absorb cancelled first node as head
330 if (hn != null && hn.next == hn) {
331 advanceHead(h, hn);
332 continue;
333 }
334 QNode t = tail.get(); // Ensure consistent read for tail
335 if (t == h)
336 return null;
337 QNode tn = t.next;
338 if (t != tail.get())
339 continue;
340 if (tn != null) { // Help advance tail
341 tail.compareAndSet(t, tn);
342 continue;
343 }
344 if (s != t) { // If not tail, try to unsplice
345 QNode sn = s.next;
346 if (sn == s || pred.casNext(s, sn))
347 return null;
348 }
349 QNode dp = cleanMe.get();
350 if (dp != null) { // Try unlinking previous cancelled node
351 QNode d = dp.next;
352 QNode dn;
353 if (d == null || // d is gone or
354 d == dp || // d is off list or
355 d.get() != d || // d not cancelled or
356 (d != t && // d not tail and
357 (dn = d.next) != null && // has successor
358 dn != d && // that is on list
359 dp.casNext(d, dn))) // d unspliced
360 cleanMe.compareAndSet(dp, null);
361 if (dp == pred)
362 return null; // s is already saved node
363 }
364 else if (cleanMe.compareAndSet(null, pred))
365 return null; // Postpone cleaning s
366 }
367 }
368
369 /**
370 * Creates an initially empty <tt>LinkedTransferQueue</tt>.
371 */
372 public LinkedTransferQueue() {
373 }
374
375 /**
376 * Creates a <tt>LinkedTransferQueue</tt>
377 * initially containing the elements of the given collection,
378 * added in traversal order of the collection's iterator.
379 * @param c the collection of elements to initially contain
380 * @throws NullPointerException if the specified collection or any
381 * of its elements are null
382 */
383 public LinkedTransferQueue(Collection<? extends E> c) {
384 addAll(c);
385 }
386
387 public void put(E e) throws InterruptedException {
388 if (e == null) throw new NullPointerException();
389 if (Thread.interrupted()) throw new InterruptedException();
390 xfer(e, NOWAIT, 0);
391 }
392
393 public boolean offer(E e, long timeout, TimeUnit unit)
394 throws InterruptedException {
395 if (e == null) throw new NullPointerException();
396 if (Thread.interrupted()) throw new InterruptedException();
397 xfer(e, NOWAIT, 0);
398 return true;
399 }
400
401 public boolean offer(E e) {
402 if (e == null) throw new NullPointerException();
403 xfer(e, NOWAIT, 0);
404 return true;
405 }
406
407 public void transfer(E e) throws InterruptedException {
408 if (e == null) throw new NullPointerException();
409 if (xfer(e, WAIT, 0) == null) {
410 Thread.interrupted();
411 throw new InterruptedException();
412 }
413 }
414
415 public boolean tryTransfer(E e, long timeout, TimeUnit unit)
416 throws InterruptedException {
417 if (e == null) throw new NullPointerException();
418 if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
419 return true;
420 if (!Thread.interrupted())
421 return false;
422 throw new InterruptedException();
423 }
424
425 public boolean tryTransfer(E e) {
426 if (e == null) throw new NullPointerException();
427 return fulfill(e) != null;
428 }
429
430 public E take() throws InterruptedException {
431 Object e = xfer(null, WAIT, 0);
432 if (e != null)
433 return (E)e;
434 Thread.interrupted();
435 throw new InterruptedException();
436 }
437
438 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
439 Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
440 if (e != null || !Thread.interrupted())
441 return (E)e;
442 throw new InterruptedException();
443 }
444
445 public E poll() {
446 return (E)fulfill(null);
447 }
448
449 public int drainTo(Collection<? super E> c) {
450 if (c == null)
451 throw new NullPointerException();
452 if (c == this)
453 throw new IllegalArgumentException();
454 int n = 0;
455 E e;
456 while ( (e = poll()) != null) {
457 c.add(e);
458 ++n;
459 }
460 return n;
461 }
462
463 public int drainTo(Collection<? super E> c, int maxElements) {
464 if (c == null)
465 throw new NullPointerException();
466 if (c == this)
467 throw new IllegalArgumentException();
468 int n = 0;
469 E e;
470 while (n < maxElements && (e = poll()) != null) {
471 c.add(e);
472 ++n;
473 }
474 return n;
475 }
476
477 // Traversal-based methods
478
479 /**
480 * Return head after performing any outstanding helping steps
481 */
482 private QNode traversalHead() {
483 for (;;) {
484 QNode t = tail.get();
485 QNode h = head.get();
486 if (h != null && t != null) {
487 QNode last = t.next;
488 QNode first = h.next;
489 if (t == tail.get()) {
490 if (last != null)
491 tail.compareAndSet(t, last);
492 else if (first != null) {
493 Object x = first.get();
494 if (x == first)
495 advanceHead(h, first);
496 else
497 return h;
498 }
499 else
500 return h;
501 }
502 }
503 }
504 }
505
506
507 public Iterator<E> iterator() {
508 return new Itr();
509 }
510
511 /**
512 * Iterators. Basic strategy os to travers list, treating
513 * non-data (i.e., request) nodes as terminating list.
514 * Once a valid data node is found, the item is cached
515 * so that the next call to next() will return it even
516 * if subsequently removed.
517 */
518 class Itr implements Iterator<E> {
519 QNode nextNode; // Next node to return next
520 QNode currentNode; // last returned node, for remove()
521 QNode prevNode; // predecessor of last returned node
522 E nextItem; // Cache of next item, once commited to in next
523
524 Itr() {
525 nextNode = traversalHead();
526 advance();
527 }
528
529 E advance() {
530 prevNode = currentNode;
531 currentNode = nextNode;
532 E x = nextItem;
533
534 QNode p = nextNode.next;
535 for (;;) {
536 if (p == null || !p.isData) {
537 nextNode = null;
538 nextItem = null;
539 return x;
540 }
541 Object item = p.get();
542 if (item != p && item != null) {
543 nextNode = p;
544 nextItem = (E)item;
545 return x;
546 }
547 prevNode = p;
548 p = p.next;
549 }
550 }
551
552 public boolean hasNext() {
553 return nextNode != null;
554 }
555
556 public E next() {
557 if (nextNode == null) throw new NoSuchElementException();
558 return advance();
559 }
560
561 public void remove() {
562 QNode p = currentNode;
563 QNode prev = prevNode;
564 if (prev == null || p == null)
565 throw new IllegalStateException();
566 Object x = p.get();
567 if (x != null && x != p && p.compareAndSet(x, p))
568 clean(prev, p);
569 }
570 }
571
572 public E peek() {
573 for (;;) {
574 QNode h = traversalHead();
575 QNode p = h.next;
576 if (p == null)
577 return null;
578 Object x = p.get();
579 if (p != x) {
580 if (!p.isData)
581 return null;
582 if (x != null)
583 return (E)x;
584 }
585 }
586 }
587
588 public boolean isEmpty() {
589 for (;;) {
590 QNode h = traversalHead();
591 QNode p = h.next;
592 if (p == null)
593 return true;
594 Object x = p.get();
595 if (p != x) {
596 if (!p.isData)
597 return true;
598 if (x != null)
599 return false;
600 }
601 }
602 }
603
604 public boolean hasWaitingConsumer() {
605 for (;;) {
606 QNode h = traversalHead();
607 QNode p = h.next;
608 if (p == null)
609 return false;
610 Object x = p.get();
611 if (p != x)
612 return !p.isData;
613 }
614 }
615
616 /**
617 * Returns the number of elements in this queue. If this queue
618 * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
619 * <tt>Integer.MAX_VALUE</tt>.
620 *
621 * <p>Beware that, unlike in most collections, this method is
622 * <em>NOT</em> a constant-time operation. Because of the
623 * asynchronous nature of these queues, determining the current
624 * number of elements requires an O(n) traversal.
625 *
626 * @return the number of elements in this queue
627 */
628 public int size() {
629 int count = 0;
630 QNode h = traversalHead();
631 for (QNode p = h.next; p != null && p.isData; p = p.next) {
632 Object x = p.get();
633 if (x != null && x != p) {
634 if (++count == Integer.MAX_VALUE) // saturated
635 break;
636 }
637 }
638 return count;
639 }
640
641 public int getWaitingConsumerCount() {
642 int count = 0;
643 QNode h = traversalHead();
644 for (QNode p = h.next; p != null && !p.isData; p = p.next) {
645 if (p.get() == null) {
646 if (++count == Integer.MAX_VALUE)
647 break;
648 }
649 }
650 return count;
651 }
652
653 public int remainingCapacity() {
654 return Integer.MAX_VALUE;
655 }
656
657 /**
658 * Save the state to a stream (that is, serialize it).
659 *
660 * @serialData All of the elements (each an <tt>E</tt>) in
661 * the proper order, followed by a null
662 * @param s the stream
663 */
664 private void writeObject(java.io.ObjectOutputStream s)
665 throws java.io.IOException {
666 s.defaultWriteObject();
667 for (Iterator<E> it = iterator(); it.hasNext(); )
668 s.writeObject(it.next());
669 // Use trailing null as sentinel
670 s.writeObject(null);
671 }
672
673 /**
674 * Reconstitute the Queue instance from a stream (that is,
675 * deserialize it).
676 * @param s the stream
677 */
678 private void readObject(java.io.ObjectInputStream s)
679 throws java.io.IOException, ClassNotFoundException {
680 s.defaultReadObject();
681 for (;;) {
682 E item = (E)s.readObject();
683 if (item == null)
684 break;
685 else
686 offer(item);
687 }
688 }
689 }