10 |
|
import java.util.*; |
11 |
|
|
12 |
|
/** |
13 |
< |
* An optionally-bounded {@link BlockingQueue blocking queue} based on |
13 |
> |
* An optionally-bounded {@link BlockingQueue blocking queue} based on |
14 |
|
* linked nodes. |
15 |
|
* This queue orders elements FIFO (first-in-first-out). |
16 |
< |
* The <em>head</em> of the queue is that element that has been on the |
16 |
> |
* The <em>head</em> of the queue is that element that has been on the |
17 |
|
* queue the longest time. |
18 |
|
* The <em>tail</em> of the queue is that element that has been on the |
19 |
|
* queue the shortest time. |
20 |
|
* Linked queues typically have higher throughput than array-based queues but |
21 |
|
* less predictable performance in most concurrent applications. |
22 |
< |
* |
22 |
> |
* |
23 |
|
* <p> The optional capacity bound constructor argument serves as a |
24 |
|
* way to prevent excessive queue expansion. The capacity, if unspecified, |
25 |
|
* is equal to {@link Integer#MAX_VALUE}. Linked nodes are |
28 |
|
* |
29 |
|
* @since 1.5 |
30 |
|
* @author Doug Lea |
31 |
< |
* |
31 |
> |
* |
32 |
|
**/ |
33 |
|
public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
34 |
|
implements BlockingQueue<E>, java.io.Serializable { |
43 |
|
* used. When a put notices that it has enabled at least one take, |
44 |
|
* it signals taker. That taker in turn signals others if more |
45 |
|
* items have been entered since the signal. And symmetrically for |
46 |
< |
* takes signalling puts. Operations such as remove(Object) and |
46 |
> |
* takes signalling puts. Operations such as remove(Object) and |
47 |
|
* iterators acquire both locks. |
48 |
|
*/ |
49 |
|
|
129 |
|
} |
130 |
|
|
131 |
|
/** |
132 |
< |
* Lock to prevent both puts and takes. |
132 |
> |
* Lock to prevent both puts and takes. |
133 |
|
*/ |
134 |
|
private void fullyLock() { |
135 |
|
putLock.lock(); |
137 |
|
} |
138 |
|
|
139 |
|
/** |
140 |
< |
* Unlock to allow both puts and takes. |
140 |
> |
* Unlock to allow both puts and takes. |
141 |
|
*/ |
142 |
|
private void fullyUnlock() { |
143 |
|
takeLock.unlock(); |
154 |
|
} |
155 |
|
|
156 |
|
/** |
157 |
< |
* Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity |
157 |
> |
* Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity |
158 |
|
* @param capacity the capacity of this queue. |
159 |
|
* @throws IllegalArgumentException if <tt>capacity</tt> is not greater |
160 |
|
* than zero. |
167 |
|
|
168 |
|
/** |
169 |
|
* Create a <tt>LinkedBlockingQueue</tt> with a capacity of |
170 |
< |
* {@link Integer#MAX_VALUE}, initially holding the elements of the |
171 |
< |
* given collection, |
170 |
> |
* {@link Integer#MAX_VALUE}, initially holding the elements of the |
171 |
> |
* given collection, |
172 |
|
* added in traversal order of the collection's iterator. |
173 |
|
* @param c the collection of elements to initially contain |
174 |
|
* @throws NullPointerException if <tt>c</tt> or any element within it |
176 |
|
*/ |
177 |
|
public LinkedBlockingQueue(Collection<? extends E> c) { |
178 |
|
this(Integer.MAX_VALUE); |
179 |
< |
for (Iterator it = c.iterator(); it.hasNext();) |
180 |
< |
add((E)it.next()); |
179 |
> |
for (Iterator<? extends E> it = c.iterator(); it.hasNext();) |
180 |
> |
add(it.next()); |
181 |
|
} |
182 |
|
|
183 |
|
|
202 |
|
|
203 |
|
// this doc comment is overridden to remove the reference to collections |
204 |
|
// greater in size than Integer.MAX_VALUE |
205 |
< |
/** |
206 |
< |
* Return the number of elements in this collection. |
205 |
> |
/** |
206 |
> |
* Return the number of elements in this collection. |
207 |
|
*/ |
208 |
|
public int size() { |
209 |
|
return count.get(); |
211 |
|
|
212 |
|
// this doc comment is a modified copy of the inherited doc comment, |
213 |
|
// without the reference to unlimited queues. |
214 |
< |
/** |
214 |
> |
/** |
215 |
|
* Return the number of elements that this queue can ideally (in |
216 |
|
* the absence of memory or resource constraints) accept without |
217 |
|
* blocking. This is always equal to the initial capacity of this queue |
227 |
|
} |
228 |
|
|
229 |
|
/** |
230 |
< |
* Add the specified element to the tail of this queue, waiting if |
230 |
> |
* Add the specified element to the tail of this queue, waiting if |
231 |
|
* necessary for space to become available. |
232 |
|
* @throws NullPointerException {@inheritDoc} |
233 |
|
*/ |
235 |
|
if (x == null) throw new NullPointerException(); |
236 |
|
// Note: convention in all put/take/etc is to preset |
237 |
|
// local var holding count negative to indicate failure unless set. |
238 |
< |
int c = -1; |
238 |
> |
int c = -1; |
239 |
|
putLock.lockInterruptibly(); |
240 |
|
try { |
241 |
|
/* |
248 |
|
* other wait guards. |
249 |
|
*/ |
250 |
|
try { |
251 |
< |
while (count.get() == capacity) |
251 |
> |
while (count.get() == capacity) |
252 |
|
notFull.await(); |
253 |
|
} |
254 |
|
catch (InterruptedException ie) { |
263 |
|
finally { |
264 |
|
putLock.unlock(); |
265 |
|
} |
266 |
< |
if (c == 0) |
266 |
> |
if (c == 0) |
267 |
|
signalNotEmpty(); |
268 |
|
} |
269 |
|
|
270 |
|
/** |
271 |
< |
* Add the specified element to the tail of this queue, waiting if |
271 |
> |
* Add the specified element to the tail of this queue, waiting if |
272 |
|
* necessary up to the specified wait time for space to become available. |
273 |
|
* @throws NullPointerException {@inheritDoc} |
274 |
|
*/ |
275 |
< |
public boolean offer(E x, long timeout, TimeUnit unit) |
275 |
> |
public boolean offer(E x, long timeout, TimeUnit unit) |
276 |
|
throws InterruptedException { |
277 |
< |
|
277 |
> |
|
278 |
|
if (x == null) throw new NullPointerException(); |
279 |
|
long nanos = unit.toNanos(timeout); |
280 |
|
int c = -1; |
302 |
|
finally { |
303 |
|
putLock.unlock(); |
304 |
|
} |
305 |
< |
if (c == 0) |
305 |
> |
if (c == 0) |
306 |
|
signalNotEmpty(); |
307 |
|
return true; |
308 |
|
} |
309 |
|
|
310 |
< |
/** |
310 |
> |
/** |
311 |
|
* Add the specified element to the tail of this queue if possible, |
312 |
|
* returning immediately if this queue is full. |
313 |
|
* |
317 |
|
if (x == null) throw new NullPointerException(); |
318 |
|
if (count.get() == capacity) |
319 |
|
return false; |
320 |
< |
int c = -1; |
320 |
> |
int c = -1; |
321 |
|
putLock.lock(); |
322 |
|
try { |
323 |
|
if (count.get() < capacity) { |
330 |
|
finally { |
331 |
|
putLock.unlock(); |
332 |
|
} |
333 |
< |
if (c == 0) |
333 |
> |
if (c == 0) |
334 |
|
signalNotEmpty(); |
335 |
|
return c >= 0; |
336 |
|
} |
342 |
|
takeLock.lockInterruptibly(); |
343 |
|
try { |
344 |
|
try { |
345 |
< |
while (count.get() == 0) |
345 |
> |
while (count.get() == 0) |
346 |
|
notEmpty.await(); |
347 |
|
} |
348 |
|
catch (InterruptedException ie) { |
358 |
|
finally { |
359 |
|
takeLock.unlock(); |
360 |
|
} |
361 |
< |
if (c == capacity) |
361 |
> |
if (c == capacity) |
362 |
|
signalNotFull(); |
363 |
|
return x; |
364 |
|
} |
391 |
|
finally { |
392 |
|
takeLock.unlock(); |
393 |
|
} |
394 |
< |
if (c == capacity) |
394 |
> |
if (c == capacity) |
395 |
|
signalNotFull(); |
396 |
|
return x; |
397 |
|
} |
400 |
|
if (count.get() == 0) |
401 |
|
return null; |
402 |
|
E x = null; |
403 |
< |
int c = -1; |
403 |
> |
int c = -1; |
404 |
|
takeLock.tryLock(); |
405 |
|
try { |
406 |
|
if (count.get() > 0) { |
413 |
|
finally { |
414 |
|
takeLock.unlock(); |
415 |
|
} |
416 |
< |
if (c == capacity) |
416 |
> |
if (c == capacity) |
417 |
|
signalNotFull(); |
418 |
|
return x; |
419 |
|
} |
467 |
|
fullyLock(); |
468 |
|
try { |
469 |
|
int size = count.get(); |
470 |
< |
Object[] a = new Object[size]; |
470 |
> |
Object[] a = new Object[size]; |
471 |
|
int k = 0; |
472 |
< |
for (Node<E> p = head.next; p != null; p = p.next) |
472 |
> |
for (Node<E> p = head.next; p != null; p = p.next) |
473 |
|
a[k++] = p.item; |
474 |
|
return a; |
475 |
|
} |
485 |
|
if (a.length < size) |
486 |
|
a = (T[])java.lang.reflect.Array.newInstance |
487 |
|
(a.getClass().getComponentType(), size); |
488 |
< |
|
488 |
> |
|
489 |
|
int k = 0; |
490 |
< |
for (Node p = head.next; p != null; p = p.next) |
490 |
> |
for (Node p = head.next; p != null; p = p.next) |
491 |
|
a[k++] = (T)p.item; |
492 |
|
return a; |
493 |
|
} |
511 |
|
} |
512 |
|
|
513 |
|
private class Itr implements Iterator<E> { |
514 |
< |
/* |
514 |
> |
/* |
515 |
|
* Basic weak-consistent iterator. At all times hold the next |
516 |
|
* item to hand out so that if hasNext() reports true, we will |
517 |
|
* still have it to return even if lost race with a take etc. |
519 |
|
Node<E> current; |
520 |
|
Node<E> lastRet; |
521 |
|
E currentElement; |
522 |
< |
|
522 |
> |
|
523 |
|
Itr() { |
524 |
|
fullyLock(); |
525 |
|
try { |
531 |
|
fullyUnlock(); |
532 |
|
} |
533 |
|
} |
534 |
< |
|
535 |
< |
public boolean hasNext() { |
534 |
> |
|
535 |
> |
public boolean hasNext() { |
536 |
|
return current != null; |
537 |
|
} |
538 |
|
|
539 |
< |
public E next() { |
539 |
> |
public E next() { |
540 |
|
fullyLock(); |
541 |
|
try { |
542 |
|
if (current == null) |
551 |
|
finally { |
552 |
|
fullyUnlock(); |
553 |
|
} |
554 |
< |
|
554 |
> |
|
555 |
|
} |
556 |
|
|
557 |
< |
public void remove() { |
557 |
> |
public void remove() { |
558 |
|
if (lastRet == null) |
559 |
< |
throw new IllegalStateException(); |
559 |
> |
throw new IllegalStateException(); |
560 |
|
fullyLock(); |
561 |
|
try { |
562 |
|
Node<E> node = lastRet; |
592 |
|
private void writeObject(java.io.ObjectOutputStream s) |
593 |
|
throws java.io.IOException { |
594 |
|
|
595 |
< |
fullyLock(); |
595 |
> |
fullyLock(); |
596 |
|
try { |
597 |
|
// Write out any hidden stuff, plus capacity |
598 |
|
s.defaultWriteObject(); |
599 |
|
|
600 |
|
// Write out all elements in the proper order. |
601 |
< |
for (Node<E> p = head.next; p != null; p = p.next) |
601 |
> |
for (Node<E> p = head.next; p != null; p = p.next) |
602 |
|
s.writeObject(p.item); |
603 |
|
|
604 |
|
// Use trailing null as sentinel |
616 |
|
*/ |
617 |
|
private void readObject(java.io.ObjectInputStream s) |
618 |
|
throws java.io.IOException, ClassNotFoundException { |
619 |
< |
// Read in capacity, and any hidden stuff |
620 |
< |
s.defaultReadObject(); |
619 |
> |
// Read in capacity, and any hidden stuff |
620 |
> |
s.defaultReadObject(); |
621 |
|
|
622 |
|
// Read in all elements and place in queue |
623 |
|
for (;;) { |