ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.29
Committed: Wed Nov 12 01:04:24 2003 UTC (20 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.28: +4 -4 lines
Log Message:
fixed typos; avoided some casts

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class implements all of the <em>optional</em> methods
32 * of the {@link Collection} and {@link Iterator} interfaces.
33 *
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 *
38 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Structural invariants (hold whenever both locks are free):
     *   - head is a dummy node whose item is null;
     *   - last is the final node reachable from head (last == head
     *     when the queue is empty), and last.next == null.
     * Every operation that unlinks nodes must restore the second
     * invariant, otherwise later insertions are linked onto a node
     * that is no longer reachable from head and are silently lost.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; first element is head.next) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue. The node that held it
     * becomes the new dummy head (its item is cleared).
     * Caller must hold takeLock and have checked that the queue is
     * not empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node <tt>p</tt>, whose predecessor is
     * <tt>trail</tt>, and account for the removal.
     * Caller must hold both locks.
     * <p><tt>p.next</tt> is deliberately left intact so that an
     * iterator currently positioned at <tt>p</tt> can keep going.
     * @param p the node to remove
     * @param trail the node immediately preceding <tt>p</tt>
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, step the tail pointer back so that
        // subsequent inserts stay reachable from head.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return  the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the remaining capacity
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            // Cascade: there is still room, so wake another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Queue went from empty to non-empty: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        // Cheap unlocked pre-check; re-validated under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            // Cascade: more elements remain, so wake another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // Queue went from full to non-full: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is available.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): the result of tryLock() was
        // previously ignored, so under contention the body ran without
        // the lock and unlock() threw IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first element equal to the given object, if present.
     *
     * @param o the element to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in queue order.
     *
     * @return a newly allocated array holding the queue's elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in queue order, using the supplied array if it is large enough.
     * If the supplied array has room to spare, the slot immediately
     * following the last element is set to <tt>null</tt>, as required
     * by {@link Collection#toArray(Object[])}.
     *
     * @param a the array into which the elements are to be stored if
     *        it is big enough; otherwise a new array of the same
     *        runtime type is allocated
     * @return an array containing the queue's elements
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored in
     *         an array of <tt>a</tt>'s runtime type
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Null-terminate when the caller's array is longer than the queue.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while
     * holding both locks so that the contents form a consistent snapshot.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            // Reset the tail too; otherwise the next insert is linked
            // onto the detached old tail and can never be retrieved.
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        fullyLock();
        try {
            // Detach the whole chain in one step and reset the queue to empty.
            first = head.next;
            head.next = null;
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            while (p != null && n < maxElements) {
                c.add(p.item);
                p.item = null;
                p = p.next;
                ++n;
            }
            if (n != 0) {
                head.next = p;
                // Everything was drained: the tail must fall back to head.
                if (p == null)
                    last = head;
                if (count.getAndAdd(-n) == capacity)
                    notFull.signalAll();
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }



    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
      return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                // Advance past nodes whose item has been cleared (taken
                // or removed since we cached them); otherwise a null
                // element would be handed out after hasNext() said true.
                Node<E> p = current.next;
                E item = null;
                while (p != null && (item = p.item) == null)
                    p = p.next;
                current = p;
                currentElement = item;
                return x;
            } finally {
                fullyUnlock();
            }

        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node's current predecessor; the node may
                // already be gone, in which case this is a no-op.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The serialized count is discarded; add() below rebuilds it.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}