ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.8
Committed: Mon Jul 28 04:11:54 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.7: +71 -19 lines
Log Message:
Significant doc updates:
 - inherit comments where appropriate
 - ensure runtime exception comments inherited (overriding as needed)
 - consistent descriptions
 - introduce head and tail terminology

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@link BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time.
20 * Linked queues typically have higher throughput than array-based queues but
21 * less predictable performance in most concurrent applications.
22 *
23 * <p> The optional capacity bound constructor argument serves as a
24 * way to prevent excessive queue expansion. The capacity, if unspecified,
25 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 * dynamically created upon each insertion unless this would bring the
27 * queue above capacity.
28 *
29 * @since 1.5
30 * @author Doug Lea
31 *
32 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Pins the serialized form (capacity, count, locks, then elements). */
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object) and
     * iterators acquire both locks.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately NOT transient: a
     * transient final field would be left null by deserialization
     * (field initializers do not run), so the counter is serialized
     * and then reset in readObject.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list; always a dummy node whose item is null */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take.  Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put.  Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first real node from the queue; that node becomes the
     * new dummy head.  Caller must hold takeLock and must have checked
     * that the queue is non-empty.
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node p, whose predecessor is trail.  Caller must
     * hold both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the tail pointer must move back;
        // otherwise the next insert would link onto the detached node
        // and the inserted element would be unreachable from head.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     * than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially holding the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param initialElements the elements to initially contain
     * @throws NullPointerException if <tt>initialElements</tt> or any
     * of its elements is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = initialElements.iterator(); it.hasNext();)
            add(it.next());
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Return the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Return the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the wait time elapsed before space became available
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (x == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Add the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, else <tt>false</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            // Re-check under the lock: another put may have filled the queue.
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieve and remove the head of this queue, waiting if necessary
     * until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element was available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, or return
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): an unchecked tryLock() would
        // let us extract without holding the lock under contention and
        // then fail in unlock() with IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieve, but do not remove, the head of this queue, or return
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Remove a single instance of the specified element from this
     * queue, if it is present.
     * @param x the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue, in
     * head-to-tail order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue, in
     * head-to-tail order; the runtime type of the returned array is that
     * of the specified array.  If the queue fits in the specified array
     * with room to spare, the element immediately following the last
     * queue element is set to <tt>null</tt>.
     * @param a the array into which the elements are to be stored, if
     * it is big enough; otherwise a new array of the same runtime type
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Collection.toArray(T[]) contract: mark end of data.
            if (a.length > k)
                a[k] = null;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return a string representation of this queue, computed while
     * both locks are held.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Remove at most the given number of available elements from this
     * queue and add them to the given collection.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0) return 0;
        fullyLock();
        try {
            int n = 0;
            try {
                while (n < maxElements && head.next != null) {
                    // Add before unlinking so a failing add loses nothing.
                    c.add(head.next.item);
                    extract();
                    ++n;
                }
                return n;
            }
            finally {
                // Keep count consistent even if c.add threw part-way.
                if (n > 0 && count.getAndAdd(-n) == capacity)
                    notFull.signalAll();
            }
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Remove all available elements from this queue and add them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Return an iterator over the elements in this queue in proper
     * sequence (head to tail).  The iterator is weakly consistent.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                advanceTo(head.next);
            }
            finally {
                fullyUnlock();
            }
        }

        /**
         * Position the iterator on the first node at or after p that
         * still holds an item.  Nodes that were taken or removed have a
         * null item but keep their next link, so they are skipped
         * rather than handed out as a null element.  Caller must hold
         * both locks.
         */
        private void advanceTo(Node<E> p) {
            E item = null;
            while (p != null && (item = p.item) == null)
                p = p.next;
            current = p;
            currentElement = item;
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                advanceTo(current.next);
                return x;
            }
            finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from head: the node may already be gone, in
                // which case there is nothing to do.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            }
            finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Constructors and field initializers do not run during
        // deserialization: rebuild the (transient) list and restart the
        // counter, which add() below will bring back up to size.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
609
610
611
612
613