ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.24
Committed: Sun Oct 5 23:00:18 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.23: +66 -0 lines
Log Message:
added drainTo; clarified various exception specs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class implements all of the <em>optional</em> methods
32 * of the {@link Collection} and {@link Iterator} interfaces.
33 *
34 * @since 1.5
35 * @author Doug Lea
36 *
37 **/
38 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -6903933977591709194L;
41
42 /*
43 * A variant of the "two lock queue" algorithm. The putLock gates
44 * entry to put (and offer), and has an associated condition for
45 * waiting puts. Similarly for the takeLock. The "count" field
46 * that they both rely on is maintained as an atomic to avoid
47 * needing to get both locks in most cases. Also, to minimize need
48 * for puts to get takeLock and vice-versa, cascading notifies are
49 * used. When a put notices that it has enabled at least one take,
50 * it signals taker. That taker in turn signals others if more
51 * items have been entered since the signal. And symmetrically for
52 * takes signalling puts. Operations such as remove(Object) and
53 * iterators acquire both locks.
54 */
55
56 /**
57 * Linked list node class
58 */
59 static class Node<E> {
60 /** The item, volatile to ensure barrier separating write and read */
61 volatile E item;
62 Node<E> next;
63 Node(E x) { item = x; }
64 }
65
66 /** The capacity bound, or Integer.MAX_VALUE if none */
67 private final int capacity;
68
69 /** Current number of elements */
70 private final AtomicInteger count = new AtomicInteger(0);
71
72 /** Head of linked list */
73 private transient Node<E> head;
74
75 /** Tail of linked list */
76 private transient Node<E> last;
77
78 /** Lock held by take, poll, etc */
79 private final ReentrantLock takeLock = new ReentrantLock();
80
81 /** Wait queue for waiting takes */
82 private final Condition notEmpty = takeLock.newCondition();
83
84 /** Lock held by put, offer, etc */
85 private final ReentrantLock putLock = new ReentrantLock();
86
87 /** Wait queue for waiting puts */
88 private final Condition notFull = putLock.newCondition();
89
90 /**
91 * Signal a waiting take. Called only from put/offer (which do not
92 * otherwise ordinarily lock takeLock.)
93 */
94 private void signalNotEmpty() {
95 takeLock.lock();
96 try {
97 notEmpty.signal();
98 } finally {
99 takeLock.unlock();
100 }
101 }
102
103 /**
104 * Signal a waiting put. Called only from take/poll.
105 */
106 private void signalNotFull() {
107 putLock.lock();
108 try {
109 notFull.signal();
110 } finally {
111 putLock.unlock();
112 }
113 }
114
115 /**
116 * Create a node and link it at end of queue
117 * @param x the item
118 */
119 private void insert(E x) {
120 last = last.next = new Node<E>(x);
121 }
122
123 /**
124 * Remove a node from head of queue,
125 * @return the node
126 */
127 private E extract() {
128 Node<E> first = head.next;
129 head = first;
130 E x = (E)first.item;
131 first.item = null;
132 return x;
133 }
134
135 /**
136 * Lock to prevent both puts and takes.
137 */
138 private void fullyLock() {
139 putLock.lock();
140 takeLock.lock();
141 }
142
143 /**
144 * Unlock to allow both puts and takes.
145 */
146 private void fullyUnlock() {
147 takeLock.unlock();
148 putLock.unlock();
149 }
150
151
152 /**
153 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
154 * {@link Integer#MAX_VALUE}.
155 */
156 public LinkedBlockingQueue() {
157 this(Integer.MAX_VALUE);
158 }
159
160 /**
161 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
162 *
163 * @param capacity the capacity of this queue.
164 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
165 * than zero.
166 */
167 public LinkedBlockingQueue(int capacity) {
168 if (capacity <= 0) throw new IllegalArgumentException();
169 this.capacity = capacity;
170 last = head = new Node<E>(null);
171 }
172
173 /**
174 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
175 * {@link Integer#MAX_VALUE}, initially containing the elements of the
176 * given collection,
177 * added in traversal order of the collection's iterator.
178 * @param c the collection of elements to initially contain
179 * @throws NullPointerException if <tt>c</tt> or any element within it
180 * is <tt>null</tt>
181 */
182 public LinkedBlockingQueue(Collection<? extends E> c) {
183 this(Integer.MAX_VALUE);
184 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
185 add(it.next());
186 }
187
188
189 // this doc comment is overridden to remove the reference to collections
190 // greater in size than Integer.MAX_VALUE
191 /**
192 * Returns the number of elements in this queue.
193 *
194 * @return the number of elements in this queue.
195 */
196 public int size() {
197 return count.get();
198 }
199
200 // this doc comment is a modified copy of the inherited doc comment,
201 // without the reference to unlimited queues.
202 /**
203 * Returns the number of elements that this queue can ideally (in
204 * the absence of memory or resource constraints) accept without
205 * blocking. This is always equal to the initial capacity of this queue
206 * less the current <tt>size</tt> of this queue.
207 * <p>Note that you <em>cannot</em> always tell if
208 * an attempt to <tt>add</tt> an element will succeed by
209 * inspecting <tt>remainingCapacity</tt> because it may be the
210 * case that a waiting consumer is ready to <tt>take</tt> an
211 * element out of an otherwise full queue.
212 */
213 public int remainingCapacity() {
214 return capacity - count.get();
215 }
216
217 /**
218 * Adds the specified element to the tail of this queue, waiting if
219 * necessary for space to become available.
220 * @param o the element to add
221 * @throws InterruptedException if interrupted while waiting.
222 * @throws NullPointerException if the specified element is <tt>null</tt>.
223 */
224 public void put(E o) throws InterruptedException {
225 if (o == null) throw new NullPointerException();
226 // Note: convention in all put/take/etc is to preset
227 // local var holding count negative to indicate failure unless set.
228 int c = -1;
229 putLock.lockInterruptibly();
230 try {
231 /*
232 * Note that count is used in wait guard even though it is
233 * not protected by lock. This works because count can
234 * only decrease at this point (all other puts are shut
235 * out by lock), and we (or some other waiting put) are
236 * signalled if it ever changes from
237 * capacity. Similarly for all other uses of count in
238 * other wait guards.
239 */
240 try {
241 while (count.get() == capacity)
242 notFull.await();
243 } catch (InterruptedException ie) {
244 notFull.signal(); // propagate to a non-interrupted thread
245 throw ie;
246 }
247 insert(o);
248 c = count.getAndIncrement();
249 if (c + 1 < capacity)
250 notFull.signal();
251 } finally {
252 putLock.unlock();
253 }
254 if (c == 0)
255 signalNotEmpty();
256 }
257
258 /**
259 * Inserts the specified element at the tail of this queue, waiting if
260 * necessary up to the specified wait time for space to become available.
261 * @param o the element to add
262 * @param timeout how long to wait before giving up, in units of
263 * <tt>unit</tt>
264 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
265 * <tt>timeout</tt> parameter
266 * @return <tt>true</tt> if successful, or <tt>false</tt> if
267 * the specified waiting time elapses before space is available.
268 * @throws InterruptedException if interrupted while waiting.
269 * @throws NullPointerException if the specified element is <tt>null</tt>.
270 */
271 public boolean offer(E o, long timeout, TimeUnit unit)
272 throws InterruptedException {
273
274 if (o == null) throw new NullPointerException();
275 long nanos = unit.toNanos(timeout);
276 int c = -1;
277 putLock.lockInterruptibly();
278 try {
279 for (;;) {
280 if (count.get() < capacity) {
281 insert(o);
282 c = count.getAndIncrement();
283 if (c + 1 < capacity)
284 notFull.signal();
285 break;
286 }
287 if (nanos <= 0)
288 return false;
289 try {
290 nanos = notFull.awaitNanos(nanos);
291 } catch (InterruptedException ie) {
292 notFull.signal(); // propagate to a non-interrupted thread
293 throw ie;
294 }
295 }
296 } finally {
297 putLock.unlock();
298 }
299 if (c == 0)
300 signalNotEmpty();
301 return true;
302 }
303
304 /**
305 * Inserts the specified element at the tail of this queue if possible,
306 * returning immediately if this queue is full.
307 *
308 * @param o the element to add.
309 * @return <tt>true</tt> if it was possible to add the element to
310 * this queue, else <tt>false</tt>
311 * @throws NullPointerException if the specified element is <tt>null</tt>
312 */
313 public boolean offer(E o) {
314 if (o == null) throw new NullPointerException();
315 if (count.get() == capacity)
316 return false;
317 int c = -1;
318 putLock.lock();
319 try {
320 if (count.get() < capacity) {
321 insert(o);
322 c = count.getAndIncrement();
323 if (c + 1 < capacity)
324 notFull.signal();
325 }
326 } finally {
327 putLock.unlock();
328 }
329 if (c == 0)
330 signalNotEmpty();
331 return c >= 0;
332 }
333
334
335 public E take() throws InterruptedException {
336 E x;
337 int c = -1;
338 takeLock.lockInterruptibly();
339 try {
340 try {
341 while (count.get() == 0)
342 notEmpty.await();
343 } catch (InterruptedException ie) {
344 notEmpty.signal(); // propagate to a non-interrupted thread
345 throw ie;
346 }
347
348 x = extract();
349 c = count.getAndDecrement();
350 if (c > 1)
351 notEmpty.signal();
352 } finally {
353 takeLock.unlock();
354 }
355 if (c == capacity)
356 signalNotFull();
357 return x;
358 }
359
360 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
361 E x = null;
362 int c = -1;
363 long nanos = unit.toNanos(timeout);
364 takeLock.lockInterruptibly();
365 try {
366 for (;;) {
367 if (count.get() > 0) {
368 x = extract();
369 c = count.getAndDecrement();
370 if (c > 1)
371 notEmpty.signal();
372 break;
373 }
374 if (nanos <= 0)
375 return null;
376 try {
377 nanos = notEmpty.awaitNanos(nanos);
378 } catch (InterruptedException ie) {
379 notEmpty.signal(); // propagate to a non-interrupted thread
380 throw ie;
381 }
382 }
383 } finally {
384 takeLock.unlock();
385 }
386 if (c == capacity)
387 signalNotFull();
388 return x;
389 }
390
391 public E poll() {
392 if (count.get() == 0)
393 return null;
394 E x = null;
395 int c = -1;
396 takeLock.tryLock();
397 try {
398 if (count.get() > 0) {
399 x = extract();
400 c = count.getAndDecrement();
401 if (c > 1)
402 notEmpty.signal();
403 }
404 } finally {
405 takeLock.unlock();
406 }
407 if (c == capacity)
408 signalNotFull();
409 return x;
410 }
411
412
413 public E peek() {
414 if (count.get() == 0)
415 return null;
416 takeLock.lock();
417 try {
418 Node<E> first = head.next;
419 if (first == null)
420 return null;
421 else
422 return first.item;
423 } finally {
424 takeLock.unlock();
425 }
426 }
427
428 public boolean remove(Object o) {
429 if (o == null) return false;
430 boolean removed = false;
431 fullyLock();
432 try {
433 Node<E> trail = head;
434 Node<E> p = head.next;
435 while (p != null) {
436 if (o.equals(p.item)) {
437 removed = true;
438 break;
439 }
440 trail = p;
441 p = p.next;
442 }
443 if (removed) {
444 p.item = null;
445 trail.next = p.next;
446 if (count.getAndDecrement() == capacity)
447 notFull.signalAll();
448 }
449 } finally {
450 fullyUnlock();
451 }
452 return removed;
453 }
454
455 public Object[] toArray() {
456 fullyLock();
457 try {
458 int size = count.get();
459 Object[] a = new Object[size];
460 int k = 0;
461 for (Node<E> p = head.next; p != null; p = p.next)
462 a[k++] = p.item;
463 return a;
464 } finally {
465 fullyUnlock();
466 }
467 }
468
469 public <T> T[] toArray(T[] a) {
470 fullyLock();
471 try {
472 int size = count.get();
473 if (a.length < size)
474 a = (T[])java.lang.reflect.Array.newInstance
475 (a.getClass().getComponentType(), size);
476
477 int k = 0;
478 for (Node p = head.next; p != null; p = p.next)
479 a[k++] = (T)p.item;
480 return a;
481 } finally {
482 fullyUnlock();
483 }
484 }
485
486 public String toString() {
487 fullyLock();
488 try {
489 return super.toString();
490 } finally {
491 fullyUnlock();
492 }
493 }
494
495 public void clear() {
496 fullyLock();
497 try {
498 head.next = null;
499 if (count.getAndSet(0) == capacity)
500 notFull.signalAll();
501 } finally {
502 fullyUnlock();
503 }
504 }
505
506 public int drainTo(Collection<? super E> c) {
507 if (c == null)
508 throw new NullPointerException();
509 if (c == this)
510 throw new IllegalArgumentException();
511 Node first;
512 fullyLock();
513 try {
514 first = head.next;
515 head.next = null;
516 if (count.getAndSet(0) == capacity)
517 notFull.signalAll();
518 } finally {
519 fullyUnlock();
520 }
521 // Transfer the elements outside of locks
522 int n = 0;
523 for (Node p = first; p != null; p = p.next) {
524 c.add((E)p.item);
525 p.item = null;
526 ++n;
527 }
528 return n;
529 }
530
531 public int drainTo(Collection<? super E> c, int maxElements) {
532 if (c == null)
533 throw new NullPointerException();
534 if (c == this)
535 throw new IllegalArgumentException();
536 if (maxElements <= 0)
537 return 0;
538 fullyLock();
539 try {
540 int n = 0;
541 Node p = head.next;
542 while (p != null && n < maxElements) {
543 c.add((E)p.item);
544 p.item = null;
545 p = p.next;
546 ++n;
547 }
548 if (n != 0) {
549 head.next = p;
550 if (count.getAndAdd(-n) == capacity)
551 notFull.signalAll();
552 }
553 return n;
554 } finally {
555 fullyUnlock();
556 }
557 }
558
559
560
561 /**
562 * Returns an iterator over the elements in this queue in proper sequence.
563 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
564 * will never throw {@link java.util.ConcurrentModificationException},
565 * and guarantees to traverse elements as they existed upon
566 * construction of the iterator, and may (but is not guaranteed to)
567 * reflect any modifications subsequent to construction.
568 *
569 * @return an iterator over the elements in this queue in proper sequence.
570 */
571 public Iterator<E> iterator() {
572 return new Itr();
573 }
574
575 private class Itr implements Iterator<E> {
576 /*
577 * Basic weak-consistent iterator. At all times hold the next
578 * item to hand out so that if hasNext() reports true, we will
579 * still have it to return even if lost race with a take etc.
580 */
581 Node<E> current;
582 Node<E> lastRet;
583 E currentElement;
584
585 Itr() {
586 fullyLock();
587 try {
588 current = head.next;
589 if (current != null)
590 currentElement = current.item;
591 } finally {
592 fullyUnlock();
593 }
594 }
595
596 public boolean hasNext() {
597 return current != null;
598 }
599
600 public E next() {
601 fullyLock();
602 try {
603 if (current == null)
604 throw new NoSuchElementException();
605 E x = currentElement;
606 lastRet = current;
607 current = current.next;
608 if (current != null)
609 currentElement = current.item;
610 return x;
611 } finally {
612 fullyUnlock();
613 }
614
615 }
616
617 public void remove() {
618 if (lastRet == null)
619 throw new IllegalStateException();
620 fullyLock();
621 try {
622 Node<E> node = lastRet;
623 lastRet = null;
624 Node<E> trail = head;
625 Node<E> p = head.next;
626 while (p != null && p != node) {
627 trail = p;
628 p = p.next;
629 }
630 if (p == node) {
631 p.item = null;
632 trail.next = p.next;
633 int c = count.getAndDecrement();
634 if (c == capacity)
635 notFull.signalAll();
636 }
637 } finally {
638 fullyUnlock();
639 }
640 }
641 }
642
643 /**
644 * Save the state to a stream (that is, serialize it).
645 *
646 * @serialData The capacity is emitted (int), followed by all of
647 * its elements (each an <tt>Object</tt>) in the proper order,
648 * followed by a null
649 * @param s the stream
650 */
651 private void writeObject(java.io.ObjectOutputStream s)
652 throws java.io.IOException {
653
654 fullyLock();
655 try {
656 // Write out any hidden stuff, plus capacity
657 s.defaultWriteObject();
658
659 // Write out all elements in the proper order.
660 for (Node<E> p = head.next; p != null; p = p.next)
661 s.writeObject(p.item);
662
663 // Use trailing null as sentinel
664 s.writeObject(null);
665 } finally {
666 fullyUnlock();
667 }
668 }
669
670 /**
671 * Reconstitute this queue instance from a stream (that is,
672 * deserialize it).
673 * @param s the stream
674 */
675 private void readObject(java.io.ObjectInputStream s)
676 throws java.io.IOException, ClassNotFoundException {
677 // Read in capacity, and any hidden stuff
678 s.defaultReadObject();
679
680 count.set(0);
681 last = head = new Node<E>(null);
682
683 // Read in all elements and place in queue
684 for (;;) {
685 E item = (E)s.readObject();
686 if (item == null)
687 break;
688 add(item);
689 }
690 }
691 }