ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.21
Committed: Sat Sep 13 18:51:11 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.20: +3 -0 lines
Log Message:
Proofreading pass -- many minor adjustments

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class implements all of the <em>optional</em> methods
32 * of the {@link Collection} and {@link Iterator} interfaces.
33 *
34 * @since 1.5
35 * @author Doug Lea
36 *
37 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * Invariants relied on throughout:
     *   - head is always a dummy node whose item is null; the first
     *     real element, if any, is head.next.
     *   - last is always the final node reachable from head (it is
     *     head itself when the queue is empty).  Every operation that
     *     unlinks an interior node must therefore reset last when the
     *     unlinked node was the tail.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first real node from the queue and make it the new
     * dummy head.  Caller must hold takeLock and must have checked
     * that the queue is non-empty.
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;   // the node now serves as the dummy head
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();   // cascade: still room for another put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();       // queue went from empty to non-empty
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            // Re-check under the lock: another put may have filled the queue.
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * no elements are present.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();  // cascade: more elements remain
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();        // queue went from full to non-full
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): the result of tryLock() cannot
        // be ignored, because on failure the list would be mutated without
        // the lock and the unlock() below would throw
        // IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.  Both locks are held for the duration
     * of the traversal.
     *
     * @param o the element to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                // Keep the tail pointer on a reachable node; otherwise the
                // next insert would link onto the detached node and the
                // new element would be lost.
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  If adding to
     * <tt>c</tt> throws, the elements already transferred stay in
     * <tt>c</tt> and the remaining ones stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        int n = 0;
        fullyLock();
        try {
            try {
                Node<E> p = head.next;
                while (p != null && n < maxElements) {
                    c.add(p.item);   // add first, so a failure loses nothing
                    p.item = null;   // p becomes the new dummy head
                    head = p;
                    p = p.next;
                    ++n;
                }
            } finally {
                // Publish the new size even if c.add threw part-way.
                if (n > 0 && count.getAndAdd(-n) == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return n;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence; the runtime type of the returned array is
     * that of the specified array.  If the queue fits in the specified
     * array with room to spare, the element immediately following the
     * end of the queue is set to <tt>null</tt>.
     *
     * @param a the array into which the elements are to be stored, if
     *        it is big enough; otherwise a new array of the same
     *        runtime type is allocated
     * @return an array containing all of the elements in this queue
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Collection.toArray(T[]) contract: null-terminate when the
            // supplied array is longer than the collection.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while
     * holding both locks so that the queue cannot change underneath it.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p == node only if the node is still linked; if it was
                // already taken by someone else there is nothing to do.
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    // Same tail fix-up as remove(Object).
                    if (last == p)
                        last = trail;
                    int c = count.getAndDecrement();
                    if (c == capacity)
                        notFull.signalAll();
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The element count is rebuilt by the add() calls below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}