ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java
Revision: 1.5
Committed: Mon May 2 03:16:16 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.4: +1 -1 lines
Log Message:
first sentence; third person

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9 import java.util.concurrent.locks.*;
10
11 /**
12 * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
13 * linked nodes.
14 *
15 * <p> The optional capacity bound constructor argument serves as a
16 * way to prevent excessive expansion. The capacity, if unspecified,
17 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
18 * dynamically created upon each insertion unless this would bring the
19 * deque above capacity.
20 *
21 * <p>Most operations run in constant time (ignoring time spent
22 * blocking). Exceptions include {@link #remove(Object) remove},
23 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
24 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
25 * contains }, {@link #iterator iterator.remove()}, and the bulk
26 * operations, all of which run in linear time.
27 *
28 * <p>This class and its iterator implement all of the
29 * <em>optional</em> methods of the {@link Collection} and {@link
30 * Iterator} interfaces. This class is a member of the <a
31 * href="{@docRoot}/../guide/collections/index.html"> Java Collections
32 * Framework</a>.
33 *
34 * @since 1.6
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class LinkedBlockingDeque<E>
39 extends AbstractQueue<E>
40 implements BlockingDeque<E>, java.io.Serializable {
41
42 /*
43 * Implemented as a simple doubly-linked list protected by a
44 * single lock and using conditions to manage blocking.
45 */
46
47 private static final long serialVersionUID = -387911632671998426L;
48
49 /** Doubly-linked list node class */
50 static final class Node<E> {
51 E item;
52 Node<E> prev;
53 Node<E> next;
54 Node(E x, Node<E> p, Node<E> n) {
55 item = x;
56 prev = p;
57 next = n;
58 }
59 }
60
61 /** Pointer to first node */
62 private transient Node<E> first;
63 /** Pointer to last node */
64 private transient Node<E> last;
65 /** Number of items in the deque */
66 private transient int count;
67 /** Maximum number of items in the deque */
68 private final int capacity;
69 /** Main lock guarding all access */
70 private final ReentrantLock lock = new ReentrantLock();
71 /** Condition for waiting takes */
72 private final Condition notEmpty = lock.newCondition();
73 /** Condition for waiting puts */
74 private final Condition notFull = lock.newCondition();
75
76 /**
77 * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
78 * {@link Integer#MAX_VALUE}.
79 */
80 public LinkedBlockingDeque() {
81 this(Integer.MAX_VALUE);
82 }
83
84 /**
85 * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed)
86 * capacity.
87 * @param capacity the capacity of this deque
88 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
89 */
90 public LinkedBlockingDeque(int capacity) {
91 if (capacity <= 0) throw new IllegalArgumentException();
92 this.capacity = capacity;
93 }
94
95 /**
96 * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
97 * {@link Integer#MAX_VALUE}, initially containing the elements of the
98 * given collection,
99 * added in traversal order of the collection's iterator.
100 * @param c the collection of elements to initially contain
101 * @throws NullPointerException if <tt>c</tt> or any element within it
102 * is <tt>null</tt>.
103 */
104 public LinkedBlockingDeque(Collection<? extends E> c) {
105 this(Integer.MAX_VALUE);
106 for (E e : c)
107 add(e);
108 }
109
110
111 // Basic linking and unlinking operations, called only while holding lock
112
113 /**
114 * Links e as first element, or returns false if full.
115 */
116 private boolean linkFirst(E e) {
117 if (count >= capacity)
118 return false;
119 ++count;
120 Node<E> f = first;
121 Node<E> x = new Node<E>(e, null, f);
122 first = x;
123 if (last == null)
124 last = x;
125 else
126 f.prev = x;
127 notEmpty.signal();
128 return true;
129 }
130
131 /**
132 * Links e as last element, or returns false if full.
133 */
134 private boolean linkLast(E e) {
135 if (count >= capacity)
136 return false;
137 ++count;
138 Node<E> l = last;
139 Node<E> x = new Node<E>(e, l, null);
140 last = x;
141 if (first == null)
142 first = x;
143 else
144 l.next = x;
145 notEmpty.signal();
146 return true;
147 }
148
149 /**
150 * Removes and returns first element, or null if empty.
151 */
152 private E unlinkFirst() {
153 Node<E> f = first;
154 if (f == null)
155 return null;
156 Node<E> n = f.next;
157 first = n;
158 if (n == null)
159 last = null;
160 else
161 n.prev = null;
162 --count;
163 notFull.signal();
164 return f.item;
165 }
166
167 /**
168 * Removes and returns last element, or null if empty.
169 */
170 private E unlinkLast() {
171 Node<E> l = last;
172 if (l == null)
173 return null;
174 Node<E> p = l.prev;
175 last = p;
176 if (p == null)
177 first = null;
178 else
179 p.next = null;
180 --count;
181 notFull.signal();
182 return l.item;
183 }
184
185 /**
186 * Unlink e
187 */
188 private void unlink(Node<E> x) {
189 Node<E> p = x.prev;
190 Node<E> n = x.next;
191 if (p == null) {
192 if (n == null)
193 first = last = null;
194 else {
195 n.prev = null;
196 first = n;
197 }
198 } else if (n == null) {
199 p.next = null;
200 last = p;
201 } else {
202 p.next = n;
203 n.prev = p;
204 }
205 --count;
206 notFull.signalAll();
207 }
208
209 // Deque methods
210
211 public boolean offerFirst(E o) {
212 if (o == null) throw new NullPointerException();
213 lock.lock();
214 try {
215 return linkFirst(o);
216 } finally {
217 lock.unlock();
218 }
219 }
220
221 public boolean offerLast(E o) {
222 if (o == null) throw new NullPointerException();
223 lock.lock();
224 try {
225 return linkLast(o);
226 } finally {
227 lock.unlock();
228 }
229 }
230
231 public void addFirst(E e) {
232 if (!offerFirst(e))
233 throw new IllegalStateException("Deque full");
234 }
235
236 public void addLast(E e) {
237 if (!offerLast(e))
238 throw new IllegalStateException("Deque full");
239 }
240
241 public E pollFirst() {
242 lock.lock();
243 try {
244 return unlinkFirst();
245 } finally {
246 lock.unlock();
247 }
248 }
249
250 public E pollLast() {
251 lock.lock();
252 try {
253 return unlinkLast();
254 } finally {
255 lock.unlock();
256 }
257 }
258
259 public E removeFirst() {
260 E x = pollFirst();
261 if (x == null) throw new NoSuchElementException();
262 return x;
263 }
264
265 public E removeLast() {
266 E x = pollLast();
267 if (x == null) throw new NoSuchElementException();
268 return x;
269 }
270
271 public E peekFirst() {
272 lock.lock();
273 try {
274 return (first == null) ? null : first.item;
275 } finally {
276 lock.unlock();
277 }
278 }
279
280 public E peekLast() {
281 lock.lock();
282 try {
283 return (last == null) ? null : last.item;
284 } finally {
285 lock.unlock();
286 }
287 }
288
289 public E getFirst() {
290 E x = peekFirst();
291 if (x == null) throw new NoSuchElementException();
292 return x;
293 }
294
295 public E getLast() {
296 E x = peekLast();
297 if (x == null) throw new NoSuchElementException();
298 return x;
299 }
300
301 // BlockingDeque methods
302
303 public void putFirst(E o) throws InterruptedException {
304 if (o == null) throw new NullPointerException();
305 lock.lock();
306 try {
307 while (!linkFirst(o))
308 notFull.await();
309 } finally {
310 lock.unlock();
311 }
312 }
313
314 public void putLast(E o) throws InterruptedException {
315 if (o == null) throw new NullPointerException();
316 lock.lock();
317 try {
318 while (!linkLast(o))
319 notFull.await();
320 } finally {
321 lock.unlock();
322 }
323 }
324
325 public E takeFirst() throws InterruptedException {
326 lock.lock();
327 try {
328 E x;
329 while ( (x = unlinkFirst()) == null)
330 notEmpty.await();
331 return x;
332 } finally {
333 lock.unlock();
334 }
335 }
336
337 public E takeLast() throws InterruptedException {
338 lock.lock();
339 try {
340 E x;
341 while ( (x = unlinkLast()) == null)
342 notEmpty.await();
343 return x;
344 } finally {
345 lock.unlock();
346 }
347 }
348
349 public boolean offerFirst(E o, long timeout, TimeUnit unit)
350 throws InterruptedException {
351 if (o == null) throw new NullPointerException();
352 lock.lockInterruptibly();
353 try {
354 long nanos = unit.toNanos(timeout);
355 for (;;) {
356 if (linkFirst(o))
357 return true;
358 if (nanos <= 0)
359 return false;
360 nanos = notFull.awaitNanos(nanos);
361 }
362 } finally {
363 lock.unlock();
364 }
365 }
366
367 public boolean offerLast(E o, long timeout, TimeUnit unit)
368 throws InterruptedException {
369 if (o == null) throw new NullPointerException();
370 lock.lockInterruptibly();
371 try {
372 long nanos = unit.toNanos(timeout);
373 for (;;) {
374 if (linkLast(o))
375 return true;
376 if (nanos <= 0)
377 return false;
378 nanos = notFull.awaitNanos(nanos);
379 }
380 } finally {
381 lock.unlock();
382 }
383 }
384
385 public E pollFirst(long timeout, TimeUnit unit)
386 throws InterruptedException {
387 lock.lockInterruptibly();
388 try {
389 long nanos = unit.toNanos(timeout);
390 for (;;) {
391 E x = unlinkFirst();
392 if (x != null)
393 return x;
394 if (nanos <= 0)
395 return null;
396 nanos = notEmpty.awaitNanos(nanos);
397 }
398 } finally {
399 lock.unlock();
400 }
401 }
402
403 public E pollLast(long timeout, TimeUnit unit)
404 throws InterruptedException {
405 lock.lockInterruptibly();
406 try {
407 long nanos = unit.toNanos(timeout);
408 for (;;) {
409 E x = unlinkLast();
410 if (x != null)
411 return x;
412 if (nanos <= 0)
413 return null;
414 nanos = notEmpty.awaitNanos(nanos);
415 }
416 } finally {
417 lock.unlock();
418 }
419 }
420
421 // Queue and stack methods
422
423 public boolean offer(E e) { return offerLast(e); }
424 public boolean add(E e) { addLast(e); return true; }
425 public void push(E e) { addFirst(e); }
426 public E poll() { return pollFirst(); }
427 public E remove() { return removeFirst(); }
428 public E pop() { return removeFirst(); }
429 public E peek() { return peekFirst(); }
430 public E element() { return getFirst(); }
431 public boolean remove(Object o) { return removeFirstOccurrence(o); }
432
433 // BlockingQueue methods
434
435 public void put(E o) throws InterruptedException { putLast(o); }
436 public E take() throws InterruptedException { return takeFirst(); }
437 public boolean offer(E o, long timeout, TimeUnit unit)
438 throws InterruptedException { return offerLast(o, timeout, unit); }
439 public E poll(long timeout, TimeUnit unit)
440 throws InterruptedException { return pollFirst(timeout, unit); }
441
442 /**
443 * Returns the number of elements in this deque.
444 *
445 * @return the number of elements in this deque.
446 */
447 public int size() {
448 lock.lock();
449 try {
450 return count;
451 } finally {
452 lock.unlock();
453 }
454 }
455
456 /**
457 * Returns the number of additional elements that this deque can ideally
458 * (in the absence of memory or resource constraints) accept without
459 * blocking. This is always equal to the initial capacity of this deque
460 * less the current <tt>size</tt> of this deque.
461 *
462 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
463 * an element will succeed by inspecting <tt>remainingCapacity</tt>
464 * because it may be the case that another thread is about to
465 * <tt>put</tt> or <tt>take</tt> an element.
466 */
467 public int remainingCapacity() {
468 lock.lock();
469 try {
470 return capacity - count;
471 } finally {
472 lock.unlock();
473 }
474 }
475
476 public boolean contains(Object o) {
477 if (o == null) return false;
478 lock.lock();
479 try {
480 for (Node<E> p = first; p != null; p = p.next)
481 if (o.equals(p.item))
482 return true;
483 return false;
484 } finally {
485 lock.unlock();
486 }
487 }
488
489 public boolean removeFirstOccurrence(Object e) {
490 if (e == null) throw new NullPointerException();
491 lock.lock();
492 try {
493 for (Node<E> p = first; p != null; p = p.next) {
494 if (e.equals(p.item)) {
495 unlink(p);
496 return true;
497 }
498 }
499 return false;
500 } finally {
501 lock.unlock();
502 }
503 }
504
505 public boolean removeLastOccurrence(Object e) {
506 if (e == null) throw new NullPointerException();
507 lock.lock();
508 try {
509 for (Node<E> p = last; p != null; p = p.prev) {
510 if (e.equals(p.item)) {
511 unlink(p);
512 return true;
513 }
514 }
515 return false;
516 } finally {
517 lock.unlock();
518 }
519 }
520
521 /**
522 * Variant of removeFirstOccurrence needed by iterator.remove.
523 * Searches for the node, not its contents.
524 */
525 boolean removeNode(Node<E> e) {
526 lock.lock();
527 try {
528 for (Node<E> p = first; p != null; p = p.next) {
529 if (p == e) {
530 unlink(p);
531 return true;
532 }
533 }
534 return false;
535 } finally {
536 lock.unlock();
537 }
538 }
539
540 public Object[] toArray() {
541 lock.lock();
542 try {
543 Object[] a = new Object[count];
544 int k = 0;
545 for (Node<E> p = first; p != null; p = p.next)
546 a[k++] = p.item;
547 return a;
548 } finally {
549 lock.unlock();
550 }
551 }
552
553 public <T> T[] toArray(T[] a) {
554 lock.lock();
555 try {
556 if (a.length < count)
557 a = (T[])java.lang.reflect.Array.newInstance(
558 a.getClass().getComponentType(),
559 count
560 );
561
562 int k = 0;
563 for (Node<E> p = first; p != null; p = p.next)
564 a[k++] = (T)p.item;
565 if (a.length > k)
566 a[k] = null;
567 return a;
568 } finally {
569 lock.unlock();
570 }
571 }
572
573 public String toString() {
574 lock.lock();
575 try {
576 return super.toString();
577 } finally {
578 lock.unlock();
579 }
580 }
581
582 /**
583 * Atomically removes all of the elements from this deque.
584 * The deque will be empty after this call returns.
585 */
586 public void clear() {
587 lock.lock();
588 try {
589 first = last = null;
590 count = 0;
591 notFull.signalAll();
592 } finally {
593 lock.unlock();
594 }
595 }
596
597 public int drainTo(Collection<? super E> c) {
598 if (c == null)
599 throw new NullPointerException();
600 if (c == this)
601 throw new IllegalArgumentException();
602 lock.lock();
603 try {
604 for (Node<E> p = first; p != null; p = p.next)
605 c.add(p.item);
606 int n = count;
607 count = 0;
608 first = last = null;
609 notFull.signalAll();
610 return n;
611 } finally {
612 lock.unlock();
613 }
614 }
615
616 public int drainTo(Collection<? super E> c, int maxElements) {
617 if (c == null)
618 throw new NullPointerException();
619 if (c == this)
620 throw new IllegalArgumentException();
621 lock.lock();
622 try {
623 int n = 0;
624 while (n < maxElements && first != null) {
625 c.add(first.item);
626 first.prev = null;
627 first = first.next;
628 --count;
629 ++n;
630 }
631 if (first == null)
632 last = null;
633 notFull.signalAll();
634 return n;
635 } finally {
636 lock.unlock();
637 }
638 }
639
640 /**
641 * Returns an iterator over the elements in this deque in proper sequence.
642 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
643 * will never throw {@link ConcurrentModificationException},
644 * and guarantees to traverse elements as they existed upon
645 * construction of the iterator, and may (but is not guaranteed to)
646 * reflect any modifications subsequent to construction.
647 *
648 * @return an iterator over the elements in this deque in proper sequence.
649 */
650 public Iterator<E> iterator() {
651 return new Itr();
652 }
653
654 /**
655 * Iterator for LinkedBlockingDeque
656 */
657 private class Itr implements Iterator<E> {
658 private Node<E> next;
659
660 /**
661 * nextItem holds on to item fields because once we claim that
662 * an element exists in hasNext(), we must return item read
663 * under lock (in advance()) even if it was in the process of
664 * being removed when hasNext() was called.
665 */
666 private E nextItem;
667
668 /**
669 * Node returned by most recent call to next. Needed by remove.
670 * Reset to null if this element is deleted by a call to remove.
671 */
672 private Node<E> last;
673
674 Itr() {
675 advance();
676 }
677
678 /**
679 * Advances next, or if not yet initialized, set to first node.
680 */
681 private void advance() {
682 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
683 lock.lock();
684 try {
685 next = (next == null)? first : next.next;
686 nextItem = (next == null)? null : next.item;
687 } finally {
688 lock.unlock();
689 }
690 }
691
692 public boolean hasNext() {
693 return next != null;
694 }
695
696 public E next() {
697 if (next == null)
698 throw new NoSuchElementException();
699 last = next;
700 E x = nextItem;
701 advance();
702 return x;
703 }
704
705 public void remove() {
706 Node<E> n = last;
707 if (n == null)
708 throw new IllegalStateException();
709 last = null;
710 // Note: removeNode rescans looking for this node to make
711 // sure it was not already removed. Otherwise, trying to
712 // re-remove could corrupt list.
713 removeNode(n);
714 }
715 }
716
717 /**
718 * Save the state of this deque to a stream (that is, serialize it).
719 *
720 * @serialData The capacity (int), followed by elements (each an
721 * <tt>Object</tt>) in the proper order, followed by a null
722 * @param s the stream
723 */
724 private void writeObject(java.io.ObjectOutputStream s)
725 throws java.io.IOException {
726 lock.lock();
727 try {
728 // Write out capacity and any hidden stuff
729 s.defaultWriteObject();
730 // Write out all elements in the proper order.
731 for (Node<E> p = first; p != null; p = p.next)
732 s.writeObject(p.item);
733 // Use trailing null as sentinel
734 s.writeObject(null);
735 } finally {
736 lock.unlock();
737 }
738 }
739
740 /**
741 * Reconstitute this deque from a stream (that is,
742 * deserialize it).
743 * @param s the stream
744 */
745 private void readObject(java.io.ObjectInputStream s)
746 throws java.io.IOException, ClassNotFoundException {
747 s.defaultReadObject();
748 count = 0;
749 first = null;
750 last = null;
751 // Read in all elements and place in queue
752 for (;;) {
753 E item = (E)s.readObject();
754 if (item == null)
755 break;
756 add(item);
757 }
758 }
759
760 }