ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.20
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.19: +10 -51 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * @since 1.5
32 * @author Doug Lea
33 *
34 **/
35 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
36 implements BlockingQueue<E>, java.io.Serializable {
37 private static final long serialVersionUID = -6903933977591709194L;
38
39 /*
40 * A variant of the "two lock queue" algorithm. The putLock gates
41 * entry to put (and offer), and has an associated condition for
42 * waiting puts. Similarly for the takeLock. The "count" field
43 * that they both rely on is maintained as an atomic to avoid
44 * needing to get both locks in most cases. Also, to minimize need
45 * for puts to get takeLock and vice-versa, cascading notifies are
46 * used. When a put notices that it has enabled at least one take,
47 * it signals taker. That taker in turn signals others if more
48 * items have been entered since the signal. And symmetrically for
49 * takes signalling puts. Operations such as remove(Object) and
50 * iterators acquire both locks.
51 */
52
53 /**
54 * Linked list node class
55 */
56 static class Node<E> {
57 /** The item, volatile to ensure barrier separating write and read */
58 volatile E item;
59 Node<E> next;
60 Node(E x) { item = x; }
61 }
62
63 /** The capacity bound, or Integer.MAX_VALUE if none */
64 private final int capacity;
65
66 /** Current number of elements */
67 private final AtomicInteger count = new AtomicInteger(0);
68
69 /** Head of linked list */
70 private transient Node<E> head;
71
72 /** Tail of linked list */
73 private transient Node<E> last;
74
75 /** Lock held by take, poll, etc */
76 private final ReentrantLock takeLock = new ReentrantLock();
77
78 /** Wait queue for waiting takes */
79 private final Condition notEmpty = takeLock.newCondition();
80
81 /** Lock held by put, offer, etc */
82 private final ReentrantLock putLock = new ReentrantLock();
83
84 /** Wait queue for waiting puts */
85 private final Condition notFull = putLock.newCondition();
86
87 /**
88 * Signal a waiting take. Called only from put/offer (which do not
89 * otherwise ordinarily lock takeLock.)
90 */
91 private void signalNotEmpty() {
92 takeLock.lock();
93 try {
94 notEmpty.signal();
95 } finally {
96 takeLock.unlock();
97 }
98 }
99
100 /**
101 * Signal a waiting put. Called only from take/poll.
102 */
103 private void signalNotFull() {
104 putLock.lock();
105 try {
106 notFull.signal();
107 } finally {
108 putLock.unlock();
109 }
110 }
111
112 /**
113 * Create a node and link it at end of queue
114 * @param x the item
115 */
116 private void insert(E x) {
117 last = last.next = new Node<E>(x);
118 }
119
120 /**
121 * Remove a node from head of queue,
122 * @return the node
123 */
124 private E extract() {
125 Node<E> first = head.next;
126 head = first;
127 E x = (E)first.item;
128 first.item = null;
129 return x;
130 }
131
132 /**
133 * Lock to prevent both puts and takes.
134 */
135 private void fullyLock() {
136 putLock.lock();
137 takeLock.lock();
138 }
139
140 /**
141 * Unlock to allow both puts and takes.
142 */
143 private void fullyUnlock() {
144 takeLock.unlock();
145 putLock.unlock();
146 }
147
148
149 /**
150 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
151 * {@link Integer#MAX_VALUE}.
152 */
153 public LinkedBlockingQueue() {
154 this(Integer.MAX_VALUE);
155 }
156
157 /**
158 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
159 *
160 * @param capacity the capacity of this queue.
161 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
162 * than zero.
163 */
164 public LinkedBlockingQueue(int capacity) {
165 if (capacity <= 0) throw new IllegalArgumentException();
166 this.capacity = capacity;
167 last = head = new Node<E>(null);
168 }
169
170 /**
171 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
172 * {@link Integer#MAX_VALUE}, initially containing the elements of the
173 * given collection,
174 * added in traversal order of the collection's iterator.
175 * @param c the collection of elements to initially contain
176 * @throws NullPointerException if <tt>c</tt> or any element within it
177 * is <tt>null</tt>
178 */
179 public LinkedBlockingQueue(Collection<? extends E> c) {
180 this(Integer.MAX_VALUE);
181 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
182 add(it.next());
183 }
184
185
186 // this doc comment is overridden to remove the reference to collections
187 // greater in size than Integer.MAX_VALUE
188 /**
189 * Returns the number of elements in this queue.
190 *
191 * @return the number of elements in this queue.
192 */
193 public int size() {
194 return count.get();
195 }
196
197 // this doc comment is a modified copy of the inherited doc comment,
198 // without the reference to unlimited queues.
199 /**
200 * Returns the number of elements that this queue can ideally (in
201 * the absence of memory or resource constraints) accept without
202 * blocking. This is always equal to the initial capacity of this queue
203 * less the current <tt>size</tt> of this queue.
204 * <p>Note that you <em>cannot</em> always tell if
205 * an attempt to <tt>add</tt> an element will succeed by
206 * inspecting <tt>remainingCapacity</tt> because it may be the
207 * case that a waiting consumer is ready to <tt>take</tt> an
208 * element out of an otherwise full queue.
209 */
210 public int remainingCapacity() {
211 return capacity - count.get();
212 }
213
214 public void put(E o) throws InterruptedException {
215 if (o == null) throw new NullPointerException();
216 // Note: convention in all put/take/etc is to preset
217 // local var holding count negative to indicate failure unless set.
218 int c = -1;
219 putLock.lockInterruptibly();
220 try {
221 /*
222 * Note that count is used in wait guard even though it is
223 * not protected by lock. This works because count can
224 * only decrease at this point (all other puts are shut
225 * out by lock), and we (or some other waiting put) are
226 * signalled if it ever changes from
227 * capacity. Similarly for all other uses of count in
228 * other wait guards.
229 */
230 try {
231 while (count.get() == capacity)
232 notFull.await();
233 } catch (InterruptedException ie) {
234 notFull.signal(); // propagate to a non-interrupted thread
235 throw ie;
236 }
237 insert(o);
238 c = count.getAndIncrement();
239 if (c + 1 < capacity)
240 notFull.signal();
241 } finally {
242 putLock.unlock();
243 }
244 if (c == 0)
245 signalNotEmpty();
246 }
247
248 public boolean offer(E o, long timeout, TimeUnit unit)
249 throws InterruptedException {
250
251 if (o == null) throw new NullPointerException();
252 long nanos = unit.toNanos(timeout);
253 int c = -1;
254 putLock.lockInterruptibly();
255 try {
256 for (;;) {
257 if (count.get() < capacity) {
258 insert(o);
259 c = count.getAndIncrement();
260 if (c + 1 < capacity)
261 notFull.signal();
262 break;
263 }
264 if (nanos <= 0)
265 return false;
266 try {
267 nanos = notFull.awaitNanos(nanos);
268 } catch (InterruptedException ie) {
269 notFull.signal(); // propagate to a non-interrupted thread
270 throw ie;
271 }
272 }
273 } finally {
274 putLock.unlock();
275 }
276 if (c == 0)
277 signalNotEmpty();
278 return true;
279 }
280
281 /**
282 * Adds the specified element to the tail of this queue if possible,
283 * returning immediately if this queue is full.
284 *
285 * @param o the element to add.
286 * @return <tt>true</tt> if it was possible to add the element to
287 * this queue, else <tt>false</tt>
288 * @throws NullPointerException if the specified element is <tt>null</tt>
289 */
290 public boolean offer(E o) {
291 if (o == null) throw new NullPointerException();
292 if (count.get() == capacity)
293 return false;
294 int c = -1;
295 putLock.lock();
296 try {
297 if (count.get() < capacity) {
298 insert(o);
299 c = count.getAndIncrement();
300 if (c + 1 < capacity)
301 notFull.signal();
302 }
303 } finally {
304 putLock.unlock();
305 }
306 if (c == 0)
307 signalNotEmpty();
308 return c >= 0;
309 }
310
311
312 public E take() throws InterruptedException {
313 E x;
314 int c = -1;
315 takeLock.lockInterruptibly();
316 try {
317 try {
318 while (count.get() == 0)
319 notEmpty.await();
320 } catch (InterruptedException ie) {
321 notEmpty.signal(); // propagate to a non-interrupted thread
322 throw ie;
323 }
324
325 x = extract();
326 c = count.getAndDecrement();
327 if (c > 1)
328 notEmpty.signal();
329 } finally {
330 takeLock.unlock();
331 }
332 if (c == capacity)
333 signalNotFull();
334 return x;
335 }
336
337 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
338 E x = null;
339 int c = -1;
340 long nanos = unit.toNanos(timeout);
341 takeLock.lockInterruptibly();
342 try {
343 for (;;) {
344 if (count.get() > 0) {
345 x = extract();
346 c = count.getAndDecrement();
347 if (c > 1)
348 notEmpty.signal();
349 break;
350 }
351 if (nanos <= 0)
352 return null;
353 try {
354 nanos = notEmpty.awaitNanos(nanos);
355 } catch (InterruptedException ie) {
356 notEmpty.signal(); // propagate to a non-interrupted thread
357 throw ie;
358 }
359 }
360 } finally {
361 takeLock.unlock();
362 }
363 if (c == capacity)
364 signalNotFull();
365 return x;
366 }
367
368 public E poll() {
369 if (count.get() == 0)
370 return null;
371 E x = null;
372 int c = -1;
373 takeLock.tryLock();
374 try {
375 if (count.get() > 0) {
376 x = extract();
377 c = count.getAndDecrement();
378 if (c > 1)
379 notEmpty.signal();
380 }
381 } finally {
382 takeLock.unlock();
383 }
384 if (c == capacity)
385 signalNotFull();
386 return x;
387 }
388
389
390 public E peek() {
391 if (count.get() == 0)
392 return null;
393 takeLock.lock();
394 try {
395 Node<E> first = head.next;
396 if (first == null)
397 return null;
398 else
399 return first.item;
400 } finally {
401 takeLock.unlock();
402 }
403 }
404
405 public boolean remove(Object o) {
406 if (o == null) return false;
407 boolean removed = false;
408 fullyLock();
409 try {
410 Node<E> trail = head;
411 Node<E> p = head.next;
412 while (p != null) {
413 if (o.equals(p.item)) {
414 removed = true;
415 break;
416 }
417 trail = p;
418 p = p.next;
419 }
420 if (removed) {
421 p.item = null;
422 trail.next = p.next;
423 if (count.getAndDecrement() == capacity)
424 notFull.signalAll();
425 }
426 } finally {
427 fullyUnlock();
428 }
429 return removed;
430 }
431
432 public Object[] toArray() {
433 fullyLock();
434 try {
435 int size = count.get();
436 Object[] a = new Object[size];
437 int k = 0;
438 for (Node<E> p = head.next; p != null; p = p.next)
439 a[k++] = p.item;
440 return a;
441 } finally {
442 fullyUnlock();
443 }
444 }
445
446 public <T> T[] toArray(T[] a) {
447 fullyLock();
448 try {
449 int size = count.get();
450 if (a.length < size)
451 a = (T[])java.lang.reflect.Array.newInstance
452 (a.getClass().getComponentType(), size);
453
454 int k = 0;
455 for (Node p = head.next; p != null; p = p.next)
456 a[k++] = (T)p.item;
457 return a;
458 } finally {
459 fullyUnlock();
460 }
461 }
462
463 public String toString() {
464 fullyLock();
465 try {
466 return super.toString();
467 } finally {
468 fullyUnlock();
469 }
470 }
471
472 /**
473 * Returns an iterator over the elements in this queue in proper sequence.
474 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
475 * will never throw {@link java.util.ConcurrentModificationException},
476 * and guarantees to traverse elements as they existed upon
477 * construction of the iterator, and may (but is not guaranteed to)
478 * reflect any modifications subsequent to construction.
479 *
480 * @return an iterator over the elements in this queue in proper sequence.
481 */
482 public Iterator<E> iterator() {
483 return new Itr();
484 }
485
486 private class Itr implements Iterator<E> {
487 /*
488 * Basic weak-consistent iterator. At all times hold the next
489 * item to hand out so that if hasNext() reports true, we will
490 * still have it to return even if lost race with a take etc.
491 */
492 Node<E> current;
493 Node<E> lastRet;
494 E currentElement;
495
496 Itr() {
497 fullyLock();
498 try {
499 current = head.next;
500 if (current != null)
501 currentElement = current.item;
502 } finally {
503 fullyUnlock();
504 }
505 }
506
507 public boolean hasNext() {
508 return current != null;
509 }
510
511 public E next() {
512 fullyLock();
513 try {
514 if (current == null)
515 throw new NoSuchElementException();
516 E x = currentElement;
517 lastRet = current;
518 current = current.next;
519 if (current != null)
520 currentElement = current.item;
521 return x;
522 } finally {
523 fullyUnlock();
524 }
525
526 }
527
528 public void remove() {
529 if (lastRet == null)
530 throw new IllegalStateException();
531 fullyLock();
532 try {
533 Node<E> node = lastRet;
534 lastRet = null;
535 Node<E> trail = head;
536 Node<E> p = head.next;
537 while (p != null && p != node) {
538 trail = p;
539 p = p.next;
540 }
541 if (p == node) {
542 p.item = null;
543 trail.next = p.next;
544 int c = count.getAndDecrement();
545 if (c == capacity)
546 notFull.signalAll();
547 }
548 } finally {
549 fullyUnlock();
550 }
551 }
552 }
553
554 /**
555 * Save the state to a stream (that is, serialize it).
556 *
557 * @serialData The capacity is emitted (int), followed by all of
558 * its elements (each an <tt>Object</tt>) in the proper order,
559 * followed by a null
560 * @param s the stream
561 */
562 private void writeObject(java.io.ObjectOutputStream s)
563 throws java.io.IOException {
564
565 fullyLock();
566 try {
567 // Write out any hidden stuff, plus capacity
568 s.defaultWriteObject();
569
570 // Write out all elements in the proper order.
571 for (Node<E> p = head.next; p != null; p = p.next)
572 s.writeObject(p.item);
573
574 // Use trailing null as sentinel
575 s.writeObject(null);
576 } finally {
577 fullyUnlock();
578 }
579 }
580
581 /**
582 * Reconstitute this queue instance from a stream (that is,
583 * deserialize it).
584 * @param s the stream
585 */
586 private void readObject(java.io.ObjectInputStream s)
587 throws java.io.IOException, ClassNotFoundException {
588 // Read in capacity, and any hidden stuff
589 s.defaultReadObject();
590
591 count.set(0);
592 last = head = new Node<E>(null);
593
594 // Read in all elements and place in queue
595 for (;;) {
596 E item = (E)s.readObject();
597 if (item == null)
598 break;
599 add(item);
600 }
601 }
602 }