ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ConcurrentLinkedQueue.java
Revision: 1.3
Committed: Thu Jul 31 16:46:39 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.2: +4 -4 lines
Log Message:
Fix unchecked calls to raw type

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8     import java.util.*;
9     import java.util.concurrent.atomic.*;
10    
11    
12     /**
13     * An unbounded thread-safe queue based on linked nodes. ConcurrentLinkedQueues
14     * are an especially good choice when many threads will share access
15 tim 1.2 * to a common queue.
16 dl 1.1 *
17     * <p> This implementation employs an efficient non-blocking algorithm
18     * based on one described in <a
19     * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
20     * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
21     * Algorithms</a> by Maged M. Michael and Michael L. Scott.
22     *
23     * Beware that, unlike in most collections, the <tt>size</tt> method
24     * is <em>NOT</em> a constant-time operation. Because of the
25     * asynchronous nature of these queues, determining the current number
26     * of elements requires an O(n) traversal.
27     * @since 1.5
28     * @author Doug Lea
29 tim 1.2 *
30 dl 1.1 **/
31     public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
32     implements Queue<E>, java.io.Serializable {
33    
34     /*
35     * This is a straight adaptation of the Michael & Scott algorithm.
36     * For explanation, read the paper. The only (minor) algorithmic
37     * difference is that this version supports lazy deletion of
38     * internal nodes (method remove(Object)) -- remove CAS'es item
39     * fields to null. The normal queue operations unlink but then
40     * pass over nodes with null item fields. Similarly, iteration
41     * methods ignore those with nulls.
42     */
43    
44     // Atomics support
45    
46     private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, AtomicLinkedNode> tailUpdater = new AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, AtomicLinkedNode>(new ConcurrentLinkedQueue[0], new AtomicLinkedNode[0], "tail");
47     private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, AtomicLinkedNode> headUpdater = new AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, AtomicLinkedNode>(new ConcurrentLinkedQueue[0], new AtomicLinkedNode[0], "head");
48    
49     private boolean casTail(AtomicLinkedNode cmp, AtomicLinkedNode val) {
50     return tailUpdater.compareAndSet(this, cmp, val);
51     }
52    
53     private boolean casHead(AtomicLinkedNode cmp, AtomicLinkedNode val) {
54     return headUpdater.compareAndSet(this, cmp, val);
55     }
56    
57    
58 tim 1.2 /**
59 dl 1.1 * Pointer to header node, initialized to a dummy node. The first
60     * actual node is at head.getNext().
61     */
62     private transient volatile AtomicLinkedNode head = new AtomicLinkedNode(null, null);
63    
64     /** Pointer to last node on list **/
65     private transient volatile AtomicLinkedNode tail = head;
66    
67    
68     /**
69     * Creates an initially empty ConcurrentLinkedQueue.
70     */
71     public ConcurrentLinkedQueue() {}
72    
/**
 * Constructs a ConcurrentLinkedQueue pre-populated with the elements of
 * the given collection, appended one by one in the order in which the
 * collection's iterator returns them.
 *
 * @param initialElements the collection whose elements are to be added
 */
public ConcurrentLinkedQueue(Collection<? extends E> initialElements) {
    Iterator<? extends E> source = initialElements.iterator();
    while (source.hasNext()) {
        add(source.next());
    }
}
84    
85     public boolean offer(E x) {
86     if (x == null) throw new NullPointerException();
87     AtomicLinkedNode n = new AtomicLinkedNode(x, null);
88     for(;;) {
89     AtomicLinkedNode t = tail;
90     AtomicLinkedNode s = t.getNext();
91     if (t == tail) {
92     if (s == null) {
93     if (t.casNext(s, n)) {
94 tim 1.2 casTail(t, n);
95 dl 1.1 return true;
96     }
97     }
98     else {
99     casTail(t, s);
100     }
101     }
102     }
103     }
104    
105     public E poll() {
106     for (;;) {
107     AtomicLinkedNode h = head;
108     AtomicLinkedNode t = tail;
109     AtomicLinkedNode first = h.getNext();
110     if (h == head) {
111     if (h == t) {
112     if (first == null)
113     return null;
114     else
115     casTail(t, first);
116     }
117     else if (casHead(h, first)) {
118     E item = (E)first.getItem();
119     if (item != null) {
120     first.setItem(null);
121     return item;
122     }
123 tim 1.2 // else skip over deleted item, continue loop,
124 dl 1.1 }
125     }
126     }
127     }
128    
129     public E peek() { // same as poll except don't remove item
130     for (;;) {
131     AtomicLinkedNode h = head;
132     AtomicLinkedNode t = tail;
133     AtomicLinkedNode first = h.getNext();
134     if (h == head) {
135     if (h == t) {
136     if (first == null)
137     return null;
138     else
139     casTail(t, first);
140     }
141     else {
142     E item = (E)first.getItem();
143     if (item != null)
144     return item;
145     else // remove deleted node and continue
146     casHead(h, first);
147     }
148     }
149     }
150     }
151    
152     /**
153     * Return the first actual (non-header) node on list. This is yet
154     * another variant of poll/peek; here returning out the first
155     * node, not element (so we cannot collapse with peek() without
156     * introducing race.)
157     */
158 tim 1.2 AtomicLinkedNode first() {
159 dl 1.1 for (;;) {
160     AtomicLinkedNode h = head;
161     AtomicLinkedNode t = tail;
162     AtomicLinkedNode first = h.getNext();
163     if (h == head) {
164     if (h == t) {
165     if (first == null)
166     return null;
167     else
168     casTail(t, first);
169     }
170     else {
171     if (first.getItem() != null)
172     return first;
173     else // remove deleted node and continue
174     casHead(h, first);
175     }
176     }
177     }
178     }
179    
180    
181     public boolean isEmpty() {
182     return first() == null;
183     }
184    
185     /**
186 tim 1.2 * Returns the number of elements in this collection.
187     *
188 dl 1.1 * Beware that, unlike in most collection, this method> is
189     * <em>NOT</em> a constant-time operation. Because of the
190     * asynchronous nature of these queues, determining the current
191     * number of elements requires an O(n) traversal.
192     * @return the number of elements in this collection
193 tim 1.2 */
194 dl 1.1 public int size() {
195     int count = 0;
196     for (AtomicLinkedNode p = first(); p != null; p = p.getNext()) {
197     if (p.getItem() != null)
198     ++count;
199     }
200     return count;
201     }
202    
203     public boolean contains(Object x) {
204     if (x == null) return false;
205     for (AtomicLinkedNode p = first(); p != null; p = p.getNext()) {
206     Object item = p.getItem();
207 tim 1.2 if (item != null &&
208 dl 1.1 x.equals(item))
209     return true;
210     }
211     return false;
212     }
213    
214     public boolean remove(Object x) {
215     if (x == null) return false;
216     for (AtomicLinkedNode p = first(); p != null; p = p.getNext()) {
217     Object item = p.getItem();
218 tim 1.2 if (item != null &&
219 dl 1.1 x.equals(item) &&
220     p.casItem(item, null))
221     return true;
222     }
223     return false;
224     }
225 tim 1.2
226 dl 1.1 public Object[] toArray() {
227     // Use ArrayList to deal with resizing.
228 tim 1.3 ArrayList<E> al = new ArrayList<E>();
229 dl 1.1 for (AtomicLinkedNode p = first(); p != null; p = p.getNext()) {
230 tim 1.3 E item = (E) p.getItem();
231 dl 1.1 if (item != null)
232     al.add(item);
233     }
234     return al.toArray();
235     }
236    
237     public <T> T[] toArray(T[] a) {
238     // try to use sent-in array
239     int k = 0;
240     AtomicLinkedNode p;
241     for (p = first(); p != null && k < a.length; p = p.getNext()) {
242     Object item = p.getItem();
243     if (item != null)
244     a[k++] = (T)item;
245     }
246     if (p == null) {
247     if (k < a.length)
248     a[k] = null;
249     return a;
250     }
251    
252     // If won't fit, use ArrayList version
253 tim 1.3 ArrayList<E> al = new ArrayList<E>();
254 dl 1.1 for (AtomicLinkedNode q = first(); q != null; q = q.getNext()) {
255 tim 1.3 E item = (E) q.getItem();
256 dl 1.1 if (item != null)
257     al.add(item);
258     }
259     return (T[])al.toArray(a);
260     }
261    
262     public Iterator<E> iterator() {
263     return new Itr();
264     }
265    
266     private class Itr implements Iterator<E> {
267     /**
268     * Next node to return item for.
269     */
270     private AtomicLinkedNode nextNode;
271    
272 tim 1.2 /**
273 dl 1.1 * nextItem holds on to item fields because once we claim
274     * that an element exists in hasNext(), we must return it in
275     * the following next() call even if it was in the process of
276     * being removed when hasNext() was called.
277     **/
278     private E nextItem;
279    
280     /**
281     * Node of the last returned item, to support remove.
282     */
283     private AtomicLinkedNode lastRet;
284    
285 tim 1.2 Itr() {
286 dl 1.1 advance();
287     }
288 tim 1.2
289 dl 1.1 /**
290 tim 1.2 * Move to next valid node.
291 dl 1.1 * Return item to return for next(), or null if no such.
292     */
293 tim 1.2 private E advance() {
294 dl 1.1 lastRet = nextNode;
295     E x = (E)nextItem;
296    
297     AtomicLinkedNode p = (nextNode == null)? first() : nextNode.getNext();
298     for (;;) {
299     if (p == null) {
300     nextNode = null;
301     nextItem = null;
302     return x;
303     }
304     E item = (E)p.getItem();
305     if (item != null) {
306     nextNode = p;
307     nextItem = item;
308     return x;
309     }
310 tim 1.2 else // skip over nulls
311 dl 1.1 p = p.getNext();
312     }
313     }
314 tim 1.2
315 dl 1.1 public boolean hasNext() {
316     return nextNode != null;
317     }
318 tim 1.2
319 dl 1.1 public E next() {
320     if (nextNode == null) throw new NoSuchElementException();
321     return advance();
322     }
323 tim 1.2
324 dl 1.1 public void remove() {
325     AtomicLinkedNode l = lastRet;
326     if (l == null) throw new IllegalStateException();
327     // rely on a future traversal to relink.
328     l.setItem(null);
329     lastRet = null;
330     }
331     }
332    
333     /**
334     * Save the state to a stream (that is, serialize it).
335     *
336     * @serialData All of the elements (each an <tt>E</tt>) in
337     * the proper order, followed by a null
338     * @param s the stream
339     */
340     private void writeObject(java.io.ObjectOutputStream s)
341     throws java.io.IOException {
342    
343     // Write out any hidden stuff
344     s.defaultWriteObject();
345 tim 1.2
346 dl 1.1 // Write out all elements in the proper order.
347     for (AtomicLinkedNode p = first(); p != null; p = p.getNext()) {
348     Object item = p.getItem();
349     if (item != null)
350     s.writeObject(item);
351     }
352    
353     // Use trailing null as sentinel
354     s.writeObject(null);
355     }
356    
357     /**
358     * Reconstitute the Queue instance from a stream (that is,
359     * deserialize it).
360     * @param s the stream
361     */
362     private void readObject(java.io.ObjectInputStream s)
363     throws java.io.IOException, ClassNotFoundException {
364 tim 1.2 // Read in capacity, and any hidden stuff
365     s.defaultReadObject();
366 dl 1.1
367 tim 1.2 // Read in all elements and place in queue
368 dl 1.1 for (;;) {
369     E item = (E)s.readObject();
370     if (item == null)
371     break;
372     add(item);
373     }
374     }
375    
376     }