ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1, JSR166_PRERELEASE_0_1
Changes since 1.1: +311 -25 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.*;
9     import java.util.concurrent.atomic.*;
10 tim 1.1
11    
12     /**
13 dl 1.2 * An unbounded thread-safe queue based on linked nodes. LinkedQueues
14     * are an especially good choice when many threads will share access
15     * to acommon queue.
16 tim 1.1 *
17     * <p> This implementation employs an efficient "wait-free" algorithm
18 dl 1.2 * based on one described in <a
19 tim 1.1 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
20     * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
21     * Algorithms</a> by Maged M. Michael and Michael L. Scott.)
22     *
23 dl 1.2 * Beware that, unlike most collections, the <tt>size</tt> method is
24     * <em>NOT</em> a constant-time operation. Because of the asynchronous
25     * nature of these queues, determining the current number of elements
26     * requires an O(n) traversal.
27     *
28 tim 1.1 **/
29 dl 1.2 public class LinkedQueue<E> extends AbstractQueue<E>
30 tim 1.1 implements Queue<E>, java.io.Serializable {
31    
32 dl 1.2 /*
33     * This is a straight adaptation of Michael & Scott algorithm.
34     * For explanation, read the paper.
35     */
36    
37     static class Node {
38     private volatile Object item;
39     private volatile Node next;
40     Node(Object x, Node n) { item = x; next = n; }
41     }
42    
43     // Atomics support
44    
45     private final static AtomicReferenceFieldUpdater<LinkedQueue, Node> tailUpdater = new AtomicReferenceFieldUpdater<LinkedQueue, Node>(new LinkedQueue[0], new Node[0], "tail");
46     private final static AtomicReferenceFieldUpdater<LinkedQueue, Node> headUpdater = new AtomicReferenceFieldUpdater<LinkedQueue, Node>(new LinkedQueue[0], new Node[0], "head");
47     private final static AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
48     new AtomicReferenceFieldUpdater<Node, Node>(new Node[0], new Node[0], "next");
49     private final static AtomicReferenceFieldUpdater<Node, Object> itemUpdater
50     = new AtomicReferenceFieldUpdater<Node, Object>(new Node[0], new Object[0], "item");
51    
52     private boolean casTail(Node cmp, Node val) {
53     return tailUpdater.compareAndSet(this, cmp, val);
54     }
55    
56     private boolean casHead(Node cmp, Node val) {
57     return headUpdater.compareAndSet(this, cmp, val);
58     }
59    
60     private boolean casNext(Node node, Node cmp, Node val) {
61     return nextUpdater.compareAndSet(node, cmp, val);
62     }
63    
64     private boolean casItem(Node node, Object cmp, Object val) {
65     return itemUpdater.compareAndSet(node, cmp, val);
66     }
67    
68    
69     /**
70     * Pointer to header node, initialized to a dummy node. The first
71     * actual node is at head.next.
72     */
73     private transient volatile Node head = new Node(null, null);
74    
75     /** Pointer to last node on list **/
76     private transient volatile Node tail = head;
77    
78     /**
79     * Return the first actual (non-header) node on list.
80     */
81     Node first() { return head.next; }
82    
83 tim 1.1 public LinkedQueue() {}
84 dl 1.2
85     public LinkedQueue(Collection<E> initialElements) {
86     for (Iterator<E> it = initialElements.iterator(); it.hasNext();)
87     add(it.next());
88     }
89    
90    
91 tim 1.1 public boolean add(E x) {
92 dl 1.2 if (x == null) throw new IllegalArgumentException();
93     Node n = new Node(x, null);
94     for(;;) {
95     Node t = tail;
96     Node s = t.next;
97     if (t == tail) {
98     if (s == null) {
99     if (casNext(t, s, n)) {
100     casTail(t, n);
101     return true;
102     }
103     }
104     else {
105     casTail(t, s);
106     }
107     }
108     }
109 tim 1.1 }
110 dl 1.2
111 tim 1.1 public boolean offer(E x) {
112 dl 1.2 return add(x);
113     }
114    
115     public E poll() {
116     for (;;) {
117     Node h = head;
118     Node t = tail;
119     Node first = h.next;
120     if (h == head) {
121     if (h == t) {
122     if (first == null)
123     return null;
124     else
125     casTail(t, first);
126     }
127     else if (casHead(h, first)) {
128     E item = (E)first.item;
129     if (item != null) {
130     itemUpdater.set(first, null);
131     return item;
132     }
133     // else skip over deleted item, continue loop,
134     }
135     }
136     }
137     }
138    
139     public E peek() { // same as poll except don't remove item
140     for (;;) {
141     Node h = head;
142     Node t = tail;
143     Node first = h.next;
144     if (h == head) {
145     if (h == t) {
146     if (first == null)
147     return null;
148     else
149     casTail(t, first);
150     }
151     else {
152     E item = (E)first.item;
153     if (item != null)
154     return item;
155     else // remove deleted node and continue
156     casHead(h, first);
157     }
158     }
159     }
160     }
161    
162     public boolean isEmpty() {
163     return peek() == null;
164 tim 1.1 }
165 dl 1.2
166     /**
167     * Returns the number of elements in this collection.
168     *
169     * Beware that, unlike most collection, this method> is
170     * <em>NOT</em> a constant-time operation. Because of the
171     * asynchronous nature of these queues, determining the current
172     * number of elements requires an O(n) traversal.
173     * @return the number of elements in this collection
174     */
175     public int size() {
176     int count = 0;
177     for (Node p = first(); p != null; p = p.next) {
178     if (p.item != null)
179     ++count;
180     }
181     return count;
182 tim 1.1 }
183 dl 1.2
184     public boolean contains(Object x) {
185     if (x == null) return false;
186     for (Node p = first(); p != null; p = p.next) {
187     Object item = p.item;
188     if (item != null &&
189     x.equals(item))
190     return true;
191     }
192     return false;
193 tim 1.1 }
194    
195     public boolean remove(Object x) {
196 dl 1.2 if (x == null) return false;
197     for (Node p = first(); p != null; p = p.next) {
198     Object item = p.item;
199     if (item != null &&
200     x.equals(item) &&
201     casItem(p, item, null))
202     return true;
203     }
204 tim 1.1 return false;
205     }
206 dl 1.2
207     public Object[] toArray() {
208     // Use ArrayList to deal with resizing.
209     ArrayList al = new ArrayList();
210     for (Node p = first(); p != null; p = p.next) {
211     Object item = p.item;
212     if (item != null)
213     al.add(item);
214     }
215     return al.toArray();
216 tim 1.1 }
217 dl 1.2
218     public <T> T[] toArray(T[] a) {
219     // try to use sent-in array
220     int k = 0;
221     Node p;
222     for (p = first(); p != null && k < a.length; p = p.next) {
223     Object item = p.item;
224     if (item != null)
225     a[k++] = (T)item;
226     }
227     if (p == null) {
228     if (k < a.length)
229     a[k] = null;
230     return a;
231     }
232    
233     // If won't fit, use ArrayList version
234     ArrayList al = new ArrayList();
235     for (Node q = first(); q != null; q = q.next) {
236     Object item = q.item;
237     if (item != null)
238     al.add(item);
239     }
240     return (T[])al.toArray(a);
241 tim 1.1 }
242 dl 1.2
243     public Iterator<E> iterator() {
244     return new Itr();
245 tim 1.1 }
246 dl 1.2
247     private class Itr implements Iterator<E> {
248     private Node current;
249     /**
250     * currentItem holds on to item fields because once we claim
251     * that an element exists in hasNext(), we must return it in
252     * the following next() call even if it was in the process of
253     * being removed when hasNext() was called.
254     **/
255     private E currentItem;
256    
257     Itr() {
258     for (current = first(); current != null; current = current.next) {
259     E item = (E)current.item;
260     if (item != null) {
261     currentItem = item;
262     return;
263     }
264     }
265     }
266    
267     /**
268     * Move to next valid node.
269     * Return previous item, or null if no such.
270     */
271     private E advance() {
272     E x = (E)currentItem;
273     for (;;) {
274     current = current.next;
275     if (current == null) {
276     currentItem = null;
277     return x;
278     }
279     E item = (E)current.item;
280     if (item != null) {
281     currentItem = item;
282     return x;
283     }
284     }
285     }
286    
287     public boolean hasNext() {
288     return current != null;
289     }
290    
291     public E next() {
292     if (current == null) throw new NoSuchElementException();
293     return advance();
294     }
295    
296     public void remove() {
297     if (current == null) throw new NoSuchElementException();
298     // java.util.Iterator contract requires throw if already removed
299     if (currentItem == null) throw new IllegalStateException();
300     // rely on a future traversal to relink.
301     currentItem = null;
302     itemUpdater.set(current, null);
303     }
304 tim 1.1 }
305 dl 1.2
306     /**
307     * Save the state to a stream (that is, serialize it).
308     *
309     * @serialData All of the elements (each an <tt>E</tt>) in
310     * the proper order, followed by a null
311     */
312     private void writeObject(java.io.ObjectOutputStream s)
313     throws java.io.IOException {
314    
315     // Write out any hidden stuff
316     s.defaultWriteObject();
317    
318     // Write out all elements in the proper order.
319     for (Node p = first(); p != null; p = p.next)
320     s.writeObject(p.item);
321    
322     // Use trailing null as sentinel
323     s.writeObject(null);
324 tim 1.1 }
325    
326 dl 1.2 /**
327     * Reconstitute the Queue instance from a stream (that is,
328     * deserialize it).
329     */
330     private void readObject(java.io.ObjectInputStream s)
331     throws java.io.IOException, ClassNotFoundException {
332     // Read in capacity, and any hidden stuff
333     s.defaultReadObject();
334    
335     // Read in all elements and place in queue
336     for (;;) {
337     E item = (E)s.readObject();
338     if (item == null)
339     break;
340     add(item);
341     }
342 tim 1.1 }
343    
344     }