ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:39 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +278 -58 lines
Log Message:
re-check-in initial implementations

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * A bounded blocking queue based on a fixed-sized array.
12 **/
13 public class ArrayBlockingQueue<E> extends AbstractBlockingQueueFromQueue<E>
14 implements BlockingQueue<E>, java.io.Serializable {
15
16 public ArrayBlockingQueue(int maximumSize) {
17 super(new CircularBuffer<E>(maximumSize), maximumSize);
18 }
19
20 public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) {
21 super(new CircularBuffer<E>(maximumSize, initialElements), maximumSize);
22 }
23
24 /**
25 * A classic circular bounded buffer. The bare unsynchronized
26 * version here is practially never useful by itself, so is
27 * defined as a private class to be wrapped with concurrency
28 * control by AbstractBlockingQueueFromQueue.
29 */
30 static private class CircularBuffer<E> extends AbstractQueue<E>
31 implements Queue<E>, java.io.Serializable {
32
33 private transient final E[] items;
34 private transient int takePtr;
35 private transient int putPtr;
36 private transient int modCount;
37 private int count;
38
39 /**
40 * An array used only during deserialization, to hold
41 * items read back in from the stream, and then used
42 * as "items" by readResolve via the private constructor.
43 */
44 private transient E[] deserializedItems;
45
46 /**
47 * Internal constructor also used by readResolve.
48 * Sets all final fields, plus count.
49 * @param cap the maximumSize
50 * @param array the array to use or null if should create new one
51 * @param count the number of items in the array, where indices 0
52 * to count-1 hold items.
53 */
54 private CircularBuffer(int cap, E[] array, int count) {
55 if (cap <= 0)
56 throw new IllegalArgumentException();
57 if (array == null)
58 this.items = new E[cap];
59 else
60 this.items = array;
61 this.putPtr = count;
62 this.count = count;
63 }
64
65 public CircularBuffer(int maximumSize) {
66 this(maximumSize, null, 0);
67 }
68
69 public CircularBuffer(int maximumSize, Collection<E> initialElements) {
70 this(maximumSize, null, 0);
71 int size = initialElements.size();
72 if (size > maximumSize) throw new IllegalArgumentException();
73 for (Iterator<E> it = initialElements.iterator(); it.hasNext();) {
74 items[putPtr] = it.next();
75 putPtr = inc(putPtr);
76 }
77 count = size;
78 }
79
80 public int size() {
81 return count;
82 }
83
84 /**
85 * Circularly increment i.
86 */
87 int inc(int i) {
88 return (++i == items.length)? i : 0;
89 }
90
91 public boolean offer(E x) {
92 if (count >= items.length)
93 return false;
94 items[putPtr] = x;
95 putPtr = inc(putPtr);
96 ++modCount;
97 ++count;
98 return true;
99 }
100
101 public E poll() {
102 if (count == 0)
103 return null;
104 E x = items[takePtr];
105 items[takePtr] = null;
106 takePtr = inc(takePtr);
107 ++modCount;
108 --count;
109 return x;
110 }
111
112 public E peek() {
113 return (count == 0)? null : items[takePtr];
114 }
115
116
117 /**
118 * Utility for remove and iterator.remove: Delete item at position
119 * i by sliding over all others up through putPtr.
120 */
121 void removeAt(int i) {
122 for (;;) {
123 int nexti = inc(i);
124 items[i] = items[nexti];
125 if (nexti != putPtr)
126 i = nexti;
127 else {
128 items[nexti] = null;
129 putPtr = i;
130 ++modCount;
131 --count;
132 return;
133 }
134 }
135 }
136
137 public boolean remove(Object x) {
138 int i = takePtr;
139 while (i != putPtr && !x.equals(items[i]))
140 i = inc(i);
141 if (i == putPtr)
142 return false;
143
144 removeAt(i);
145 return true;
146 }
147
148
149 public boolean contains(Object x) {
150 for (int i = takePtr; i != putPtr; i = inc(i))
151 if (x.equals(items[i]))
152 return true;
153
154 return false;
155 }
156
157 public Object[] toArray() {
158 int size = count;
159 E[] a = new E[size];
160 for (int k = 0, i = takePtr; i != putPtr; i = inc(i))
161 a[k++] = items[i];
162 if (a.length > size)
163 a[size] = null;
164
165 return a;
166 }
167
168 public <T> T[] toArray(T[] a) {
169 int size = count;
170 if (a.length < size)
171 a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
172
173 for (int k = 0, i = takePtr; i != putPtr; i = inc(i))
174 a[k++] = (T)items[i];
175 if (a.length > size)
176 a[size] = null;
177
178 return a;
179 }
180
181 public Iterator<E> iterator() {
182 return new Itr();
183 }
184
185 private class Itr<E> implements Iterator<E> {
186 /**
187 * Index of element to be returned by next,
188 * or a negative number if no such.
189 */
190 int cursor;
191
192 /**
193 * Index of element returned by most recent call to next.
194 * Reset to -1 if this element is deleted by a call to remove.
195 */
196 int lastRet;
197
198 /**
199 * The modCount value that the iterator believes that the
200 * queue should have. If this expectation is violated, the
201 * iterator has detected concurrent modification.
202 */
203 int expectedModCount;
204
205 Itr() {
206 expectedModCount = modCount;
207 lastRet = -1;
208 cursor = (count > 0)? takePtr : -1;
209 }
210
211 public boolean hasNext() {
212 return cursor >= 0;
213 }
214
215 public E next() {
216 if (expectedModCount != modCount)
217 throw new ConcurrentModificationException();
218 if (cursor < 0)
219 throw new NoSuchElementException();
220 lastRet = cursor;
221 cursor = inc(cursor);
222 if (cursor == putPtr)
223 cursor = -1;
224 return (E)items[lastRet];
225 }
226
227 public void remove() {
228 int i = lastRet;
229 if (i == -1)
230 throw new IllegalStateException();
231 lastRet = -1;
232 if (expectedModCount != modCount || cursor < 0)
233 throw new ConcurrentModificationException();
234
235 cursor = i; // back up cursor
236 removeAt(i);
237 expectedModCount = modCount;
238 }
239 }
240
241
242 /**
243 * Save the state to a stream (that is, serialize it).
244 *
245 * @serialData The maximumSize is emitted (int), followed by all of
246 * its elements (each an <tt>E</tt>) in the proper order.
247 */
248 private void writeObject(java.io.ObjectOutputStream s)
249 throws java.io.IOException {
250
251 // Write out element count, and any hidden stuff
252 s.defaultWriteObject();
253 // Write out maximumSize == items length
254 s.writeInt(items.length);
255
256 // Write out all elements in the proper order.
257 for (int i = takePtr; i != putPtr; i = inc(i))
258 s.writeObject(items[i]);
259 }
260
261 /**
262 * Reconstitute the Queue instance from a stream (that is,
263 * deserialize it).
264 */
265 private void readObject(java.io.ObjectInputStream s)
266 throws java.io.IOException, ClassNotFoundException {
267 // Read in size, and any hidden stuff
268 s.defaultReadObject();
269 int size = count;
270
271 // Read in array length and allocate array
272 int arrayLength = s.readInt();
273
274 // We use deserializedItems here because "items" is final
275 deserializedItems = new E[arrayLength];
276
277 // Read in all elements in the proper order into deserializedItems
278 for (int i=0; i<size; i++)
279 deserializedItems[i] = (E)s.readObject();
280 }
281
282 /**
283 * Throw away the object created with readObject, and replace it
284 * with a usable ArrayBlockingQueue.
285 */
286 private Object readResolve() throws java.io.ObjectStreamException {
287 E[] array = deserializedItems;
288 deserializedItems = null;
289 return new CircularBuffer(array.length, array, count);
290 }
291 }
292 }