ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.16
Committed: Wed Aug 6 18:22:09 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
CVS Tags: JSR166_CR1
Changes since 1.15: +3 -2 lines
Log Message:
Fixes to minor errors found by DocCheck

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * null; the first real element is head.next. "last" is the final
     * node of the list (equal to head when the queue is empty).
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements. Serialized with the default fields (so
     * that the final field is restored on deserialization) and reset to
     * zero by readObject before the elements are re-added.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll/drainTo.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue. The node that held it
     * becomes the new dummy head. Caller must hold takeLock and must
     * have established that the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node p, whose predecessor is trail, and account
     * for the removal. Caller must hold both locks.
     * @param p the node to unlink
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the tail pointer must move back;
        // otherwise the next insert would link onto the detached node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
        finally {
            putLock.unlock();
        }
        // Queue went from empty to non-empty: wake one waiting taker.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, or <tt>false</tt> if
     * the wait time elapsed before space was available
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, else <tt>false</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
        // Queue went from full to not-full: wake one waiting putter.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to become
     * available.
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element was available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must actually acquire the lock: an unchecked tryLock() would let
        // this thread mutate the list unlocked and then fail in unlock().
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.  More formally, removes an element
     * <tt>e</tt> such that <tt>o.equals(e)</tt>, if the queue contains
     * one or more such elements.  Returns <tt>true</tt> if the queue
     * contained the specified element (or equivalently, if the queue
     * changed as a result of the call). Because this queue does not
     * hold <tt>null</tt> elements, a <tt>null</tt> argument always
     * yields <tt>false</tt>.
     *
     * <p>This implementation holds both locks while it scans the list
     * for the first matching element and unlinks it.
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence (head first). The array is a snapshot taken while
     * both locks are held.
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array it is
     * returned therein, and if there is room to spare the slot
     * immediately following the last element is set to <tt>null</tt>.
     * Otherwise a new array of the same runtime type is allocated.
     * @param a the array into which the elements are to be stored, if it
     * is big enough
     * @return an array containing all of the elements in this queue
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored in an
     * array of the runtime type of <tt>a</tt>
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Mark the end of the data as Collection.toArray(T[]) specifies.
            if (a.length > k)
                a[k] = null;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, built while both
     * locks are held so that it reflects a consistent snapshot.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection. If adding to
     * <tt>c</tt> fails part-way, the elements already transferred stay
     * in <tt>c</tt> and the remainder stay in this queue.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        int drained = 0;
        boolean wasFull = false;
        takeLock.lock();
        try {
            // count can only grow while takeLock is held, so at least
            // this many nodes are linked and visible.
            int limit = Math.min(maxElements, count.get());
            while (drained < limit) {
                Node<E> first = head.next;
                c.add(first.item);  // on failure the element stays queued
                first.item = null;
                head = first;
                ++drained;
            }
        }
        finally {
            // Account for whatever was transferred, even if c.add threw.
            if (drained > 0)
                wasFull = (count.getAndAdd(-drained) == capacity);
            takeLock.unlock();
        }
        if (wasFull)
            signalNotFull();
        return drained;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            }
            finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            }
            finally {
                fullyUnlock();
            }

        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node's predecessor; the node may already have
                // been removed by another thread, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            }
            finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // No constructor runs during deserialization, so rebuild the
        // transient list and start counting from zero before re-adding.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
665
666
667
668
669