1 |
/* |
2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
* Expert Group and released to the public domain. Use, modify, and |
4 |
* redistribute this code in any way without acknowledgement. |
5 |
*/ |
6 |
|
7 |
package java.util.concurrent; |
8 |
import java.util.*; |
9 |
|
10 |
/** |
11 |
* A bounded blocking queue based on a fixed-sized array. |
12 |
**/ |
13 |
public class ArrayBlockingQueue<E> extends AbstractBlockingQueueFromQueue<E> |
14 |
implements BlockingQueue<E>, java.io.Serializable { |
15 |
|
16 |
public ArrayBlockingQueue(int maximumSize) { |
17 |
super(new CircularBuffer<E>(maximumSize), maximumSize); |
18 |
} |
19 |
|
20 |
public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) { |
21 |
super(new CircularBuffer<E>(maximumSize, initialElements), maximumSize); |
22 |
} |
23 |
|
24 |
/** |
25 |
* A classic circular bounded buffer. The bare unsynchronized |
26 |
* version here is practially never useful by itself, so is |
27 |
* defined as a private class to be wrapped with concurrency |
28 |
* control by AbstractBlockingQueueFromQueue. |
29 |
*/ |
30 |
static private class CircularBuffer<E> extends AbstractQueue<E> |
31 |
implements Queue<E>, java.io.Serializable { |
32 |
|
33 |
private transient final E[] items; |
34 |
private transient int takePtr; |
35 |
private transient int putPtr; |
36 |
private transient int modCount; |
37 |
private int count; |
38 |
|
39 |
/** |
40 |
* An array used only during deserialization, to hold |
41 |
* items read back in from the stream, and then used |
42 |
* as "items" by readResolve via the private constructor. |
43 |
*/ |
44 |
private transient E[] deserializedItems; |
45 |
|
46 |
/** |
47 |
* Internal constructor also used by readResolve. |
48 |
* Sets all final fields, plus count. |
49 |
* @param cap the maximumSize |
50 |
* @param array the array to use or null if should create new one |
51 |
* @param count the number of items in the array, where indices 0 |
52 |
* to count-1 hold items. |
53 |
*/ |
54 |
private CircularBuffer(int cap, E[] array, int count) { |
55 |
if (cap <= 0) |
56 |
throw new IllegalArgumentException(); |
57 |
if (array == null) |
58 |
this.items = new E[cap]; |
59 |
else |
60 |
this.items = array; |
61 |
this.putPtr = count; |
62 |
this.count = count; |
63 |
} |
64 |
|
65 |
public CircularBuffer(int maximumSize) { |
66 |
this(maximumSize, null, 0); |
67 |
} |
68 |
|
69 |
public CircularBuffer(int maximumSize, Collection<E> initialElements) { |
70 |
this(maximumSize, null, 0); |
71 |
int size = initialElements.size(); |
72 |
if (size > maximumSize) throw new IllegalArgumentException(); |
73 |
for (Iterator<E> it = initialElements.iterator(); it.hasNext();) { |
74 |
items[putPtr] = it.next(); |
75 |
putPtr = inc(putPtr); |
76 |
} |
77 |
count = size; |
78 |
} |
79 |
|
80 |
public int size() { |
81 |
return count; |
82 |
} |
83 |
|
84 |
/** |
85 |
* Circularly increment i. |
86 |
*/ |
87 |
int inc(int i) { |
88 |
return (++i == items.length)? i : 0; |
89 |
} |
90 |
|
91 |
public boolean offer(E x) { |
92 |
if (count >= items.length) |
93 |
return false; |
94 |
items[putPtr] = x; |
95 |
putPtr = inc(putPtr); |
96 |
++modCount; |
97 |
++count; |
98 |
return true; |
99 |
} |
100 |
|
101 |
public E poll() { |
102 |
if (count == 0) |
103 |
return null; |
104 |
E x = items[takePtr]; |
105 |
items[takePtr] = null; |
106 |
takePtr = inc(takePtr); |
107 |
++modCount; |
108 |
--count; |
109 |
return x; |
110 |
} |
111 |
|
112 |
public E peek() { |
113 |
return (count == 0)? null : items[takePtr]; |
114 |
} |
115 |
|
116 |
|
117 |
/** |
118 |
* Utility for remove and iterator.remove: Delete item at position |
119 |
* i by sliding over all others up through putPtr. |
120 |
*/ |
121 |
void removeAt(int i) { |
122 |
for (;;) { |
123 |
int nexti = inc(i); |
124 |
items[i] = items[nexti]; |
125 |
if (nexti != putPtr) |
126 |
i = nexti; |
127 |
else { |
128 |
items[nexti] = null; |
129 |
putPtr = i; |
130 |
++modCount; |
131 |
--count; |
132 |
return; |
133 |
} |
134 |
} |
135 |
} |
136 |
|
137 |
public boolean remove(Object x) { |
138 |
int i = takePtr; |
139 |
while (i != putPtr && !x.equals(items[i])) |
140 |
i = inc(i); |
141 |
if (i == putPtr) |
142 |
return false; |
143 |
|
144 |
removeAt(i); |
145 |
return true; |
146 |
} |
147 |
|
148 |
|
149 |
public boolean contains(Object x) { |
150 |
for (int i = takePtr; i != putPtr; i = inc(i)) |
151 |
if (x.equals(items[i])) |
152 |
return true; |
153 |
|
154 |
return false; |
155 |
} |
156 |
|
157 |
public Object[] toArray() { |
158 |
int size = count; |
159 |
E[] a = new E[size]; |
160 |
for (int k = 0, i = takePtr; i != putPtr; i = inc(i)) |
161 |
a[k++] = items[i]; |
162 |
if (a.length > size) |
163 |
a[size] = null; |
164 |
|
165 |
return a; |
166 |
} |
167 |
|
168 |
public <T> T[] toArray(T[] a) { |
169 |
int size = count; |
170 |
if (a.length < size) |
171 |
a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size); |
172 |
|
173 |
for (int k = 0, i = takePtr; i != putPtr; i = inc(i)) |
174 |
a[k++] = (T)items[i]; |
175 |
if (a.length > size) |
176 |
a[size] = null; |
177 |
|
178 |
return a; |
179 |
} |
180 |
|
181 |
public Iterator<E> iterator() { |
182 |
return new Itr(); |
183 |
} |
184 |
|
185 |
private class Itr<E> implements Iterator<E> { |
186 |
/** |
187 |
* Index of element to be returned by next, |
188 |
* or a negative number if no such. |
189 |
*/ |
190 |
int cursor; |
191 |
|
192 |
/** |
193 |
* Index of element returned by most recent call to next. |
194 |
* Reset to -1 if this element is deleted by a call to remove. |
195 |
*/ |
196 |
int lastRet; |
197 |
|
198 |
/** |
199 |
* The modCount value that the iterator believes that the |
200 |
* queue should have. If this expectation is violated, the |
201 |
* iterator has detected concurrent modification. |
202 |
*/ |
203 |
int expectedModCount; |
204 |
|
205 |
Itr() { |
206 |
expectedModCount = modCount; |
207 |
lastRet = -1; |
208 |
cursor = (count > 0)? takePtr : -1; |
209 |
} |
210 |
|
211 |
public boolean hasNext() { |
212 |
return cursor >= 0; |
213 |
} |
214 |
|
215 |
public E next() { |
216 |
if (expectedModCount != modCount) |
217 |
throw new ConcurrentModificationException(); |
218 |
if (cursor < 0) |
219 |
throw new NoSuchElementException(); |
220 |
lastRet = cursor; |
221 |
cursor = inc(cursor); |
222 |
if (cursor == putPtr) |
223 |
cursor = -1; |
224 |
return (E)items[lastRet]; |
225 |
} |
226 |
|
227 |
public void remove() { |
228 |
int i = lastRet; |
229 |
if (i == -1) |
230 |
throw new IllegalStateException(); |
231 |
lastRet = -1; |
232 |
if (expectedModCount != modCount || cursor < 0) |
233 |
throw new ConcurrentModificationException(); |
234 |
|
235 |
cursor = i; // back up cursor |
236 |
removeAt(i); |
237 |
expectedModCount = modCount; |
238 |
} |
239 |
} |
240 |
|
241 |
|
242 |
/** |
243 |
* Save the state to a stream (that is, serialize it). |
244 |
* |
245 |
* @serialData The maximumSize is emitted (int), followed by all of |
246 |
* its elements (each an <tt>E</tt>) in the proper order. |
247 |
*/ |
248 |
private void writeObject(java.io.ObjectOutputStream s) |
249 |
throws java.io.IOException { |
250 |
|
251 |
// Write out element count, and any hidden stuff |
252 |
s.defaultWriteObject(); |
253 |
// Write out maximumSize == items length |
254 |
s.writeInt(items.length); |
255 |
|
256 |
// Write out all elements in the proper order. |
257 |
for (int i = takePtr; i != putPtr; i = inc(i)) |
258 |
s.writeObject(items[i]); |
259 |
} |
260 |
|
261 |
/** |
262 |
* Reconstitute the Queue instance from a stream (that is, |
263 |
* deserialize it). |
264 |
*/ |
265 |
private void readObject(java.io.ObjectInputStream s) |
266 |
throws java.io.IOException, ClassNotFoundException { |
267 |
// Read in size, and any hidden stuff |
268 |
s.defaultReadObject(); |
269 |
int size = count; |
270 |
|
271 |
// Read in array length and allocate array |
272 |
int arrayLength = s.readInt(); |
273 |
|
274 |
// We use deserializedItems here because "items" is final |
275 |
deserializedItems = new E[arrayLength]; |
276 |
|
277 |
// Read in all elements in the proper order into deserializedItems |
278 |
for (int i=0; i<size; i++) |
279 |
deserializedItems[i] = (E)s.readObject(); |
280 |
} |
281 |
|
282 |
/** |
283 |
* Throw away the object created with readObject, and replace it |
284 |
* with a usable ArrayBlockingQueue. |
285 |
*/ |
286 |
private Object readResolve() throws java.io.ObjectStreamException { |
287 |
E[] array = deserializedItems; |
288 |
deserializedItems = null; |
289 |
return new CircularBuffer(array.length, array, count); |
290 |
} |
291 |
} |
292 |
} |