ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +509 -34 lines
Log Message:
re-check-in initial implementations

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An optionally-bounded blocking queue based on linked nodes.
13 **/
14 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
15 implements BlockingQueue<E>, java.io.Serializable {
16
17 /*
18 * A variant of the "two lock queue" algorithm. The putLock gates
19 * entry to put (and offer), and has an associated condition for
20 * waiting puts. Similarly for the takeLock. The "count" field
21 * that they both rely on is maintained as an atomic to avoid
22 * needing to get both locks in most cases. Also, to minimize need
23 * for puts to get takeLock and vice-versa, cascading notifies are
24 * used. When a put notices that it has enabled at least one take,
25 * it signals taker. That taker in turn signals others if more
26 * items have been entered since the signal. And symmetrically for
27 * takes signalling puts. Operations such as remove(Object) and
28 * iterators acquire both locks.
29 */
30
31 static class Node<E> {
32 volatile E item;
33 Node<E> next;
34 Node(E x) { item = x; }
35 }
36
37 private final int capacity;
38 private transient final AtomicInteger count = new AtomicInteger(0);
39
40 private transient Node<E> head = new Node<E>(null);
41 private transient Node<E> last = head;
42
43 private transient final ReentrantLock takeLock = new ReentrantLock();
44 private transient final Condition notEmpty = takeLock.newCondition();
45
46 private transient final ReentrantLock putLock = new ReentrantLock();
47 private transient final Condition notFull = putLock.newCondition();
48
49
50 /**
51 * Signal a waiting take. Called only from put/offer (which do not
52 * otherwise ordinarily have takeLock.)
53 */
54 private void signalNotEmpty() {
55 takeLock.lock();
56 try {
57 notEmpty.signal();
58 }
59 finally {
60 takeLock.unlock();
61 }
62 }
63
64 /**
65 * Signal a waiting put. Called only from take/poll.
66 */
67 private void signalNotFull() {
68 putLock.lock();
69 try {
70 notFull.signal();
71 }
72 finally {
73 putLock.unlock();
74 }
75 }
76
77 /**
78 * Create a node and link it and end of queue
79 */
80 private void insert(E x) {
81 last = last.next = new Node<E>(x);
82 }
83
84 /**
85 * Remove a node from head of queue,
86 */
87 private E extract() {
88 Node<E> first = head.next;
89 head = first;
90 E x = (E)first.item;
91 first.item = null;
92 return x;
93 }
94
95 /**
96 * Lock to prevent both puts and takes.
97 */
98 private void fullyLock() {
99 putLock.lock();
100 takeLock.lock();
101 }
102
103 /**
104 * Unlock to allow both puts and takes.
105 */
106 private void fullyUnlock() {
107 takeLock.unlock();
108 putLock.unlock();
109 }
110
111
/**
 * Creates a LinkedBlockingQueue whose capacity is
 * Integer.MAX_VALUE, i.e. with no practical capacity constraint.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
118
/**
 * Creates a LinkedBlockingQueue that holds at most the given number
 * of elements.
 *
 * @param capacity the maximum number of elements; must be positive
 * @throws IllegalArgumentException if capacity is zero or negative
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
}
126
127 /**
128 * Creates a LinkedBlockingQueue without an intrinsic capacity
129 * constraint, initially holding the given elements.
130 */
131 public LinkedBlockingQueue(Collection<E> initialElements) {
132 this(Integer.MAX_VALUE);
133 for (Iterator<E> it = initialElements.iterator(); it.hasNext();)
134 add(it.next());
135 }
136
137 public int size() {
138 return count.get();
139 }
140
141 public int remainingCapacity() {
142 return capacity - count.get();
143 }
144
145 public void put(E x) throws InterruptedException {
146 if (x == null) throw new IllegalArgumentException();
147 // Note: convention in all put/take/etc is to preset
148 // local var holding count negative to indicate failure unless set.
149 int c = -1;
150 putLock.lockInterruptibly();
151 try {
152 /*
153 * Note that count is used in wait guard even though it is
154 * not protected by lock. This works because count can
155 * only decrease at this point (all other puts are shut
156 * out by lock), and we (or some other waiting put) are
157 * signalled if it ever changes from
158 * capacity. Similarly for all other uses of count in
159 * other wait guards.
160 */
161 try {
162 while (count.get() == capacity)
163 notFull.await();
164 }
165 catch (InterruptedException ie) {
166 notFull.signal(); // propagate to a non-interrupted thread
167 throw ie;
168 }
169 insert(x);
170 c = count.getAndIncrement();
171 if (c+1 < capacity)
172 notFull.signal();
173 }
174 finally {
175 putLock.unlock();
176 }
177 if (c == 0)
178 signalNotEmpty();
179 }
180
181 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
182 if (x == null) throw new IllegalArgumentException();
183 putLock.lockInterruptibly();
184 long nanos = unit.toNanos(timeout);
185 int c = -1;
186 try {
187 for (;;) {
188 if (count.get() < capacity) {
189 insert(x);
190 c = count.getAndIncrement();
191 if (c+1 < capacity)
192 notFull.signal();
193 break;
194 }
195 if (nanos <= 0)
196 return false;
197 try {
198 nanos = notFull.awaitNanos(nanos);
199 }
200 catch (InterruptedException ie) {
201 notFull.signal(); // propagate to a non-interrupted thread
202 throw ie;
203 }
204 }
205 }
206 finally {
207 putLock.unlock();
208 }
209 if (c == 0)
210 signalNotEmpty();
211 return true;
212 }
213
214 public boolean offer(E x) {
215 if (x == null) throw new IllegalArgumentException();
216 if (count.get() == capacity)
217 return false;
218 putLock.tryLock();
219 int c = -1;
220 try {
221 if (count.get() < capacity) {
222 insert(x);
223 c = count.getAndIncrement();
224 if (c+1 < capacity)
225 notFull.signal();
226 }
227 }
228 finally {
229 putLock.unlock();
230 }
231 if (c == 0)
232 signalNotEmpty();
233 return c >= 0;
234 }
235
236
237 public E take() throws InterruptedException {
238 E x;
239 int c = -1;
240 takeLock.lockInterruptibly();
241 try {
242 try {
243 while (count.get() == 0)
244 notEmpty.await();
245 }
246 catch (InterruptedException ie) {
247 notEmpty.signal(); // propagate to a non-interrupted thread
248 throw ie;
249 }
250
251 x = extract();
252 c = count.getAndDecrement();
253 if (c > 1)
254 notEmpty.signal();
255 }
256 finally {
257 takeLock.unlock();
258 }
259 if (c == capacity)
260 signalNotFull();
261 return x;
262 }
263
264 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
265
266 E x = null;
267 int c = -1;
268 takeLock.lockInterruptibly();
269 long nanos = unit.toNanos(timeout);
270 try {
271 for (;;) {
272 if (count.get() > 0) {
273 x = extract();
274 c = count.getAndDecrement();
275 if (c > 1)
276 notEmpty.signal();
277 break;
278 }
279 if (nanos <= 0)
280 return null;
281 try {
282 nanos = notEmpty.awaitNanos(nanos);
283 }
284 catch (InterruptedException ie) {
285 notEmpty.signal(); // propagate to a non-interrupted thread
286 throw ie;
287 }
288 }
289 }
290 finally {
291 takeLock.unlock();
292 }
293 if (c == capacity)
294 signalNotFull();
295 return x;
296 }
297
298 public E poll() {
299 if (count.get() == 0)
300 return null;
301 E x = null;
302 int c = -1;
303 takeLock.tryLock();
304 try {
305 if (count.get() > 0) {
306 x = extract();
307 c = count.getAndDecrement();
308 if (c > 1)
309 notEmpty.signal();
310 }
311 }
312 finally {
313 takeLock.unlock();
314 }
315 if (c == capacity)
316 signalNotFull();
317 return x;
318 }
319
320
321 public E peek() {
322 if (count.get() == 0)
323 return null;
324 takeLock.tryLock();
325 try {
326 Node<E> first = head.next;
327 if (first == null)
328 return null;
329 else
330 return first.item;
331 }
332 finally {
333 takeLock.unlock();
334 }
335 }
336
337 public boolean remove(Object x) {
338 if (x == null) return false;
339 boolean removed = false;
340 fullyLock();
341 try {
342 Node<E> trail = head;
343 Node<E> p = head.next;
344 while (p != null) {
345 if (x.equals(p.item)) {
346 removed = true;
347 break;
348 }
349 trail = p;
350 p = p.next;
351 }
352 if (removed) {
353 p.item = null;
354 trail.next = p.next;
355 if (count.getAndDecrement() == capacity)
356 notFull.signalAll();
357 }
358 }
359 finally {
360 fullyUnlock();
361 }
362 return removed;
363 }
364
365 public Object[] toArray() {
366 fullyLock();
367 try {
368 int size = count.get();
369 Object[] a = new Object[size];
370 int k = 0;
371 for (Node<E> p = head.next; p != null; p = p.next)
372 a[k++] = p.item;
373 return a;
374 }
375 finally {
376 fullyUnlock();
377 }
378 }
379
380 public <T> T[] toArray(T[] a) {
381 fullyLock();
382 try {
383 int size = count.get();
384 if (a.length < size)
385 a = (T[])java.lang.reflect.Array.newInstance(
386 a.getClass().getComponentType(), size);
387
388 int k = 0;
389 for (Node p = head.next; p != null; p = p.next)
390 a[k++] = (T)p.item;
391 return a;
392 }
393 finally {
394 fullyUnlock();
395 }
396 }
397
398 public String toString() {
399 fullyLock();
400 try {
401 return super.toString();
402 }
403 finally {
404 fullyUnlock();
405 }
406 }
407
408 public Iterator<E> iterator() {
409 return new Itr();
410 }
411
412 private class Itr implements Iterator<E> {
413 Node<E> current;
414 Node<E> lastRet;
415
416 // for comodification checks
417 Node<E> expectedHead;
418 Node<E> expectedLast;
419 int expectedCount;
420
421 Itr() {
422 fullyLock();
423 try {
424 expectedHead = head;
425 current = head.next;
426 expectedLast = last;
427 expectedCount = count.get();
428 }
429 finally {
430 fullyUnlock();
431 }
432 }
433
434 public boolean hasNext() {
435 return current != null;
436 }
437
438 private void checkForModification() {
439 if (expectedHead != head ||
440 expectedLast != last ||
441 expectedCount != count.get())
442 throw new ConcurrentModificationException();
443 }
444
445 public E next() {
446 fullyLock();
447 try {
448 if (current == null)
449 throw new NoSuchElementException();
450 checkForModification();
451 E x = current.item;
452 lastRet = current;
453 current = current.next;
454 return x;
455 }
456 finally {
457 fullyUnlock();
458 }
459
460 }
461
462 public void remove() {
463 if (lastRet == null)
464 throw new IllegalStateException();
465 fullyLock();
466 try {
467 checkForModification();
468 Node<E> node = lastRet;
469 lastRet = null;
470 Node<E> trail = head;
471 Node<E> p = head.next;
472 while (p != null && p != node) {
473 trail = p;
474 p = p.next;
475 }
476 if (p == node) {
477 p.item = null;
478 trail.next = p.next;
479 int c = count.getAndDecrement();
480 expectedHead = head;
481 expectedLast = last;
482 expectedCount = c;
483 if (c == capacity)
484 notFull.signalAll();
485 }
486 }
487 finally {
488 fullyUnlock();
489 }
490 }
491 }
492
493 /**
494 * Save the state to a stream (that is, serialize it).
495 *
496 * @serialData The capacity is emitted (int), followed by all of
497 * its elements (each an <tt>Object</tt>) in the proper order,
498 * followed by a null
499 */
500 private void writeObject(java.io.ObjectOutputStream s)
501 throws java.io.IOException {
502
503 fullyLock();
504 try {
505 // Write out any hidden stuff, plus capacity
506 s.defaultWriteObject();
507
508 // Write out all elements in the proper order.
509 for (Node<E> p = head.next; p != null; p = p.next)
510 s.writeObject(p.item);
511
512 // Use trailing null as sentinel
513 s.writeObject(null);
514 }
515 finally {
516 fullyUnlock();
517 }
518 }
519
520 /**
521 * Reconstitute the Queue instance from a stream (that is,
522 * deserialize it).
523 */
524 private void readObject(java.io.ObjectInputStream s)
525 throws java.io.IOException, ClassNotFoundException {
526 // Read in capacity, and any hidden stuff
527 s.defaultReadObject();
528
529 // Read in all elements and place in queue
530 for (;;) {
531 E item = (E)s.readObject();
532 if (item == null)
533 break;
534 add(item);
535 }
536 }
537 }
538