ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.5
Committed: Sun Jun 22 23:51:37 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.4: +4 -4 lines
Log Message:
Added synched toString()

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
/**
 * An optionally-bounded blocking queue based on linked nodes. Linked
 * queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent unlimited queue expansion. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p> Null elements are rejected with an
 * <tt>IllegalArgumentException</tt>. Iterators are weakly consistent:
 * they never throw on concurrent modification, but may or may not
 * reflect changes made after they were created.
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a sentinel node ("head") whose item
     * is null; the first real element, if any, is head.next. "last" is
     * the final node of the list (the sentinel itself when empty).
     */

    /** Singly linked list node. */
    static class Node<E> {
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** Maximum number of elements; Integer.MAX_VALUE when unbounded. */
    private final int capacity;

    /*
     * Current number of elements. Deliberately NOT transient: a
     * transient final field would be left null by deserialization
     * (field initializers are not re-run), so the object is written
     * with the default fields and reset to zero in readObject before
     * the elements are re-added.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Sentinel node; rebuilt by readObject after deserialization. */
    private transient Node<E> head = new Node<E>(null);

    /** Tail of the list; equals head when the queue is empty. */
    private transient Node<E> last = head;

    /** Lock held by take, poll, peek, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts. */
    private final Condition notFull = putLock.newCondition();


    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll/drainTo.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue. The removed node becomes the
     * new sentinel, so its item is cleared. Caller must hold takeLock
     * and must have checked that the queue is non-empty.
     *
     * @return the element that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node p, whose predecessor is trail, and account
     * for the removal. Caller must hold both locks.
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail is removed, "last" must fall back to its
        // predecessor; otherwise the next insert would be linked behind
        // the dead node and never become reachable from head.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a LinkedBlockingQueue with the given capacity constraint.
     *
     * @param capacity the maximum number of elements
     * @throws IllegalArgumentException if capacity is not positive
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    /**
     * Creates a LinkedBlockingQueue without an intrinsic capacity
     * constraint, initially holding the given elements, added in the
     * order returned by the collection's iterator.
     *
     * @param initialElements the elements to add
     * @throws NullPointerException if initialElements is null
     * @throws IllegalArgumentException if any element is null
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (E e : initialElements)
            add(e);
    }

    /**
     * Returns the number of elements currently in the queue.
     */
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of additional elements the queue can accept
     * without blocking, i.e. capacity minus the current size.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the element at the tail, waiting if necessary for space to
     * become available.
     *
     * @param x the element to add
     * @throws IllegalArgumentException if x is null
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new IllegalArgumentException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Adds the element at the tail, waiting up to the given time for
     * space to become available.
     *
     * @param x the element to add
     * @param timeout how long to wait, in units of unit
     * @param unit the unit of timeout
     * @return true if added, false if the wait timed out
     * @throws IllegalArgumentException if x is null
     * @throws NullPointerException if unit is null
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        if (x == null) throw new IllegalArgumentException();
        // Convert before locking: a null unit throws here, and doing so
        // while holding the lock outside the try would leak the lock.
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the element at the tail if the queue is not full.
     *
     * @param x the element to add
     * @return true if added, false if the queue was full
     * @throws IllegalArgumentException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new IllegalArgumentException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        // lock(), not tryLock(): ignoring a failed tryLock() would run
        // the body unlocked and make unlock() throw
        // IllegalMonitorStateException. The lock is only held briefly.
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Removes and returns the head of the queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head element
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait, in units of unit
     * @param unit the unit of timeout
     * @return the head element, or null if the wait timed out
     * @throws NullPointerException if unit is null
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking so a null unit cannot leak the lock.
        long nanos = unit.toNanos(timeout);
        E x = null;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes and returns the head of the queue, or returns null if
     * the queue is empty.
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // See offer(E) for why this is lock() rather than tryLock().
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Returns, without removing, the head of the queue, or null if
     * the queue is empty.
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        // See offer(E) for why this is lock() rather than tryLock().
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first element equal to x, if there is one.
     *
     * @param x the element to remove; null is never present
     * @return true if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Removes up to all available elements from this queue and adds
     * them to the given collection, in queue order.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most maxElements available elements from this queue
     * and adds them to the given collection, in queue order. If
     * adding to c fails, the element that could not be added and all
     * later ones remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        int drained = 0;
        int before = -1;
        takeLock.lock();
        try {
            // count can only grow while takeLock is held, so at least
            // this many nodes are linked and safe to consume.
            int n = Math.min(maxElements, count.get());
            try {
                while (drained < n) {
                    Node<E> first = head.next;
                    c.add(first.item); // may throw; element then stays queued
                    first.item = null;
                    head = first;
                    ++drained;
                }
            }
            finally {
                if (drained > 0)
                    before = count.getAndAdd(-drained);
            }
        }
        finally {
            takeLock.unlock();
        }
        if (before == capacity)
            signalNotFull();
        return drained;
    }

    /**
     * Returns a new array holding all elements in queue order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            Object[] a = new Object[count.get()];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array holding all elements in queue order, using the
     * given array if it is large enough and otherwise a new array of
     * the same runtime component type. If the given array has room to
     * spare, the slot immediately after the last element is set to
     * null.
     *
     * @param a the array to fill if it is big enough
     * @return an array containing the elements
     * @throws NullPointerException if a is null
     * @throws ArrayStoreException if an element cannot be stored in a
     */
    @SuppressWarnings("unchecked") // checked at run time by the array store
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the string form produced by the superclass, computed
     * while holding both locks.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a weakly consistent iterator over the elements in queue
     * order.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            }
            finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            }
            finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node by identity; it may already have
                // been removed by another thread, in which case this
                // is a no-op.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p != null)
                    unlink(p, trail);
            }
            finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The default fields (including the capacity) are
     * emitted, followed by all of the elements (each an
     * <tt>Object</tt>) in the proper order, followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     */
    @SuppressWarnings("unchecked") // elements were written as E by writeObject
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Field initializers do not run during deserialization, so the
        // transient list must be rebuilt, and the restored count reset
        // because add() below counts each element again.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
536