ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.4
Committed: Fri Jun 6 16:53:04 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1
Changes since 1.3: +6 -1 lines
Log Message:
Minor doc updates; FairReentrantLock serialize now

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 dl 1.4 * A bounded blocking queue based on an array. The implementation is
12     * a classic "bounded buffer", in which a fixed-sized array holds
13     * elements inserted by producers and extracted by
14     * consumers. Array-based queues typically have more predictable
15     * performance than linked queues but lower throughput in most
16     * concurrent applications.
17 tim 1.1 **/
18 dl 1.2 public class ArrayBlockingQueue<E> extends AbstractBlockingQueueFromQueue<E>
19 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
20    
21 dl 1.2 public ArrayBlockingQueue(int maximumSize) {
22     super(new CircularBuffer<E>(maximumSize), maximumSize);
23 tim 1.1 }
24 dl 1.2
25     public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) {
26     super(new CircularBuffer<E>(maximumSize, initialElements), maximumSize);
27 tim 1.1 }
28    
29     /**
30 dl 1.2 * A classic circular bounded buffer. The bare unsynchronized
31     * version here is practially never useful by itself, so is
32     * defined as a private class to be wrapped with concurrency
33     * control by AbstractBlockingQueueFromQueue.
34     */
35     static private class CircularBuffer<E> extends AbstractQueue<E>
36     implements Queue<E>, java.io.Serializable {
37    
38     private transient final E[] items;
39     private transient int takePtr;
40     private transient int putPtr;
41     private transient int modCount;
42     private int count;
43    
44     /**
45     * An array used only during deserialization, to hold
46     * items read back in from the stream, and then used
47     * as "items" by readResolve via the private constructor.
48     */
49     private transient E[] deserializedItems;
50    
51     /**
52     * Internal constructor also used by readResolve.
53     * Sets all final fields, plus count.
54     * @param cap the maximumSize
55     * @param array the array to use or null if should create new one
56     * @param count the number of items in the array, where indices 0
57     * to count-1 hold items.
58     */
59     private CircularBuffer(int cap, E[] array, int count) {
60     if (cap <= 0)
61     throw new IllegalArgumentException();
62     if (array == null)
63     this.items = new E[cap];
64     else
65     this.items = array;
66     this.putPtr = count;
67     this.count = count;
68     }
69    
70     public CircularBuffer(int maximumSize) {
71     this(maximumSize, null, 0);
72     }
73    
74     public CircularBuffer(int maximumSize, Collection<E> initialElements) {
75     this(maximumSize, null, 0);
76     int size = initialElements.size();
77     if (size > maximumSize) throw new IllegalArgumentException();
78     for (Iterator<E> it = initialElements.iterator(); it.hasNext();) {
79     items[putPtr] = it.next();
80     putPtr = inc(putPtr);
81     }
82     count = size;
83     }
84    
85     public int size() {
86     return count;
87     }
88    
89     /**
90     * Circularly increment i.
91     */
92     int inc(int i) {
93 dl 1.3 return (++i == items.length)? 0 : i;
94 dl 1.2 }
95    
96     public boolean offer(E x) {
97     if (count >= items.length)
98     return false;
99     items[putPtr] = x;
100     putPtr = inc(putPtr);
101     ++modCount;
102     ++count;
103     return true;
104     }
105    
106     public E poll() {
107     if (count == 0)
108     return null;
109     E x = items[takePtr];
110     items[takePtr] = null;
111     takePtr = inc(takePtr);
112     ++modCount;
113     --count;
114     return x;
115     }
116    
117     public E peek() {
118     return (count == 0)? null : items[takePtr];
119     }
120    
121    
122     /**
123     * Utility for remove and iterator.remove: Delete item at position
124     * i by sliding over all others up through putPtr.
125     */
126     void removeAt(int i) {
127     for (;;) {
128     int nexti = inc(i);
129     items[i] = items[nexti];
130     if (nexti != putPtr)
131     i = nexti;
132     else {
133     items[nexti] = null;
134     putPtr = i;
135     ++modCount;
136     --count;
137     return;
138     }
139     }
140     }
141    
142     public boolean remove(Object x) {
143     int i = takePtr;
144     while (i != putPtr && !x.equals(items[i]))
145     i = inc(i);
146     if (i == putPtr)
147     return false;
148    
149     removeAt(i);
150     return true;
151     }
152    
153    
154     public boolean contains(Object x) {
155     for (int i = takePtr; i != putPtr; i = inc(i))
156     if (x.equals(items[i]))
157     return true;
158    
159     return false;
160     }
161    
162     public Object[] toArray() {
163     int size = count;
164     E[] a = new E[size];
165     for (int k = 0, i = takePtr; i != putPtr; i = inc(i))
166     a[k++] = items[i];
167    
168     return a;
169     }
170    
171     public <T> T[] toArray(T[] a) {
172     int size = count;
173     if (a.length < size)
174     a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
175    
176     for (int k = 0, i = takePtr; i != putPtr; i = inc(i))
177     a[k++] = (T)items[i];
178     if (a.length > size)
179     a[size] = null;
180    
181     return a;
182     }
183    
184     public Iterator<E> iterator() {
185     return new Itr();
186     }
187    
188     private class Itr<E> implements Iterator<E> {
189     /**
190     * Index of element to be returned by next,
191     * or a negative number if no such.
192     */
193     int cursor;
194    
195     /**
196     * Index of element returned by most recent call to next.
197     * Reset to -1 if this element is deleted by a call to remove.
198     */
199     int lastRet;
200    
201     /**
202     * The modCount value that the iterator believes that the
203     * queue should have. If this expectation is violated, the
204     * iterator has detected concurrent modification.
205     */
206     int expectedModCount;
207    
208     Itr() {
209     expectedModCount = modCount;
210     lastRet = -1;
211     cursor = (count > 0)? takePtr : -1;
212     }
213    
214     public boolean hasNext() {
215     return cursor >= 0;
216     }
217    
218     public E next() {
219     if (expectedModCount != modCount)
220     throw new ConcurrentModificationException();
221     if (cursor < 0)
222     throw new NoSuchElementException();
223     lastRet = cursor;
224     cursor = inc(cursor);
225     if (cursor == putPtr)
226     cursor = -1;
227     return (E)items[lastRet];
228     }
229    
230     public void remove() {
231     int i = lastRet;
232     if (i == -1)
233     throw new IllegalStateException();
234     lastRet = -1;
235     if (expectedModCount != modCount || cursor < 0)
236     throw new ConcurrentModificationException();
237    
238     cursor = i; // back up cursor
239     removeAt(i);
240     expectedModCount = modCount;
241     }
242     }
243    
244    
245     /**
246     * Save the state to a stream (that is, serialize it).
247     *
248     * @serialData The maximumSize is emitted (int), followed by all of
249     * its elements (each an <tt>E</tt>) in the proper order.
250     */
251     private void writeObject(java.io.ObjectOutputStream s)
252     throws java.io.IOException {
253    
254     // Write out element count, and any hidden stuff
255     s.defaultWriteObject();
256     // Write out maximumSize == items length
257     s.writeInt(items.length);
258    
259     // Write out all elements in the proper order.
260     for (int i = takePtr; i != putPtr; i = inc(i))
261     s.writeObject(items[i]);
262     }
263 tim 1.1
264 dl 1.2 /**
265     * Reconstitute the Queue instance from a stream (that is,
266     * deserialize it).
267     */
268     private void readObject(java.io.ObjectInputStream s)
269     throws java.io.IOException, ClassNotFoundException {
270     // Read in size, and any hidden stuff
271     s.defaultReadObject();
272     int size = count;
273    
274     // Read in array length and allocate array
275     int arrayLength = s.readInt();
276    
277     // We use deserializedItems here because "items" is final
278     deserializedItems = new E[arrayLength];
279    
280     // Read in all elements in the proper order into deserializedItems
281     for (int i=0; i<size; i++)
282     deserializedItems[i] = (E)s.readObject();
283     }
284    
285     /**
286     * Throw away the object created with readObject, and replace it
287     * with a usable ArrayBlockingQueue.
288     */
289     private Object readResolve() throws java.io.ObjectStreamException {
290     E[] array = deserializedItems;
291     deserializedItems = null;
292     return new CircularBuffer(array.length, array, count);
293     }
294 tim 1.1 }
295     }