ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.10
Committed: Mon Aug 4 02:00:37 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.9: +1 -1 lines
Log Message:
Added wildcards

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@link BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p> <tt>null</tt> elements are rejected by every insertion method with a
 * <tt>NullPointerException</tt>; <tt>null</tt> is used as the "no element"
 * return value of the <tt>poll</tt> and <tt>peek</tt> methods and as the
 * end-of-stream sentinel of the serialized form.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node: "head" never holds an
     * element, and the first real element (if any) is head.next.
     * "last" is the final node of the list (== head when empty).
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately not transient: a
     * transient final field would be left null by deserialization,
     * so the counter object is written with the default fields and
     * reset to zero in readObject before the elements are re-added.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll/drainTo.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue: the node after the
     * current dummy head becomes the new dummy head and gives up its
     * item.  Caller must hold takeLock and must have established that
     * the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node <tt>p</tt>, whose predecessor is
     * <tt>trail</tt>, and update the element count.  Caller must hold
     * both locks (see {@link #fullyLock}).
     * @param p the node to remove
     * @param trail the node immediately before <tt>p</tt>
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail itself is removed, the predecessor becomes the
        // tail; otherwise later inserts would link onto a detached node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially holding the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        // The iterator type must carry the wildcard of the parameter.
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // Have to override just to update the javadoc for @throws

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Return the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Return the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake one taker.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         the wait time elapsed before space became available
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (x == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Add the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         this queue is full
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary for an element to become available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake one putter.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     * @return the head of this queue, or <tt>null</tt> if the wait
     *         time elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, returning
     * immediately if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must actually acquire the lock: an unchecked tryLock() could
        // fail, leaving the list modified without the lock and making
        // the unlock() below throw IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieve, but do not remove, the head of this queue.
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Remove all available elements from this queue and add them to
     * the given collection, in queue order.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements from this
     * queue and add them to the given collection, in queue order.
     * If adding to <tt>c</tt> throws, the elements already added are
     * removed from this queue and the remaining ones stay in it.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        int moved = 0;
        int before = -1;
        takeLock.lock();
        try {
            // count can only grow while takeLock is held, so this many
            // nodes are guaranteed to be linked after head.
            int limit = Math.min(maxElements, count.get());
            Node<E> h = head;
            try {
                while (moved < limit) {
                    Node<E> p = h.next;
                    c.add(p.item);   // may throw; p is still intact then
                    p.item = null;
                    h = p;
                    ++moved;
                }
            } finally {
                // Commit whatever was transferred, even on exception.
                if (moved > 0) {
                    head = h;
                    before = count.getAndAdd(-moved);
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (before == capacity)
            signalNotFull();
        return moved;
    }

    /**
     * Remove a single instance of the specified element from this
     * queue, if it is present.
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed)
                unlink(p, trail);
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in queue order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] result = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                result[k++] = p.item;
            return result;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in queue order, using the supplied array if it is large enough
     * and otherwise a new array of the same runtime component type.
     * If the supplied array has room to spare, the slot immediately
     * after the last element is set to <tt>null</tt>.
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Mark the end of the data when the array is oversized.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return a string representation of this queue, computed while
     * both locks are held.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return an iterator over the elements in this queue, in queue
     * order.  The iterator tolerates concurrent modification of the
     * queue.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from the head: the node may already have been
                // removed by another thread, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The default serializable fields (including the
     * capacity) are emitted, followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // No constructor runs during deserialization, so the transient
        // list has to be rebuilt here; the counter restarts at zero
        // because add() below counts each element again.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
631
632
633
634
635