ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java
Revision: 1.36
Committed: Sun Feb 17 23:36:34 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.35: +140 -0 lines
Log Message:
Spliterator sync

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.Spliterator;
17 import java.util.stream.Stream;
18 import java.util.stream.Streams;
19 import java.util.function.Consumer;
20
21 /**
22 * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
23 * linked nodes.
24 *
25 * <p>The optional capacity bound constructor argument serves as a
26 * way to prevent excessive expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * deque above capacity.
30 *
31 * <p>Most operations run in constant time (ignoring time spent
32 * blocking). Exceptions include {@link #remove(Object) remove},
33 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
34 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
35 * contains}, {@link #iterator iterator.remove()}, and the bulk
36 * operations, all of which run in linear time.
37 *
38 * <p>This class and its iterator implement all of the
39 * <em>optional</em> methods of the {@link Collection} and {@link
40 * Iterator} interfaces.
41 *
42 * <p>This class is a member of the
43 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
44 * Java Collections Framework</a>.
45 *
46 * @since 1.6
47 * @author Doug Lea
48 * @param <E> the type of elements held in this collection
49 */
50 public class LinkedBlockingDeque<E>
51 extends AbstractQueue<E>
52 implements BlockingDeque<E>, java.io.Serializable {
53
54 /*
55 * Implemented as a simple doubly-linked list protected by a
56 * single lock and using conditions to manage blocking.
57 *
58 * To implement weakly consistent iterators, it appears we need to
59 * keep all Nodes GC-reachable from a predecessor dequeued Node.
60 * That would cause two problems:
61 * - allow a rogue Iterator to cause unbounded memory retention
62 * - cause cross-generational linking of old Nodes to new Nodes if
63 * a Node was tenured while live, which generational GCs have a
64 * hard time dealing with, causing repeated major collections.
65 * However, only non-deleted Nodes need to be reachable from
66 * dequeued Nodes, and reachability does not necessarily have to
67 * be of the kind understood by the GC. We use the trick of
68 * linking a Node that has just been dequeued to itself. Such a
69 * self-link implicitly means to jump to "first" (for next links)
70 * or "last" (for prev links).
71 */
72
73 /*
74 * We have "diamond" multiple interface/abstract class inheritance
75 * here, and that introduces ambiguities. Often we want the
76 * BlockingDeque javadoc combined with the AbstractQueue
77 * implementation, so a lot of method specs are duplicated here.
78 */
79
80 private static final long serialVersionUID = -387911632671998426L;
81
82 /** Doubly-linked list node class */
83 static final class Node<E> {
84 /**
85 * The item, or null if this node has been removed.
86 */
87 E item;
88
89 /**
90 * One of:
91 * - the real predecessor Node
92 * - this Node, meaning the predecessor is tail
93 * - null, meaning there is no predecessor
94 */
95 Node<E> prev;
96
97 /**
98 * One of:
99 * - the real successor Node
100 * - this Node, meaning the successor is head
101 * - null, meaning there is no successor
102 */
103 Node<E> next;
104
105 Node(E x) {
106 item = x;
107 }
108 }
109
110 /**
111 * Pointer to first node.
112 * Invariant: (first == null && last == null) ||
113 * (first.prev == null && first.item != null)
114 */
115 transient Node<E> first;
116
117 /**
118 * Pointer to last node.
119 * Invariant: (first == null && last == null) ||
120 * (last.next == null && last.item != null)
121 */
122 transient Node<E> last;
123
124 /** Number of items in the deque */
125 private transient int count;
126
127 /** Maximum number of items in the deque */
128 private final int capacity;
129
130 /** Main lock guarding all access */
131 final ReentrantLock lock = new ReentrantLock();
132
133 /** Condition for waiting takes */
134 private final Condition notEmpty = lock.newCondition();
135
136 /** Condition for waiting puts */
137 private final Condition notFull = lock.newCondition();
138
/**
 * Creates a {@code LinkedBlockingDeque} whose capacity is
 * {@link Integer#MAX_VALUE}, i.e. effectively unbounded.
 */
public LinkedBlockingDeque() {
    // Delegate to the bounded constructor with the largest legal bound.
    this(Integer.MAX_VALUE);
}
146
/**
 * Creates a {@code LinkedBlockingDeque} bounded by the given (fixed)
 * capacity.
 *
 * @param capacity the maximum number of elements this deque may hold
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
public LinkedBlockingDeque(int capacity) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
}
157
/**
 * Creates a {@code LinkedBlockingDeque} with a capacity of
 * {@link Integer#MAX_VALUE} that starts out holding the elements of
 * the given collection, appended in the order the collection's
 * iterator returns them.
 *
 * @param c the collection whose elements seed this deque
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock mainLock = this.lock;
    mainLock.lock(); // uncontended here; taken only for memory visibility
    try {
        Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            E element = source.next();
            if (element == null) {
                throw new NullPointerException();
            }
            boolean linked = linkLast(new Node<E>(element));
            if (!linked) {
                throw new IllegalStateException("Deque full");
            }
        }
    } finally {
        mainLock.unlock();
    }
}
183
184
185 // Basic linking and unlinking operations, called only while holding lock
186
187 /**
188 * Links node as first element, or returns false if full.
189 */
190 private boolean linkFirst(Node<E> node) {
191 // assert lock.isHeldByCurrentThread();
192 if (count >= capacity)
193 return false;
194 Node<E> f = first;
195 node.next = f;
196 first = node;
197 if (last == null)
198 last = node;
199 else
200 f.prev = node;
201 ++count;
202 notEmpty.signal();
203 return true;
204 }
205
206 /**
207 * Links node as last element, or returns false if full.
208 */
209 private boolean linkLast(Node<E> node) {
210 // assert lock.isHeldByCurrentThread();
211 if (count >= capacity)
212 return false;
213 Node<E> l = last;
214 node.prev = l;
215 last = node;
216 if (first == null)
217 first = node;
218 else
219 l.next = node;
220 ++count;
221 notEmpty.signal();
222 return true;
223 }
224
225 /**
226 * Removes and returns first element, or null if empty.
227 */
228 private E unlinkFirst() {
229 // assert lock.isHeldByCurrentThread();
230 Node<E> f = first;
231 if (f == null)
232 return null;
233 Node<E> n = f.next;
234 E item = f.item;
235 f.item = null;
236 f.next = f; // help GC
237 first = n;
238 if (n == null)
239 last = null;
240 else
241 n.prev = null;
242 --count;
243 notFull.signal();
244 return item;
245 }
246
247 /**
248 * Removes and returns last element, or null if empty.
249 */
250 private E unlinkLast() {
251 // assert lock.isHeldByCurrentThread();
252 Node<E> l = last;
253 if (l == null)
254 return null;
255 Node<E> p = l.prev;
256 E item = l.item;
257 l.item = null;
258 l.prev = l; // help GC
259 last = p;
260 if (p == null)
261 first = null;
262 else
263 p.next = null;
264 --count;
265 notFull.signal();
266 return item;
267 }
268
269 /**
270 * Unlinks x.
271 */
272 void unlink(Node<E> x) {
273 // assert lock.isHeldByCurrentThread();
274 Node<E> p = x.prev;
275 Node<E> n = x.next;
276 if (p == null) {
277 unlinkFirst();
278 } else if (n == null) {
279 unlinkLast();
280 } else {
281 p.next = n;
282 n.prev = p;
283 x.item = null;
284 // Don't mess with x's links. They may still be in use by
285 // an iterator.
286 --count;
287 notFull.signal();
288 }
289 }
290
291 // BlockingDeque methods
292
293 /**
294 * @throws IllegalStateException {@inheritDoc}
295 * @throws NullPointerException {@inheritDoc}
296 */
297 public void addFirst(E e) {
298 if (!offerFirst(e))
299 throw new IllegalStateException("Deque full");
300 }
301
302 /**
303 * @throws IllegalStateException {@inheritDoc}
304 * @throws NullPointerException {@inheritDoc}
305 */
306 public void addLast(E e) {
307 if (!offerLast(e))
308 throw new IllegalStateException("Deque full");
309 }
310
311 /**
312 * @throws NullPointerException {@inheritDoc}
313 */
314 public boolean offerFirst(E e) {
315 if (e == null) throw new NullPointerException();
316 Node<E> node = new Node<E>(e);
317 final ReentrantLock lock = this.lock;
318 lock.lock();
319 try {
320 return linkFirst(node);
321 } finally {
322 lock.unlock();
323 }
324 }
325
326 /**
327 * @throws NullPointerException {@inheritDoc}
328 */
329 public boolean offerLast(E e) {
330 if (e == null) throw new NullPointerException();
331 Node<E> node = new Node<E>(e);
332 final ReentrantLock lock = this.lock;
333 lock.lock();
334 try {
335 return linkLast(node);
336 } finally {
337 lock.unlock();
338 }
339 }
340
341 /**
342 * @throws NullPointerException {@inheritDoc}
343 * @throws InterruptedException {@inheritDoc}
344 */
345 public void putFirst(E e) throws InterruptedException {
346 if (e == null) throw new NullPointerException();
347 Node<E> node = new Node<E>(e);
348 final ReentrantLock lock = this.lock;
349 lock.lock();
350 try {
351 while (!linkFirst(node))
352 notFull.await();
353 } finally {
354 lock.unlock();
355 }
356 }
357
358 /**
359 * @throws NullPointerException {@inheritDoc}
360 * @throws InterruptedException {@inheritDoc}
361 */
362 public void putLast(E e) throws InterruptedException {
363 if (e == null) throw new NullPointerException();
364 Node<E> node = new Node<E>(e);
365 final ReentrantLock lock = this.lock;
366 lock.lock();
367 try {
368 while (!linkLast(node))
369 notFull.await();
370 } finally {
371 lock.unlock();
372 }
373 }
374
375 /**
376 * @throws NullPointerException {@inheritDoc}
377 * @throws InterruptedException {@inheritDoc}
378 */
379 public boolean offerFirst(E e, long timeout, TimeUnit unit)
380 throws InterruptedException {
381 if (e == null) throw new NullPointerException();
382 Node<E> node = new Node<E>(e);
383 long nanos = unit.toNanos(timeout);
384 final ReentrantLock lock = this.lock;
385 lock.lockInterruptibly();
386 try {
387 while (!linkFirst(node)) {
388 if (nanos <= 0)
389 return false;
390 nanos = notFull.awaitNanos(nanos);
391 }
392 return true;
393 } finally {
394 lock.unlock();
395 }
396 }
397
398 /**
399 * @throws NullPointerException {@inheritDoc}
400 * @throws InterruptedException {@inheritDoc}
401 */
402 public boolean offerLast(E e, long timeout, TimeUnit unit)
403 throws InterruptedException {
404 if (e == null) throw new NullPointerException();
405 Node<E> node = new Node<E>(e);
406 long nanos = unit.toNanos(timeout);
407 final ReentrantLock lock = this.lock;
408 lock.lockInterruptibly();
409 try {
410 while (!linkLast(node)) {
411 if (nanos <= 0)
412 return false;
413 nanos = notFull.awaitNanos(nanos);
414 }
415 return true;
416 } finally {
417 lock.unlock();
418 }
419 }
420
421 /**
422 * @throws NoSuchElementException {@inheritDoc}
423 */
424 public E removeFirst() {
425 E x = pollFirst();
426 if (x == null) throw new NoSuchElementException();
427 return x;
428 }
429
430 /**
431 * @throws NoSuchElementException {@inheritDoc}
432 */
433 public E removeLast() {
434 E x = pollLast();
435 if (x == null) throw new NoSuchElementException();
436 return x;
437 }
438
439 public E pollFirst() {
440 final ReentrantLock lock = this.lock;
441 lock.lock();
442 try {
443 return unlinkFirst();
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 public E pollLast() {
450 final ReentrantLock lock = this.lock;
451 lock.lock();
452 try {
453 return unlinkLast();
454 } finally {
455 lock.unlock();
456 }
457 }
458
459 public E takeFirst() throws InterruptedException {
460 final ReentrantLock lock = this.lock;
461 lock.lock();
462 try {
463 E x;
464 while ( (x = unlinkFirst()) == null)
465 notEmpty.await();
466 return x;
467 } finally {
468 lock.unlock();
469 }
470 }
471
472 public E takeLast() throws InterruptedException {
473 final ReentrantLock lock = this.lock;
474 lock.lock();
475 try {
476 E x;
477 while ( (x = unlinkLast()) == null)
478 notEmpty.await();
479 return x;
480 } finally {
481 lock.unlock();
482 }
483 }
484
485 public E pollFirst(long timeout, TimeUnit unit)
486 throws InterruptedException {
487 long nanos = unit.toNanos(timeout);
488 final ReentrantLock lock = this.lock;
489 lock.lockInterruptibly();
490 try {
491 E x;
492 while ( (x = unlinkFirst()) == null) {
493 if (nanos <= 0)
494 return null;
495 nanos = notEmpty.awaitNanos(nanos);
496 }
497 return x;
498 } finally {
499 lock.unlock();
500 }
501 }
502
503 public E pollLast(long timeout, TimeUnit unit)
504 throws InterruptedException {
505 long nanos = unit.toNanos(timeout);
506 final ReentrantLock lock = this.lock;
507 lock.lockInterruptibly();
508 try {
509 E x;
510 while ( (x = unlinkLast()) == null) {
511 if (nanos <= 0)
512 return null;
513 nanos = notEmpty.awaitNanos(nanos);
514 }
515 return x;
516 } finally {
517 lock.unlock();
518 }
519 }
520
521 /**
522 * @throws NoSuchElementException {@inheritDoc}
523 */
524 public E getFirst() {
525 E x = peekFirst();
526 if (x == null) throw new NoSuchElementException();
527 return x;
528 }
529
530 /**
531 * @throws NoSuchElementException {@inheritDoc}
532 */
533 public E getLast() {
534 E x = peekLast();
535 if (x == null) throw new NoSuchElementException();
536 return x;
537 }
538
539 public E peekFirst() {
540 final ReentrantLock lock = this.lock;
541 lock.lock();
542 try {
543 return (first == null) ? null : first.item;
544 } finally {
545 lock.unlock();
546 }
547 }
548
549 public E peekLast() {
550 final ReentrantLock lock = this.lock;
551 lock.lock();
552 try {
553 return (last == null) ? null : last.item;
554 } finally {
555 lock.unlock();
556 }
557 }
558
559 public boolean removeFirstOccurrence(Object o) {
560 if (o == null) return false;
561 final ReentrantLock lock = this.lock;
562 lock.lock();
563 try {
564 for (Node<E> p = first; p != null; p = p.next) {
565 if (o.equals(p.item)) {
566 unlink(p);
567 return true;
568 }
569 }
570 return false;
571 } finally {
572 lock.unlock();
573 }
574 }
575
576 public boolean removeLastOccurrence(Object o) {
577 if (o == null) return false;
578 final ReentrantLock lock = this.lock;
579 lock.lock();
580 try {
581 for (Node<E> p = last; p != null; p = p.prev) {
582 if (o.equals(p.item)) {
583 unlink(p);
584 return true;
585 }
586 }
587 return false;
588 } finally {
589 lock.unlock();
590 }
591 }
592
593 // BlockingQueue methods
594
595 /**
596 * Inserts the specified element at the end of this deque unless it would
597 * violate capacity restrictions. When using a capacity-restricted deque,
598 * it is generally preferable to use method {@link #offer(Object) offer}.
599 *
600 * <p>This method is equivalent to {@link #addLast}.
601 *
602 * @throws IllegalStateException if the element cannot be added at this
603 * time due to capacity restrictions
604 * @throws NullPointerException if the specified element is null
605 */
606 public boolean add(E e) {
607 addLast(e);
608 return true;
609 }
610
611 /**
612 * @throws NullPointerException if the specified element is null
613 */
614 public boolean offer(E e) {
615 return offerLast(e);
616 }
617
618 /**
619 * @throws NullPointerException {@inheritDoc}
620 * @throws InterruptedException {@inheritDoc}
621 */
622 public void put(E e) throws InterruptedException {
623 putLast(e);
624 }
625
626 /**
627 * @throws NullPointerException {@inheritDoc}
628 * @throws InterruptedException {@inheritDoc}
629 */
630 public boolean offer(E e, long timeout, TimeUnit unit)
631 throws InterruptedException {
632 return offerLast(e, timeout, unit);
633 }
634
635 /**
636 * Retrieves and removes the head of the queue represented by this deque.
637 * This method differs from {@link #poll poll} only in that it throws an
638 * exception if this deque is empty.
639 *
640 * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
641 *
642 * @return the head of the queue represented by this deque
643 * @throws NoSuchElementException if this deque is empty
644 */
645 public E remove() {
646 return removeFirst();
647 }
648
649 public E poll() {
650 return pollFirst();
651 }
652
653 public E take() throws InterruptedException {
654 return takeFirst();
655 }
656
657 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
658 return pollFirst(timeout, unit);
659 }
660
661 /**
662 * Retrieves, but does not remove, the head of the queue represented by
663 * this deque. This method differs from {@link #peek peek} only in that
664 * it throws an exception if this deque is empty.
665 *
666 * <p>This method is equivalent to {@link #getFirst() getFirst}.
667 *
668 * @return the head of the queue represented by this deque
669 * @throws NoSuchElementException if this deque is empty
670 */
671 public E element() {
672 return getFirst();
673 }
674
675 public E peek() {
676 return peekFirst();
677 }
678
679 /**
680 * Returns the number of additional elements that this deque can ideally
681 * (in the absence of memory or resource constraints) accept without
682 * blocking. This is always equal to the initial capacity of this deque
683 * less the current {@code size} of this deque.
684 *
685 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
686 * an element will succeed by inspecting {@code remainingCapacity}
687 * because it may be the case that another thread is about to
688 * insert or remove an element.
689 */
690 public int remainingCapacity() {
691 final ReentrantLock lock = this.lock;
692 lock.lock();
693 try {
694 return capacity - count;
695 } finally {
696 lock.unlock();
697 }
698 }
699
700 /**
701 * @throws UnsupportedOperationException {@inheritDoc}
702 * @throws ClassCastException {@inheritDoc}
703 * @throws NullPointerException {@inheritDoc}
704 * @throws IllegalArgumentException {@inheritDoc}
705 */
706 public int drainTo(Collection<? super E> c) {
707 return drainTo(c, Integer.MAX_VALUE);
708 }
709
710 /**
711 * @throws UnsupportedOperationException {@inheritDoc}
712 * @throws ClassCastException {@inheritDoc}
713 * @throws NullPointerException {@inheritDoc}
714 * @throws IllegalArgumentException {@inheritDoc}
715 */
716 public int drainTo(Collection<? super E> c, int maxElements) {
717 if (c == null)
718 throw new NullPointerException();
719 if (c == this)
720 throw new IllegalArgumentException();
721 if (maxElements <= 0)
722 return 0;
723 final ReentrantLock lock = this.lock;
724 lock.lock();
725 try {
726 int n = Math.min(maxElements, count);
727 for (int i = 0; i < n; i++) {
728 c.add(first.item); // In this order, in case add() throws.
729 unlinkFirst();
730 }
731 return n;
732 } finally {
733 lock.unlock();
734 }
735 }
736
737 // Stack methods
738
739 /**
740 * @throws IllegalStateException {@inheritDoc}
741 * @throws NullPointerException {@inheritDoc}
742 */
743 public void push(E e) {
744 addFirst(e);
745 }
746
747 /**
748 * @throws NoSuchElementException {@inheritDoc}
749 */
750 public E pop() {
751 return removeFirst();
752 }
753
754 // Collection methods
755
756 /**
757 * Removes the first occurrence of the specified element from this deque.
758 * If the deque does not contain the element, it is unchanged.
759 * More formally, removes the first element {@code e} such that
760 * {@code o.equals(e)} (if such an element exists).
761 * Returns {@code true} if this deque contained the specified element
762 * (or equivalently, if this deque changed as a result of the call).
763 *
764 * <p>This method is equivalent to
765 * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
766 *
767 * @param o element to be removed from this deque, if present
768 * @return {@code true} if this deque changed as a result of the call
769 */
770 public boolean remove(Object o) {
771 return removeFirstOccurrence(o);
772 }
773
774 /**
775 * Returns the number of elements in this deque.
776 *
777 * @return the number of elements in this deque
778 */
779 public int size() {
780 final ReentrantLock lock = this.lock;
781 lock.lock();
782 try {
783 return count;
784 } finally {
785 lock.unlock();
786 }
787 }
788
789 /**
790 * Returns {@code true} if this deque contains the specified element.
791 * More formally, returns {@code true} if and only if this deque contains
792 * at least one element {@code e} such that {@code o.equals(e)}.
793 *
794 * @param o object to be checked for containment in this deque
795 * @return {@code true} if this deque contains the specified element
796 */
797 public boolean contains(Object o) {
798 if (o == null) return false;
799 final ReentrantLock lock = this.lock;
800 lock.lock();
801 try {
802 for (Node<E> p = first; p != null; p = p.next)
803 if (o.equals(p.item))
804 return true;
805 return false;
806 } finally {
807 lock.unlock();
808 }
809 }
810
811 /*
812 * TODO: Add support for more efficient bulk operations.
813 *
814 * We don't want to acquire the lock for every iteration, but we
815 * also want to give other threads a chance to interact with the
816 * collection, especially when count is close to capacity.
817 */
818
819 // /**
820 // * Adds all of the elements in the specified collection to this
821 // * queue. Attempts to addAll of a queue to itself result in
822 // * {@code IllegalArgumentException}. Further, the behavior of
823 // * this operation is undefined if the specified collection is
824 // * modified while the operation is in progress.
825 // *
826 // * @param c collection containing elements to be added to this queue
827 // * @return {@code true} if this queue changed as a result of the call
828 // * @throws ClassCastException {@inheritDoc}
829 // * @throws NullPointerException {@inheritDoc}
830 // * @throws IllegalArgumentException {@inheritDoc}
831 // * @throws IllegalStateException {@inheritDoc}
832 // * @see #add(Object)
833 // */
834 // public boolean addAll(Collection<? extends E> c) {
835 // if (c == null)
836 // throw new NullPointerException();
837 // if (c == this)
838 // throw new IllegalArgumentException();
839 // final ReentrantLock lock = this.lock;
840 // lock.lock();
841 // try {
842 // boolean modified = false;
843 // for (E e : c)
844 // if (linkLast(e))
845 // modified = true;
846 // return modified;
847 // } finally {
848 // lock.unlock();
849 // }
850 // }
851
852 /**
853 * Returns an array containing all of the elements in this deque, in
854 * proper sequence (from first to last element).
855 *
856 * <p>The returned array will be "safe" in that no references to it are
857 * maintained by this deque. (In other words, this method must allocate
858 * a new array). The caller is thus free to modify the returned array.
859 *
860 * <p>This method acts as bridge between array-based and collection-based
861 * APIs.
862 *
863 * @return an array containing all of the elements in this deque
864 */
865 @SuppressWarnings("unchecked")
866 public Object[] toArray() {
867 final ReentrantLock lock = this.lock;
868 lock.lock();
869 try {
870 Object[] a = new Object[count];
871 int k = 0;
872 for (Node<E> p = first; p != null; p = p.next)
873 a[k++] = p.item;
874 return a;
875 } finally {
876 lock.unlock();
877 }
878 }
879
880 /**
881 * Returns an array containing all of the elements in this deque, in
882 * proper sequence; the runtime type of the returned array is that of
883 * the specified array. If the deque fits in the specified array, it
884 * is returned therein. Otherwise, a new array is allocated with the
885 * runtime type of the specified array and the size of this deque.
886 *
887 * <p>If this deque fits in the specified array with room to spare
888 * (i.e., the array has more elements than this deque), the element in
889 * the array immediately following the end of the deque is set to
890 * {@code null}.
891 *
892 * <p>Like the {@link #toArray()} method, this method acts as bridge between
893 * array-based and collection-based APIs. Further, this method allows
894 * precise control over the runtime type of the output array, and may,
895 * under certain circumstances, be used to save allocation costs.
896 *
897 * <p>Suppose {@code x} is a deque known to contain only strings.
898 * The following code can be used to dump the deque into a newly
899 * allocated array of {@code String}:
900 *
901 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
902 *
903 * Note that {@code toArray(new Object[0])} is identical in function to
904 * {@code toArray()}.
905 *
906 * @param a the array into which the elements of the deque are to
907 * be stored, if it is big enough; otherwise, a new array of the
908 * same runtime type is allocated for this purpose
909 * @return an array containing all of the elements in this deque
910 * @throws ArrayStoreException if the runtime type of the specified array
911 * is not a supertype of the runtime type of every element in
912 * this deque
913 * @throws NullPointerException if the specified array is null
914 */
915 @SuppressWarnings("unchecked")
916 public <T> T[] toArray(T[] a) {
917 final ReentrantLock lock = this.lock;
918 lock.lock();
919 try {
920 if (a.length < count)
921 a = (T[])java.lang.reflect.Array.newInstance
922 (a.getClass().getComponentType(), count);
923
924 int k = 0;
925 for (Node<E> p = first; p != null; p = p.next)
926 a[k++] = (T)p.item;
927 if (a.length > k)
928 a[k] = null;
929 return a;
930 } finally {
931 lock.unlock();
932 }
933 }
934
935 public String toString() {
936 final ReentrantLock lock = this.lock;
937 lock.lock();
938 try {
939 Node<E> p = first;
940 if (p == null)
941 return "[]";
942
943 StringBuilder sb = new StringBuilder();
944 sb.append('[');
945 for (;;) {
946 E e = p.item;
947 sb.append(e == this ? "(this Collection)" : e);
948 p = p.next;
949 if (p == null)
950 return sb.append(']').toString();
951 sb.append(',').append(' ');
952 }
953 } finally {
954 lock.unlock();
955 }
956 }
957
958 /**
959 * Atomically removes all of the elements from this deque.
960 * The deque will be empty after this call returns.
961 */
962 public void clear() {
963 final ReentrantLock lock = this.lock;
964 lock.lock();
965 try {
966 for (Node<E> f = first; f != null; ) {
967 f.item = null;
968 Node<E> n = f.next;
969 f.prev = null;
970 f.next = null;
971 f = n;
972 }
973 first = last = null;
974 count = 0;
975 notFull.signalAll();
976 } finally {
977 lock.unlock();
978 }
979 }
980
981 /**
982 * Returns an iterator over the elements in this deque in proper sequence.
983 * The elements will be returned in order from first (head) to last (tail).
984 *
985 * <p>The returned iterator is a "weakly consistent" iterator that
986 * will never throw {@link java.util.ConcurrentModificationException
987 * ConcurrentModificationException}, and guarantees to traverse
988 * elements as they existed upon construction of the iterator, and
989 * may (but is not guaranteed to) reflect any modifications
990 * subsequent to construction.
991 *
992 * @return an iterator over the elements in this deque in proper sequence
993 */
994 public Iterator<E> iterator() {
995 return new Itr();
996 }
997
998 /**
999 * Returns an iterator over the elements in this deque in reverse
1000 * sequential order. The elements will be returned in order from
1001 * last (tail) to first (head).
1002 *
1003 * <p>The returned iterator is a "weakly consistent" iterator that
1004 * will never throw {@link java.util.ConcurrentModificationException
1005 * ConcurrentModificationException}, and guarantees to traverse
1006 * elements as they existed upon construction of the iterator, and
1007 * may (but is not guaranteed to) reflect any modifications
1008 * subsequent to construction.
1009 *
1010 * @return an iterator over the elements in this deque in reverse order
1011 */
1012 public Iterator<E> descendingIterator() {
1013 return new DescendingItr();
1014 }
1015
1016 /**
1017 * Base class for Iterators for LinkedBlockingDeque
1018 */
1019 private abstract class AbstractItr implements Iterator<E> {
1020 /**
1021 * The next node to return in next()
1022 */
1023 Node<E> next;
1024
1025 /**
1026 * nextItem holds on to item fields because once we claim that
1027 * an element exists in hasNext(), we must return item read
1028 * under lock (in advance()) even if it was in the process of
1029 * being removed when hasNext() was called.
1030 */
1031 E nextItem;
1032
1033 /**
1034 * Node returned by most recent call to next. Needed by remove.
1035 * Reset to null if this element is deleted by a call to remove.
1036 */
1037 private Node<E> lastRet;
1038
1039 abstract Node<E> firstNode();
1040 abstract Node<E> nextNode(Node<E> n);
1041
1042 AbstractItr() {
1043 // set to initial position
1044 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1045 lock.lock();
1046 try {
1047 next = firstNode();
1048 nextItem = (next == null) ? null : next.item;
1049 } finally {
1050 lock.unlock();
1051 }
1052 }
1053
1054 /**
1055 * Returns the successor node of the given non-null, but
1056 * possibly previously deleted, node.
1057 */
1058 private Node<E> succ(Node<E> n) {
1059 // Chains of deleted nodes ending in null or self-links
1060 // are possible if multiple interior nodes are removed.
1061 for (;;) {
1062 Node<E> s = nextNode(n);
1063 if (s == null)
1064 return null;
1065 else if (s.item != null)
1066 return s;
1067 else if (s == n)
1068 return firstNode();
1069 else
1070 n = s;
1071 }
1072 }
1073
1074 /**
1075 * Advances next.
1076 */
1077 void advance() {
1078 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1079 lock.lock();
1080 try {
1081 // assert next != null;
1082 next = succ(next);
1083 nextItem = (next == null) ? null : next.item;
1084 } finally {
1085 lock.unlock();
1086 }
1087 }
1088
1089 public boolean hasNext() {
1090 return next != null;
1091 }
1092
1093 public E next() {
1094 if (next == null)
1095 throw new NoSuchElementException();
1096 lastRet = next;
1097 E x = nextItem;
1098 advance();
1099 return x;
1100 }
1101
1102 public void remove() {
1103 Node<E> n = lastRet;
1104 if (n == null)
1105 throw new IllegalStateException();
1106 lastRet = null;
1107 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1108 lock.lock();
1109 try {
1110 if (n.item != null)
1111 unlink(n);
1112 } finally {
1113 lock.unlock();
1114 }
1115 }
1116 }
1117
1118 /** Forward iterator */
1119 private class Itr extends AbstractItr {
1120 Node<E> firstNode() { return first; }
1121 Node<E> nextNode(Node<E> n) { return n.next; }
1122 // minimal, unsplittable Spliterator implementation
1123 public boolean tryAdvance(Consumer<? super E> action) {
1124 if (hasNext()) {
1125 action.accept(next());
1126 return true;
1127 }
1128 return false;
1129 }
1130 public void forEach(Consumer<? super E> action) {
1131 while (hasNext())
1132 action.accept(next());
1133 }
1134 public int characteristics() {
1135 return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT;
1136 }
1137 }
1138
1139 /** Descending iterator */
1140 private class DescendingItr extends AbstractItr {
1141 Node<E> firstNode() { return last; }
1142 Node<E> nextNode(Node<E> n) { return n.prev; }
1143 }
1144
1145 static final class LBDSpliterator<E> implements Spliterator<E> {
1146 // Similar idea to ConcurrentLinkedQueue spliterator
1147 static final int MAX_BATCH = 1 << 11; // saturate batch size
1148 final LinkedBlockingDeque<E> queue;
1149 Node<E> current; // current node; null until initialized
1150 int batch; // batch size for splits
1151 boolean exhausted; // true when no more nodes
1152 long est; // size estimate
1153 LBDSpliterator(LinkedBlockingDeque<E> queue) {
1154 this.queue = queue;
1155 this.est = queue.size();
1156 }
1157
1158 public long estimateSize() { return est; }
1159
1160 public Spliterator<E> trySplit() {
1161 int n;
1162 final LinkedBlockingDeque<E> q = this.queue;
1163 final ReentrantLock lock = q.lock;
1164 if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) {
1165 Object[] a = new Object[batch = n];
1166 int i = 0;
1167 Node<E> p = current;
1168 lock.lock();
1169 try {
1170 if (p != null || (p = q.first) != null) {
1171 do {
1172 if ((a[i] = p.item) != null)
1173 ++i;
1174 } while ((p = p.next) != null && i < n);
1175 }
1176 } finally {
1177 lock.unlock();
1178 }
1179 if ((current = p) == null) {
1180 est = 0L;
1181 exhausted = true;
1182 }
1183 else if ((est -= i) <= 0L)
1184 est = 1L;
1185 return Collections.arraySnapshotSpliterator
1186 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
1187 Spliterator.CONCURRENT);
1188 }
1189 return null;
1190 }
1191
1192 public void forEach(Consumer<? super E> action) {
1193 if (action == null) throw new NullPointerException();
1194 final LinkedBlockingDeque<E> q = this.queue;
1195 final ReentrantLock lock = q.lock;
1196 if (!exhausted) {
1197 exhausted = true;
1198 Node<E> p = current;
1199 do {
1200 E e = null;
1201 lock.lock();
1202 try {
1203 if (p == null)
1204 p = q.first;
1205 while (p != null) {
1206 e = p.item;
1207 p = p.next;
1208 if (e != null)
1209 break;
1210 }
1211 } finally {
1212 lock.unlock();
1213 }
1214 if (e != null)
1215 action.accept(e);
1216 } while (p != null);
1217 }
1218 }
1219
1220 public boolean tryAdvance(Consumer<? super E> action) {
1221 if (action == null) throw new NullPointerException();
1222 final LinkedBlockingDeque<E> q = this.queue;
1223 final ReentrantLock lock = q.lock;
1224 if (!exhausted) {
1225 E e = null;
1226 lock.lock();
1227 try {
1228 if (current == null)
1229 current = q.first;
1230 while (current != null) {
1231 e = current.item;
1232 current = current.next;
1233 if (e != null)
1234 break;
1235 }
1236 } finally {
1237 lock.unlock();
1238 }
1239 if (e != null) {
1240 action.accept(e);
1241 return true;
1242 }
1243 exhausted = true;
1244 }
1245 return false;
1246 }
1247
1248 public int characteristics() {
1249 return Spliterator.ORDERED | Spliterator.NONNULL |
1250 Spliterator.CONCURRENT;
1251 }
1252 }
1253
1254 Spliterator<E> spliterator() {
1255 return new LBDSpliterator<E>(this);
1256 }
1257 public Stream<E> stream() {
1258 return Streams.stream(spliterator());
1259 }
1260
1261 public Stream<E> parallelStream() {
1262 return Streams.parallelStream(spliterator());
1263 }
1264
1265 /**
1266 * Saves this deque to a stream (that is, serializes it).
1267 *
1268 * @serialData The capacity (int), followed by elements (each an
1269 * {@code Object}) in the proper order, followed by a null
1270 */
1271 private void writeObject(java.io.ObjectOutputStream s)
1272 throws java.io.IOException {
1273 final ReentrantLock lock = this.lock;
1274 lock.lock();
1275 try {
1276 // Write out capacity and any hidden stuff
1277 s.defaultWriteObject();
1278 // Write out all elements in the proper order.
1279 for (Node<E> p = first; p != null; p = p.next)
1280 s.writeObject(p.item);
1281 // Use trailing null as sentinel
1282 s.writeObject(null);
1283 } finally {
1284 lock.unlock();
1285 }
1286 }
1287
1288 /**
1289 * Reconstitutes this deque from a stream (that is, deserializes it).
1290 */
1291 private void readObject(java.io.ObjectInputStream s)
1292 throws java.io.IOException, ClassNotFoundException {
1293 s.defaultReadObject();
1294 count = 0;
1295 first = null;
1296 last = null;
1297 // Read in all elements and place in queue
1298 for (;;) {
1299 @SuppressWarnings("unchecked")
1300 E item = (E)s.readObject();
1301 if (item == null)
1302 break;
1303 add(item);
1304 }
1305 }
1306
1307 }