ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.7
Committed: Tue Jul 8 00:46:34 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.6: +1 -0 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded blocking queue based on linked nodes. Linked
 * queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent unlimited queue expansion. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p> Null elements are rejected by every insertion method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals a taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node: "head" never holds an
     * item, and the first real element is head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements. Deliberately NOT transient: a transient
     * final field would be left null by deserialization (field initializers
     * do not run), so the counter object travels with the default
     * serialized form and is reset in readObject.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node whose item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll/drainTo.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at the end of the queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes the first element by promoting its node to be the new
     * dummy head. Caller must hold takeLock and must have checked that
     * the queue is non-empty.
     * @return the item that was at the front of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null; // the node is now the dummy head
        return x;
    }

    /**
     * Unlinks interior node p, whose predecessor is trail, and updates
     * the element count. Caller must hold both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the predecessor becomes the tail;
        // otherwise later inserts would link onto the detached node.
        if (last == p) {
            last = trail;
        }
        if (count.getAndDecrement() == capacity) {
            notFull.signalAll();
        }
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a LinkedBlockingQueue with the given capacity constraint.
     * @param capacity the maximum number of elements to hold without blocking.
     * @throws IllegalArgumentException if capacity is not positive
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a LinkedBlockingQueue without an intrinsic capacity
     * constraint, initially holding the given elements, added in
     * traversal order of the collection's iterator.
     * @param initialElements the elements to initially contain
     * @throws NullPointerException if the collection or any of its
     *         elements is null
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = initialElements.iterator(); it.hasNext();) {
            add(it.next());
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     * @return the element count
     */
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of additional elements the queue can accept
     * without blocking, at the instant of the call.
     * @return capacity minus current size
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the element at the tail, waiting if necessary for space.
     * @param x the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    public void put(E x) throws InterruptedException {
        if (x == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Adds the element at the tail, waiting up to the given time for
     * space to become available.
     * @param x the element to add
     * @param timeout how long to wait before giving up
     * @param unit the unit of timeout
     * @return true if added, false if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x or unit is null
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        if (x == null) {
            throw new NullPointerException();
        }
        // Convert before locking: a null unit must not leave putLock held.
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Adds the element at the tail if the queue is not full.
     * @param x the element to add
     * @return true if added, false if the queue was full
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x) {
        if (x == null) {
            throw new NullPointerException();
        }
        if (count.get() == capacity) {
            return false;
        }
        int c = -1;
        // Must really hold the lock: the list is mutated below and the
        // finally clause unlocks unconditionally.
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Removes and returns the head element, waiting if necessary until
     * one is available.
     * @return the head element
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head element, waiting up to the given
     * time for one to become available.
     * @param timeout how long to wait before giving up
     * @param unit the unit of timeout
     * @return the head element, or null if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if unit is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        // Convert before locking: a null unit must not leave takeLock held.
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head element if one is present.
     * @return the head element, or null if the queue is empty
     */
    public E poll() {
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Returns, without removing, the head element if one is present.
     * @return the head element, or null if the queue is empty
     */
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first element equal to the argument, if any.
     * @param x the element to remove
     * @return true if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) {
            return false;
        }
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements and adds them to the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements and adds
     * them to the given collection. If adding to the collection fails
     * part-way, elements already transferred stay transferred and the
     * rest remain in this queue.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        boolean wasFull = false;
        int moved = 0;
        takeLock.lock();
        try {
            // count can only grow while takeLock is held, so at least
            // this many nodes are linked after head.
            int n = Math.min(maxElements, count.get());
            Node<E> h = head;
            try {
                while (moved < n) {
                    Node<E> p = h.next;
                    c.add(p.item); // add first, so a failure loses nothing
                    p.item = null;
                    h = p;
                    ++moved;
                }
            } finally {
                // Commit whatever was transferred, even if c.add threw.
                if (moved > 0) {
                    head = h;
                    wasFull = (count.getAndAdd(-moved) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (wasFull) {
                signalNotFull();
            }
        }
        return moved;
    }

    /**
     * Returns an array containing all elements in queue order.
     * @return a newly allocated array of the elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            Object[] result = new Object[count.get()];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                result[k++] = p.item;
            }
            return result;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all elements in queue order, using
     * the supplied array if it is large enough. When the supplied array
     * has room to spare, the slot following the last element is set to
     * null.
     * @param a the array to fill if big enough; otherwise a new array
     *        of the same runtime type is allocated
     * @return an array holding the elements
     * @throws NullPointerException if a is null
     * @throws ArrayStoreException if an element cannot be stored in a
     */
    @SuppressWarnings("unchecked") // newInstance yields an array of a's component type
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }
            Object[] out = a; // store through Object[] to avoid per-element casts
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                out[k++] = p.item;
            }
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation built while both locks are held,
     * so the contents are a consistent snapshot.
     * @return the string form of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a weakly consistent iterator over the elements in queue order.
     * @return an iterator over this queue
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                advanceTo(head.next);
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Positions the iterator on the first node at or after p that
         * still holds an item. Nodes that were taken or removed have a
         * null item and are skipped, so next() never hands out null.
         * Caller must hold both locks.
         */
        private void advanceTo(Node<E> p) {
            E item = null;
            while (p != null && (item = p.item) == null) {
                p = p.next;
            }
            current = p;
            currentElement = item;
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                advanceTo(current.next);
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // The node may already be gone (taken or removed); then
                // the scan simply finds nothing to unlink.
                for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be written
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be read
     * @throws ClassNotFoundException if an element's class is unavailable
     */
    @SuppressWarnings("unchecked") // elements were written from a queue of E
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // No constructor runs during deserialization: rebuild the
        // transient list and zero the counter before re-adding elements.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
561