ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.6
Committed: Tue Jun 24 14:34:48 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +37 -13 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An optionally-bounded blocking queue based on linked nodes. Linked
13 * queues typically have higher throughput than array-based queues but
14 * less predicatble performance in most concurrent applications.
15 *
16 * <p> The optional capacity bound constructor argument serves as a
17 * way to prevent unlmited queue expansion. Linked nodes are
18 * dynamically created upon each insertion unless this would bring the
19 * queue above capacity.
20 * @since 1.5
21 * @author Doug Lea
22 *
23 **/
24 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
25 implements BlockingQueue<E>, java.io.Serializable {
26
27 /*
28 * A variant of the "two lock queue" algorithm. The putLock gates
29 * entry to put (and offer), and has an associated condition for
30 * waiting puts. Similarly for the takeLock. The "count" field
31 * that they both rely on is maintained as an atomic to avoid
32 * needing to get both locks in most cases. Also, to minimize need
33 * for puts to get takeLock and vice-versa, cascading notifies are
34 * used. When a put notices that it has enabled at least one take,
35 * it signals taker. That taker in turn signals others if more
36 * items have been entered since the signal. And symmetrically for
37 * takes signalling puts. Operations such as remove(Object) and
38 * iterators acquire both locks.
39 */
40
41 /**
42 * Linked list node class
43 */
44 static class Node<E> {
45 /** The item, volatile to ensure barrier separating write and read */
46 volatile E item;
47 Node<E> next;
48 Node(E x) { item = x; }
49 }
50
51 /** The capacity bound, or Integer.MAX_VALUE if none */
52 private final int capacity;
53
54 /** Current number of elements */
55 private transient final AtomicInteger count = new AtomicInteger(0);
56
57 /** Head of linked list */
58 private transient Node<E> head;
59
60 /** Tail of lined list */
61 private transient Node<E> last;
62
63 /** Lock held by take, poll, etc */
64 private final ReentrantLock takeLock = new ReentrantLock();
65
66 /** Wait queue for waiting takes */
67 private final Condition notEmpty = takeLock.newCondition();
68
69 /** Lock held by put, offer, etc */
70 private final ReentrantLock putLock = new ReentrantLock();
71
72 /** Wait queue for waiting puts */
73 private final Condition notFull = putLock.newCondition();
74
75 /**
76 * Signal a waiting take. Called only from put/offer (which do not
77 * otherwise ordinarily lock takeLock.)
78 */
79 private void signalNotEmpty() {
80 takeLock.lock();
81 try {
82 notEmpty.signal();
83 }
84 finally {
85 takeLock.unlock();
86 }
87 }
88
89 /**
90 * Signal a waiting put. Called only from take/poll.
91 */
92 private void signalNotFull() {
93 putLock.lock();
94 try {
95 notFull.signal();
96 }
97 finally {
98 putLock.unlock();
99 }
100 }
101
102 /**
103 * Create a node and link it and end of queue
104 * @param x the item
105 */
106 private void insert(E x) {
107 last = last.next = new Node<E>(x);
108 }
109
110 /**
111 * Remove a node from head of queue,
112 * @return the node
113 */
114 private E extract() {
115 Node<E> first = head.next;
116 head = first;
117 E x = (E)first.item;
118 first.item = null;
119 return x;
120 }
121
122 /**
123 * Lock to prevent both puts and takes.
124 */
125 private void fullyLock() {
126 putLock.lock();
127 takeLock.lock();
128 }
129
130 /**
131 * Unlock to allow both puts and takes.
132 */
133 private void fullyUnlock() {
134 takeLock.unlock();
135 putLock.unlock();
136 }
137
138
139 /**
140 * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
141 */
142 public LinkedBlockingQueue() {
143 this(Integer.MAX_VALUE);
144 }
145
146 /**
147 * Creates a LinkedBlockingQueue with the given capacity constraint.
148 * @param capacity the maminum number of elements to hold without blocking.
149 */
150 public LinkedBlockingQueue(int capacity) {
151 if (capacity <= 0) throw new NullPointerException();
152 this.capacity = capacity;
153 last = head = new Node<E>(null);
154 }
155
156 /**
157 * Creates a LinkedBlockingQueue without an intrinsic capacity
158 * constraint, initially holding the given elements, added in
159 * traveral order of the collection's iterator.
160 * @param initialElements the elements to initially contain
161 */
162 public LinkedBlockingQueue(Collection<E> initialElements) {
163 this(Integer.MAX_VALUE);
164 for (Iterator<E> it = initialElements.iterator(); it.hasNext();)
165 add(it.next());
166 }
167
168 public int size() {
169 return count.get();
170 }
171
172 public int remainingCapacity() {
173 return capacity - count.get();
174 }
175
176 public void put(E x) throws InterruptedException {
177 if (x == null) throw new NullPointerException();
178 // Note: convention in all put/take/etc is to preset
179 // local var holding count negative to indicate failure unless set.
180 int c = -1;
181 putLock.lockInterruptibly();
182 try {
183 /*
184 * Note that count is used in wait guard even though it is
185 * not protected by lock. This works because count can
186 * only decrease at this point (all other puts are shut
187 * out by lock), and we (or some other waiting put) are
188 * signalled if it ever changes from
189 * capacity. Similarly for all other uses of count in
190 * other wait guards.
191 */
192 try {
193 while (count.get() == capacity)
194 notFull.await();
195 }
196 catch (InterruptedException ie) {
197 notFull.signal(); // propagate to a non-interrupted thread
198 throw ie;
199 }
200 insert(x);
201 c = count.getAndIncrement();
202 if (c + 1 < capacity)
203 notFull.signal();
204 }
205 finally {
206 putLock.unlock();
207 }
208 if (c == 0)
209 signalNotEmpty();
210 }
211
212 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
213 if (x == null) throw new NullPointerException();
214 putLock.lockInterruptibly();
215 long nanos = unit.toNanos(timeout);
216 int c = -1;
217 try {
218 for (;;) {
219 if (count.get() < capacity) {
220 insert(x);
221 c = count.getAndIncrement();
222 if (c + 1 < capacity)
223 notFull.signal();
224 break;
225 }
226 if (nanos <= 0)
227 return false;
228 try {
229 nanos = notFull.awaitNanos(nanos);
230 }
231 catch (InterruptedException ie) {
232 notFull.signal(); // propagate to a non-interrupted thread
233 throw ie;
234 }
235 }
236 }
237 finally {
238 putLock.unlock();
239 }
240 if (c == 0)
241 signalNotEmpty();
242 return true;
243 }
244
245 public boolean offer(E x) {
246 if (x == null) throw new NullPointerException();
247 if (count.get() == capacity)
248 return false;
249 putLock.tryLock();
250 int c = -1;
251 try {
252 if (count.get() < capacity) {
253 insert(x);
254 c = count.getAndIncrement();
255 if (c + 1 < capacity)
256 notFull.signal();
257 }
258 }
259 finally {
260 putLock.unlock();
261 }
262 if (c == 0)
263 signalNotEmpty();
264 return c >= 0;
265 }
266
267
268 public E take() throws InterruptedException {
269 E x;
270 int c = -1;
271 takeLock.lockInterruptibly();
272 try {
273 try {
274 while (count.get() == 0)
275 notEmpty.await();
276 }
277 catch (InterruptedException ie) {
278 notEmpty.signal(); // propagate to a non-interrupted thread
279 throw ie;
280 }
281
282 x = extract();
283 c = count.getAndDecrement();
284 if (c > 1)
285 notEmpty.signal();
286 }
287 finally {
288 takeLock.unlock();
289 }
290 if (c == capacity)
291 signalNotFull();
292 return x;
293 }
294
295 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
296 E x = null;
297 int c = -1;
298 takeLock.lockInterruptibly();
299 long nanos = unit.toNanos(timeout);
300 try {
301 for (;;) {
302 if (count.get() > 0) {
303 x = extract();
304 c = count.getAndDecrement();
305 if (c > 1)
306 notEmpty.signal();
307 break;
308 }
309 if (nanos <= 0)
310 return null;
311 try {
312 nanos = notEmpty.awaitNanos(nanos);
313 }
314 catch (InterruptedException ie) {
315 notEmpty.signal(); // propagate to a non-interrupted thread
316 throw ie;
317 }
318 }
319 }
320 finally {
321 takeLock.unlock();
322 }
323 if (c == capacity)
324 signalNotFull();
325 return x;
326 }
327
328 public E poll() {
329 if (count.get() == 0)
330 return null;
331 E x = null;
332 int c = -1;
333 takeLock.tryLock();
334 try {
335 if (count.get() > 0) {
336 x = extract();
337 c = count.getAndDecrement();
338 if (c > 1)
339 notEmpty.signal();
340 }
341 }
342 finally {
343 takeLock.unlock();
344 }
345 if (c == capacity)
346 signalNotFull();
347 return x;
348 }
349
350
351 public E peek() {
352 if (count.get() == 0)
353 return null;
354 takeLock.tryLock();
355 try {
356 Node<E> first = head.next;
357 if (first == null)
358 return null;
359 else
360 return first.item;
361 }
362 finally {
363 takeLock.unlock();
364 }
365 }
366
367 public boolean remove(Object x) {
368 if (x == null) return false;
369 boolean removed = false;
370 fullyLock();
371 try {
372 Node<E> trail = head;
373 Node<E> p = head.next;
374 while (p != null) {
375 if (x.equals(p.item)) {
376 removed = true;
377 break;
378 }
379 trail = p;
380 p = p.next;
381 }
382 if (removed) {
383 p.item = null;
384 trail.next = p.next;
385 if (count.getAndDecrement() == capacity)
386 notFull.signalAll();
387 }
388 }
389 finally {
390 fullyUnlock();
391 }
392 return removed;
393 }
394
395 public Object[] toArray() {
396 fullyLock();
397 try {
398 int size = count.get();
399 Object[] a = new Object[size];
400 int k = 0;
401 for (Node<E> p = head.next; p != null; p = p.next)
402 a[k++] = p.item;
403 return a;
404 }
405 finally {
406 fullyUnlock();
407 }
408 }
409
410 public <T> T[] toArray(T[] a) {
411 fullyLock();
412 try {
413 int size = count.get();
414 if (a.length < size)
415 a = (T[])java.lang.reflect.Array.newInstance
416 (a.getClass().getComponentType(), size);
417
418 int k = 0;
419 for (Node p = head.next; p != null; p = p.next)
420 a[k++] = (T)p.item;
421 return a;
422 }
423 finally {
424 fullyUnlock();
425 }
426 }
427
428 public String toString() {
429 fullyLock();
430 try {
431 return super.toString();
432 }
433 finally {
434 fullyUnlock();
435 }
436 }
437
438 public Iterator<E> iterator() {
439 return new Itr();
440 }
441
442 private class Itr implements Iterator<E> {
443 /*
444 * Basic weak-consistent iterator. At all times hold the next
445 * item to hand out so that if hasNext() reports true, we will
446 * still have it to return even if lost race with a take etc.
447 */
448 Node<E> current;
449 Node<E> lastRet;
450 E currentElement;
451
452 Itr() {
453 fullyLock();
454 try {
455 current = head.next;
456 if (current != null)
457 currentElement = current.item;
458 }
459 finally {
460 fullyUnlock();
461 }
462 }
463
464 public boolean hasNext() {
465 return current != null;
466 }
467
468 public E next() {
469 fullyLock();
470 try {
471 if (current == null)
472 throw new NoSuchElementException();
473 E x = currentElement;
474 lastRet = current;
475 current = current.next;
476 if (current != null)
477 currentElement = current.item;
478 return x;
479 }
480 finally {
481 fullyUnlock();
482 }
483
484 }
485
486 public void remove() {
487 if (lastRet == null)
488 throw new IllegalStateException();
489 fullyLock();
490 try {
491 Node<E> node = lastRet;
492 lastRet = null;
493 Node<E> trail = head;
494 Node<E> p = head.next;
495 while (p != null && p != node) {
496 trail = p;
497 p = p.next;
498 }
499 if (p == node) {
500 p.item = null;
501 trail.next = p.next;
502 int c = count.getAndDecrement();
503 if (c == capacity)
504 notFull.signalAll();
505 }
506 }
507 finally {
508 fullyUnlock();
509 }
510 }
511 }
512
513 /**
514 * Save the state to a stream (that is, serialize it).
515 *
516 * @serialData The capacity is emitted (int), followed by all of
517 * its elements (each an <tt>Object</tt>) in the proper order,
518 * followed by a null
519 * @param s the stream
520 */
521 private void writeObject(java.io.ObjectOutputStream s)
522 throws java.io.IOException {
523
524 fullyLock();
525 try {
526 // Write out any hidden stuff, plus capacity
527 s.defaultWriteObject();
528
529 // Write out all elements in the proper order.
530 for (Node<E> p = head.next; p != null; p = p.next)
531 s.writeObject(p.item);
532
533 // Use trailing null as sentinel
534 s.writeObject(null);
535 }
536 finally {
537 fullyUnlock();
538 }
539 }
540
541 /**
542 * Reconstitute the Queue instance from a stream (that is,
543 * deserialize it).
544 * @param s the stream
545 */
546 private void readObject(java.io.ObjectInputStream s)
547 throws java.io.IOException, ClassNotFoundException {
548 // Read in capacity, and any hidden stuff
549 s.defaultReadObject();
550
551 // Read in all elements and place in queue
552 for (;;) {
553 E item = (E)s.readObject();
554 if (item == null)
555 break;
556 add(item);
557 }
558 }
559 }
560