ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ConcurrentLinkedQueue.java
Revision: 1.52
Committed: Wed Jul 29 20:53:30 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.51: +2 -1 lines
Log Message:
6866554: Misc. javadoc warnings

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.util.Queue;
15
16 /**
17 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
18 * This queue orders elements FIFO (first-in-first-out).
19 * The <em>head</em> of the queue is that element that has been on the
20 * queue the longest time.
21 * The <em>tail</em> of the queue is that element that has been on the
22 * queue the shortest time. New elements
23 * are inserted at the tail of the queue, and the queue retrieval
24 * operations obtain elements at the head of the queue.
25 * A {@code ConcurrentLinkedQueue} is an appropriate choice when
26 * many threads will share access to a common collection.
27 * This queue does not permit {@code null} elements.
28 *
29 * <p>This implementation employs an efficient &quot;non-blocking&quot;
30 * algorithm based on one described in <a
31 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
32 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
33 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
34 *
35 * <p>Beware that, unlike in most collections, the {@code size} method
36 * is <em>NOT</em> a constant-time operation. Because of the
37 * asynchronous nature of these queues, determining the current number
38 * of elements requires a traversal of the elements.
39 *
40 * <p>This class and its iterator implement all of the
41 * <em>optional</em> methods of the {@link Collection} and {@link
42 * Iterator} interfaces.
43 *
44 * <p>Memory consistency effects: As with other concurrent
45 * collections, actions in a thread prior to placing an object into a
46 * {@code ConcurrentLinkedQueue}
47 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
48 * actions subsequent to the access or removal of that element from
49 * the {@code ConcurrentLinkedQueue} in another thread.
50 *
51 * <p>This class is a member of the
52 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
53 * Java Collections Framework</a>.
54 *
55 * @since 1.5
56 * @author Doug Lea
57 * @param <E> the type of elements held in this collection
58 *
59 */
60 public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
61 implements Queue<E>, java.io.Serializable {
62 private static final long serialVersionUID = 196745693267521676L;
63
64 /*
65 * This is a modification of the Michael & Scott algorithm,
66 * adapted for a garbage-collected environment, with support for
67 * interior node deletion (to support remove(Object)). For
68 * explanation, read the paper.
69 *
70 * Note that like most non-blocking algorithms in this package,
71 * this implementation relies on the fact that in garbage
72 * collected systems, there is no possibility of ABA problems due
73 * to recycled nodes, so there is no need to use "counted
74 * pointers" or related techniques seen in versions used in
75 * non-GC'ed settings.
76 *
77 * The fundamental invariants are:
78 * - There is exactly one (last) Node with a null next reference,
79 * which is CASed when enqueueing. This last Node can be
80 * reached in O(1) time from tail, but tail is merely an
81 * optimization - it can always be reached in O(N) time from
82 * head as well.
83 * - The elements contained in the queue are the non-null items in
84 * Nodes that are reachable from head. CASing the item
85 * reference of a Node to null atomically removes it from the
86 * queue. Reachability of all elements from head must remain
87 * true even in the case of concurrent modifications that cause
88 * head to advance. A dequeued Node may remain in use
89 * indefinitely due to creation of an Iterator or simply a
90 * poll() that has lost its time slice.
91 *
92 * The above might appear to imply that all Nodes are GC-reachable
93 * from a predecessor dequeued Node. That would cause two problems:
94 * - allow a rogue Iterator to cause unbounded memory retention
95 * - cause cross-generational linking of old Nodes to new Nodes if
96 * a Node was tenured while live, which generational GCs have a
97 * hard time dealing with, causing repeated major collections.
98 * However, only non-deleted Nodes need to be reachable from
99 * dequeued Nodes, and reachability does not necessarily have to
100 * be of the kind understood by the GC. We use the trick of
101 * linking a Node that has just been dequeued to itself. Such a
102 * self-link implicitly means to advance to head.
103 *
104 * Both head and tail are permitted to lag. In fact, failing to
105 * update them every time one could is a significant optimization
106 * (fewer CASes). This is controlled by local "hops" variables
107 * that only trigger helping-CASes after experiencing multiple
108 * lags.
109 *
110 * Since head and tail are updated concurrently and independently,
111 * it is possible for tail to lag behind head (why not?).
112 *
113 * CASing a Node's item reference to null atomically removes the
114 * element from the queue. Iterators skip over Nodes with null
115 * items. Prior implementations of this class had a race between
116 * poll() and remove(Object) where the same element would appear
117 * to be successfully removed by two concurrent operations. The
118 * method remove(Object) also lazily unlinks deleted Nodes, but
119 * this is merely an optimization.
120 *
121 * When constructing a Node (before enqueuing it) we avoid paying
122 * for a volatile write to item by using lazySet instead of a
123 * normal write. This allows the cost of enqueue to be
124 * "one-and-a-half" CASes.
125 *
126 * Both head and tail may or may not point to a Node with a
127 * non-null item. If the queue is empty, all items must of course
128 * be null. Upon creation, both head and tail refer to a dummy
129 * Node with null item. Both head and tail are only updated using
130 * CAS, so they never regress, although again this is merely an
131 * optimization.
132 */
133
134 private static class Node<E> {
135 private volatile E item;
136 private volatile Node<E> next;
137
138 Node(E item) {
139 // Piggyback on imminent casNext()
140 lazySetItem(item);
141 }
142
143 E getItem() {
144 return item;
145 }
146
147 boolean casItem(E cmp, E val) {
148 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
149 }
150
151 void setItem(E val) {
152 item = val;
153 }
154
155 void lazySetItem(E val) {
156 UNSAFE.putOrderedObject(this, itemOffset, val);
157 }
158
159 void lazySetNext(Node<E> val) {
160 UNSAFE.putOrderedObject(this, nextOffset, val);
161 }
162
163 Node<E> getNext() {
164 return next;
165 }
166
167 boolean casNext(Node<E> cmp, Node<E> val) {
168 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
169 }
170
171 // Unsafe mechanics
172
173 private static final sun.misc.Unsafe UNSAFE =
174 sun.misc.Unsafe.getUnsafe();
175 private static final long nextOffset =
176 objectFieldOffset(UNSAFE, "next", Node.class);
177 private static final long itemOffset =
178 objectFieldOffset(UNSAFE, "item", Node.class);
179 }
180
181 /**
182 * A node from which the first live (non-deleted) node (if any)
183 * can be reached in O(1) time.
184 * Invariants:
185 * - all live nodes are reachable from head via succ()
186 * - head != null
187 * - (tmp = head).next != tmp || tmp != head
188 * Non-invariants:
189 * - head.item may or may not be null.
190 * - it is permitted for tail to lag behind head, that is, for tail
191 * to not be reachable from head!
192 */
193 private transient volatile Node<E> head = new Node<E>(null);
194
195 /**
196 * A node from which the last node on list (that is, the unique
197 * node with node.next == null) can be reached in O(1) time.
198 * Invariants:
199 * - the last node is always reachable from tail via succ()
200 * - tail != null
201 * Non-invariants:
202 * - tail.item may or may not be null.
203 * - it is permitted for tail to lag behind head, that is, for tail
204 * to not be reachable from head!
205 * - tail.next may or may not be self-pointing to tail.
206 */
207 private transient volatile Node<E> tail = head;
208
209
210 /**
211 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
212 */
213 public ConcurrentLinkedQueue() {}
214
215 /**
216 * Creates a {@code ConcurrentLinkedQueue}
217 * initially containing the elements of the given collection,
218 * added in traversal order of the collection's iterator.
219 * @param c the collection of elements to initially contain
220 * @throws NullPointerException if the specified collection or any
221 * of its elements are null
222 */
223 public ConcurrentLinkedQueue(Collection<? extends E> c) {
224 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
225 add(it.next());
226 }
227
228 // Have to override just to update the javadoc
229
230 /**
231 * Inserts the specified element at the tail of this queue.
232 *
233 * @return {@code true} (as specified by {@link Collection#add})
234 * @throws NullPointerException if the specified element is null
235 */
236 public boolean add(E e) {
237 return offer(e);
238 }
239
240 /**
241 * We don't bother to update head or tail pointers if fewer than
242 * HOPS links from "true" location. We assume that volatile
243 * writes are significantly more expensive than volatile reads.
244 */
245 private static final int HOPS = 1;
246
247 /**
248 * Try to CAS head to p. If successful, repoint old head to itself
249 * as sentinel for succ(), below.
250 */
251 final void updateHead(Node<E> h, Node<E> p) {
252 if (h != p && casHead(h, p))
253 h.lazySetNext(h);
254 }
255
256 /**
257 * Returns the successor of p, or the head node if p.next has been
258 * linked to self, which will only be true if traversing with a
259 * stale pointer that is now off the list.
260 */
261 final Node<E> succ(Node<E> p) {
262 Node<E> next = p.getNext();
263 return (p == next) ? head : next;
264 }
265
266 /**
267 * Inserts the specified element at the tail of this queue.
268 *
269 * @return {@code true} (as specified by {@link Queue#offer})
270 * @throws NullPointerException if the specified element is null
271 */
272 public boolean offer(E e) {
273 if (e == null) throw new NullPointerException();
274 Node<E> n = new Node<E>(e);
275 retry:
276 for (;;) {
277 Node<E> t = tail;
278 Node<E> p = t;
279 for (int hops = 0; ; hops++) {
280 Node<E> next = succ(p);
281 if (next != null) {
282 if (hops > HOPS && t != tail)
283 continue retry;
284 p = next;
285 } else if (p.casNext(null, n)) {
286 if (hops >= HOPS)
287 casTail(t, n); // Failure is OK.
288 return true;
289 } else {
290 p = succ(p);
291 }
292 }
293 }
294 }
295
296 public E poll() {
297 Node<E> h = head;
298 Node<E> p = h;
299 for (int hops = 0; ; hops++) {
300 E item = p.getItem();
301
302 if (item != null && p.casItem(item, null)) {
303 if (hops >= HOPS) {
304 Node<E> q = p.getNext();
305 updateHead(h, (q != null) ? q : p);
306 }
307 return item;
308 }
309 Node<E> next = succ(p);
310 if (next == null) {
311 updateHead(h, p);
312 break;
313 }
314 p = next;
315 }
316 return null;
317 }
318
319 public E peek() {
320 Node<E> h = head;
321 Node<E> p = h;
322 E item;
323 for (;;) {
324 item = p.getItem();
325 if (item != null)
326 break;
327 Node<E> next = succ(p);
328 if (next == null) {
329 break;
330 }
331 p = next;
332 }
333 updateHead(h, p);
334 return item;
335 }
336
337 /**
338 * Returns the first live (non-deleted) node on list, or null if none.
339 * This is yet another variant of poll/peek; here returning the
340 * first node, not element. We could make peek() a wrapper around
341 * first(), but that would cost an extra volatile read of item,
342 * and the need to add a retry loop to deal with the possibility
343 * of losing a race to a concurrent poll().
344 */
345 Node<E> first() {
346 Node<E> h = head;
347 Node<E> p = h;
348 Node<E> result;
349 for (;;) {
350 E item = p.getItem();
351 if (item != null) {
352 result = p;
353 break;
354 }
355 Node<E> next = succ(p);
356 if (next == null) {
357 result = null;
358 break;
359 }
360 p = next;
361 }
362 updateHead(h, p);
363 return result;
364 }
365
366 /**
367 * Returns {@code true} if this queue contains no elements.
368 *
369 * @return {@code true} if this queue contains no elements
370 */
371 public boolean isEmpty() {
372 return first() == null;
373 }
374
375 /**
376 * Returns the number of elements in this queue. If this queue
377 * contains more than {@code Integer.MAX_VALUE} elements, returns
378 * {@code Integer.MAX_VALUE}.
379 *
380 * <p>Beware that, unlike in most collections, this method is
381 * <em>NOT</em> a constant-time operation. Because of the
382 * asynchronous nature of these queues, determining the current
383 * number of elements requires an O(n) traversal.
384 *
385 * @return the number of elements in this queue
386 */
387 public int size() {
388 int count = 0;
389 for (Node<E> p = first(); p != null; p = succ(p)) {
390 if (p.getItem() != null) {
391 // Collections.size() spec says to max out
392 if (++count == Integer.MAX_VALUE)
393 break;
394 }
395 }
396 return count;
397 }
398
399 /**
400 * Returns {@code true} if this queue contains the specified element.
401 * More formally, returns {@code true} if and only if this queue contains
402 * at least one element {@code e} such that {@code o.equals(e)}.
403 *
404 * @param o object to be checked for containment in this queue
405 * @return {@code true} if this queue contains the specified element
406 */
407 public boolean contains(Object o) {
408 if (o == null) return false;
409 for (Node<E> p = first(); p != null; p = succ(p)) {
410 E item = p.getItem();
411 if (item != null &&
412 o.equals(item))
413 return true;
414 }
415 return false;
416 }
417
418 /**
419 * Removes a single instance of the specified element from this queue,
420 * if it is present. More formally, removes an element {@code e} such
421 * that {@code o.equals(e)}, if this queue contains one or more such
422 * elements.
423 * Returns {@code true} if this queue contained the specified element
424 * (or equivalently, if this queue changed as a result of the call).
425 *
426 * @param o element to be removed from this queue, if present
427 * @return {@code true} if this queue changed as a result of the call
428 */
429 public boolean remove(Object o) {
430 if (o == null) return false;
431 Node<E> pred = null;
432 for (Node<E> p = first(); p != null; p = succ(p)) {
433 E item = p.getItem();
434 if (item != null &&
435 o.equals(item) &&
436 p.casItem(item, null)) {
437 Node<E> next = succ(p);
438 if (pred != null && next != null)
439 pred.casNext(p, next);
440 return true;
441 }
442 pred = p;
443 }
444 return false;
445 }
446
447 /**
448 * Returns an array containing all of the elements in this queue, in
449 * proper sequence.
450 *
451 * <p>The returned array will be "safe" in that no references to it are
452 * maintained by this queue. (In other words, this method must allocate
453 * a new array). The caller is thus free to modify the returned array.
454 *
455 * <p>This method acts as bridge between array-based and collection-based
456 * APIs.
457 *
458 * @return an array containing all of the elements in this queue
459 */
460 public Object[] toArray() {
461 // Use ArrayList to deal with resizing.
462 ArrayList<E> al = new ArrayList<E>();
463 for (Node<E> p = first(); p != null; p = succ(p)) {
464 E item = p.getItem();
465 if (item != null)
466 al.add(item);
467 }
468 return al.toArray();
469 }
470
471 /**
472 * Returns an array containing all of the elements in this queue, in
473 * proper sequence; the runtime type of the returned array is that of
474 * the specified array. If the queue fits in the specified array, it
475 * is returned therein. Otherwise, a new array is allocated with the
476 * runtime type of the specified array and the size of this queue.
477 *
478 * <p>If this queue fits in the specified array with room to spare
479 * (i.e., the array has more elements than this queue), the element in
480 * the array immediately following the end of the queue is set to
481 * {@code null}.
482 *
483 * <p>Like the {@link #toArray()} method, this method acts as bridge between
484 * array-based and collection-based APIs. Further, this method allows
485 * precise control over the runtime type of the output array, and may,
486 * under certain circumstances, be used to save allocation costs.
487 *
488 * <p>Suppose {@code x} is a queue known to contain only strings.
489 * The following code can be used to dump the queue into a newly
490 * allocated array of {@code String}:
491 *
492 * <pre>
493 * String[] y = x.toArray(new String[0]);</pre>
494 *
495 * Note that {@code toArray(new Object[0])} is identical in function to
496 * {@code toArray()}.
497 *
498 * @param a the array into which the elements of the queue are to
499 * be stored, if it is big enough; otherwise, a new array of the
500 * same runtime type is allocated for this purpose
501 * @return an array containing all of the elements in this queue
502 * @throws ArrayStoreException if the runtime type of the specified array
503 * is not a supertype of the runtime type of every element in
504 * this queue
505 * @throws NullPointerException if the specified array is null
506 */
507 @SuppressWarnings("unchecked")
508 public <T> T[] toArray(T[] a) {
509 // try to use sent-in array
510 int k = 0;
511 Node<E> p;
512 for (p = first(); p != null && k < a.length; p = succ(p)) {
513 E item = p.getItem();
514 if (item != null)
515 a[k++] = (T)item;
516 }
517 if (p == null) {
518 if (k < a.length)
519 a[k] = null;
520 return a;
521 }
522
523 // If won't fit, use ArrayList version
524 ArrayList<E> al = new ArrayList<E>();
525 for (Node<E> q = first(); q != null; q = succ(q)) {
526 E item = q.getItem();
527 if (item != null)
528 al.add(item);
529 }
530 return al.toArray(a);
531 }
532
533 /**
534 * Returns an iterator over the elements in this queue in proper sequence.
535 * The returned iterator is a "weakly consistent" iterator that
536 * will never throw {@link java.util.ConcurrentModificationException
537 * ConcurrentModificationException},
538 * and guarantees to traverse elements as they existed upon
539 * construction of the iterator, and may (but is not guaranteed to)
540 * reflect any modifications subsequent to construction.
541 *
542 * @return an iterator over the elements in this queue in proper sequence
543 */
544 public Iterator<E> iterator() {
545 return new Itr();
546 }
547
548 private class Itr implements Iterator<E> {
549 /**
550 * Next node to return item for.
551 */
552 private Node<E> nextNode;
553
554 /**
555 * nextItem holds on to item fields because once we claim
556 * that an element exists in hasNext(), we must return it in
557 * the following next() call even if it was in the process of
558 * being removed when hasNext() was called.
559 */
560 private E nextItem;
561
562 /**
563 * Node of the last returned item, to support remove.
564 */
565 private Node<E> lastRet;
566
567 Itr() {
568 advance();
569 }
570
571 /**
572 * Moves to next valid node and returns item to return for
573 * next(), or null if no such.
574 */
575 private E advance() {
576 lastRet = nextNode;
577 E x = nextItem;
578
579 Node<E> pred, p;
580 if (nextNode == null) {
581 p = first();
582 pred = null;
583 } else {
584 pred = nextNode;
585 p = succ(nextNode);
586 }
587
588 for (;;) {
589 if (p == null) {
590 nextNode = null;
591 nextItem = null;
592 return x;
593 }
594 E item = p.getItem();
595 if (item != null) {
596 nextNode = p;
597 nextItem = item;
598 return x;
599 } else {
600 // skip over nulls
601 Node<E> next = succ(p);
602 if (pred != null && next != null)
603 pred.casNext(p, next);
604 p = next;
605 }
606 }
607 }
608
609 public boolean hasNext() {
610 return nextNode != null;
611 }
612
613 public E next() {
614 if (nextNode == null) throw new NoSuchElementException();
615 return advance();
616 }
617
618 public void remove() {
619 Node<E> l = lastRet;
620 if (l == null) throw new IllegalStateException();
621 // rely on a future traversal to relink.
622 l.setItem(null);
623 lastRet = null;
624 }
625 }
626
627 /**
628 * Save the state to a stream (that is, serialize it).
629 *
630 * @serialData All of the elements (each an {@code E}) in
631 * the proper order, followed by a null
632 * @param s the stream
633 */
634 private void writeObject(java.io.ObjectOutputStream s)
635 throws java.io.IOException {
636
637 // Write out any hidden stuff
638 s.defaultWriteObject();
639
640 // Write out all elements in the proper order.
641 for (Node<E> p = first(); p != null; p = succ(p)) {
642 Object item = p.getItem();
643 if (item != null)
644 s.writeObject(item);
645 }
646
647 // Use trailing null as sentinel
648 s.writeObject(null);
649 }
650
651 /**
652 * Reconstitute the Queue instance from a stream (that is,
653 * deserialize it).
654 * @param s the stream
655 */
656 private void readObject(java.io.ObjectInputStream s)
657 throws java.io.IOException, ClassNotFoundException {
658 // Read in capacity, and any hidden stuff
659 s.defaultReadObject();
660 head = new Node<E>(null);
661 tail = head;
662 // Read in all elements and place in queue
663 for (;;) {
664 @SuppressWarnings("unchecked")
665 E item = (E)s.readObject();
666 if (item == null)
667 break;
668 else
669 offer(item);
670 }
671 }
672
673 // Unsafe mechanics
674
// Unsafe instance that backs the casTail, casHead and lazySetHead helpers below.
675 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
// Offset of the "head" field, looked up once via objectFieldOffset;
// passed to UNSAFE by casHead and lazySetHead.
676 private static final long headOffset =
677 objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class);
// Offset of the "tail" field, looked up once via objectFieldOffset;
// passed to UNSAFE by casTail.
678 private static final long tailOffset =
679 objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class);
680
681 private boolean casTail(Node<E> cmp, Node<E> val) {
682 return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
683 }
684
685 private boolean casHead(Node<E> cmp, Node<E> val) {
686 return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
687 }
688
// Writes head through UNSAFE.putOrderedObject (an ordered, "lazy" write) rather than a CAS.
// NOTE(review): no caller of this private method appears in this file; confirm whether it is still needed.
689 private void lazySetHead(Node<E> val) {
690 UNSAFE.putOrderedObject(this, headOffset, val);
691 }
692
693 static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
694 String field, Class<?> klazz) {
695 try {
696 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
697 } catch (NoSuchFieldException e) {
698 // Convert Exception to corresponding Error
699 NoSuchFieldError error = new NoSuchFieldError(field);
700 error.initCause(e);
701 throw error;
702 }
703 }
704 }