ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.15
Committed: Wed Aug 6 11:11:49 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.14: +5 -0 lines
Log Message:
Clarify iterator semantics

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p> <tt>null</tt> elements are rejected by every insertion method,
 * because <tt>null</tt> is used as the "nothing available" return value
 * of <tt>poll</tt> and <tt>peek</tt> and as the end-of-data sentinel of
 * the serialized form.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Explicit serial version so that the stream format does not depend
     * on a compiler-generated default.
     */
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * List invariants (hold whenever neither lock is held by a mutator):
     *   - head is a sentinel node whose item is null;
     *   - last is the final node reachable from head (last.next == null);
     *   - count equals the number of nodes after head.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately NOT transient: a
     * transient final field would be null after deserialization (field
     * initializers do not run then).  readObject resets it to zero
     * before re-adding the elements.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a sentinel; the first element is head.next) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue.  The node that held it
     * becomes the new sentinel head.  Caller must hold takeLock and must
     * have established that the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node <tt>p</tt>, whose predecessor is
     * <tt>trail</tt>, and account for the removal.  Caller must hold
     * both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding <tt>p</tt>
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail node is removed, the tail pointer must move back;
        // otherwise later inserts would be linked onto a detached node.
        if (last == p) {
            last = trail;
        }
        if (count.getAndDecrement() == capacity) {
            notFull.signalAll();
        }
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
            add(it.next());
        }
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E o) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal(); // still room: cascade to the next waiting put
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty(); // queue went from empty to non-empty
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit how to interpret the <tt>timeout</tt> parameter
     * @return <tt>true</tt> if the element was added, or <tt>false</tt> if
     *         the wait time elapsed before space became available
     * @throws NullPointerException if the specified element is <tt>null</tt>
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (o == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         the queue was full
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        if (count.get() == capacity) {
            return false; // fast path: full, no need to lock
        }
        int c = -1;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal(); // more elements remain: cascade to next take
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull(); // queue went from full to not full
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to become
     * available.
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit how to interpret the <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        if (count.get() == 0) {
            return null; // fast path: empty, no need to lock
        }
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): an unchecked tryLock() would let
        // this thread mutate the list without the lock when contended, and
        // the unlock() below would then throw IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.  More formally, removes the first element
     * <tt>e</tt> such that <tt>o.equals(e)</tt>, if the queue contains
     * one or more such elements.  Returns <tt>true</tt> if the queue
     * contained the specified element (or equivalently, if the queue
     * changed as a result of the call).
     *
     * <p>This implementation holds both locks while it walks the linked
     * nodes from the head, and unlinks the first matching node.  A
     * <tt>null</tt> argument never matches, because this queue does not
     * hold <tt>null</tt> elements.
     *
     * @param o the element to remove, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence (head first).  The array is a snapshot taken while
     * both locks are held.
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = p.item;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein, and the slot immediately following the last
     * element (if any) is set to <tt>null</tt>.  Otherwise a new array of
     * the same runtime type and of this queue's size is allocated.
     * @param a the array into which the elements are to be stored, if it
     *        is big enough
     * @return an array containing all of the elements in this queue
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if the runtime type of <tt>a</tt> is not
     *         a supertype of the runtime type of every element
     */
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);
            }

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            // Collection.toArray(T[]) contract: mark the end of the data
            // when the caller's array has room to spare.
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, built while both
     * locks are held so that it reflects a single consistent state.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.  Each element is added
     * to <tt>c</tt> before it is unlinked, so an exception thrown by
     * <tt>c.add</tt> does not lose the element being transferred.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        int n = 0;
        fullyLock();
        try {
            Node<E> first;
            while (n < maxElements && (first = head.next) != null) {
                c.add(first.item);
                first.item = null;
                head = first; // transferred node becomes the new sentinel
                ++n;
            }
            return n;
        } finally {
            // Runs even if c.add threw, so count matches what was unlinked.
            if (n > 0 && count.getAndAdd(-n) == capacity) {
                notFull.signalAll();
            }
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node holding the element that the next call to next() returns */
        Node<E> current;
        /** Node most recently returned by next(), or null if none/removed */
        Node<E> lastRet;
        /** Element captured from <tt>current</tt> when it was reached */
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null) {
                    currentElement = current.item;
                }
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Re-locate the node from the head: it may already have
                // been removed by another thread, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    unlink(p, trail);
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be written
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be read
     * @throws ClassNotFoundException if the class of a serialized
     *         element cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Constructors and field initializers do not run during
        // deserialization, so rebuild the transient list state here and
        // discard the stale element count before re-adding the elements.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
664
665
666
667
668