ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.74
Committed: Sun Feb 17 23:36:34 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.73: +123 -0 lines
Log Message:
Spliterator sync

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.AbstractQueue;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Spliterator;
18 import java.util.stream.Stream;
19 import java.util.stream.Streams;
20 import java.util.function.Consumer;
21
22 /**
23 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
24 * linked nodes.
25 * This queue orders elements FIFO (first-in-first-out).
26 * The <em>head</em> of the queue is that element that has been on the
27 * queue the longest time.
28 * The <em>tail</em> of the queue is that element that has been on the
29 * queue the shortest time. New elements
30 * are inserted at the tail of the queue, and the queue retrieval
31 * operations obtain elements at the head of the queue.
32 * Linked queues typically have higher throughput than array-based queues but
33 * less predictable performance in most concurrent applications.
34 *
35 * <p>The optional capacity bound constructor argument serves as a
36 * way to prevent excessive queue expansion. The capacity, if unspecified,
37 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
38 * dynamically created upon each insertion unless this would bring the
39 * queue above capacity.
40 *
41 * <p>This class and its iterator implement all of the
42 * <em>optional</em> methods of the {@link Collection} and {@link
43 * Iterator} interfaces.
44 *
45 * <p>This class is a member of the
46 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
47 * Java Collections Framework</a>.
48 *
49 * @since 1.5
50 * @author Doug Lea
51 * @param <E> the type of elements held in this collection
52 */
53 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
54 implements BlockingQueue<E>, java.io.Serializable {
55 private static final long serialVersionUID = -6903933977591709194L;
56
57 /*
58 * A variant of the "two lock queue" algorithm. The putLock gates
59 * entry to put (and offer), and has an associated condition for
60 * waiting puts. Similarly for the takeLock. The "count" field
61 * that they both rely on is maintained as an atomic to avoid
62 * needing to get both locks in most cases. Also, to minimize need
63 * for puts to get takeLock and vice-versa, cascading notifies are
64 * used. When a put notices that it has enabled at least one take,
65 * it signals taker. That taker in turn signals others if more
66 * items have been entered since the signal. And symmetrically for
67 * takes signalling puts. Operations such as remove(Object) and
68 * iterators acquire both locks.
69 *
70 * Visibility between writers and readers is provided as follows:
71 *
72 * Whenever an element is enqueued, the putLock is acquired and
73 * count updated. A subsequent reader guarantees visibility to the
74 * enqueued Node by either acquiring the putLock (via fullyLock)
75 * or by acquiring the takeLock, and then reading n = count.get();
76 * this gives visibility to the first n items.
77 *
78 * To implement weakly consistent iterators, it appears we need to
79 * keep all Nodes GC-reachable from a predecessor dequeued Node.
80 * That would cause two problems:
81 * - allow a rogue Iterator to cause unbounded memory retention
82 * - cause cross-generational linking of old Nodes to new Nodes if
83 * a Node was tenured while live, which generational GCs have a
84 * hard time dealing with, causing repeated major collections.
85 * However, only non-deleted Nodes need to be reachable from
86 * dequeued Nodes, and reachability does not necessarily have to
87 * be of the kind understood by the GC. We use the trick of
88 * linking a Node that has just been dequeued to itself. Such a
89 * self-link implicitly means to advance to head.next.
90 */
91
92 /**
93 * Linked list node class
94 */
95 static class Node<E> {
96 E item;
97
98 /**
99 * One of:
100 * - the real successor Node
101 * - this Node, meaning the successor is head.next
102 * - null, meaning there is no successor (this is the last node)
103 */
104 Node<E> next;
105
106 Node(E x) { item = x; }
107 }
108
109 /** The capacity bound, or Integer.MAX_VALUE if none */
110 private final int capacity;
111
112 /** Current number of elements */
113 private final AtomicInteger count = new AtomicInteger();
114
115 /**
116 * Head of linked list.
117 * Invariant: head.item == null
118 */
119 transient Node<E> head;
120
121 /**
122 * Tail of linked list.
123 * Invariant: last.next == null
124 */
125 private transient Node<E> last;
126
127 /** Lock held by take, poll, etc */
128 private final ReentrantLock takeLock = new ReentrantLock();
129
130 /** Wait queue for waiting takes */
131 private final Condition notEmpty = takeLock.newCondition();
132
133 /** Lock held by put, offer, etc */
134 private final ReentrantLock putLock = new ReentrantLock();
135
136 /** Wait queue for waiting puts */
137 private final Condition notFull = putLock.newCondition();
138
139 /**
140 * Signals a waiting take. Called only from put/offer (which do not
141 * otherwise ordinarily lock takeLock.)
142 */
143 private void signalNotEmpty() {
144 final ReentrantLock takeLock = this.takeLock;
145 takeLock.lock();
146 try {
147 notEmpty.signal();
148 } finally {
149 takeLock.unlock();
150 }
151 }
152
153 /**
154 * Signals a waiting put. Called only from take/poll.
155 */
156 private void signalNotFull() {
157 final ReentrantLock putLock = this.putLock;
158 putLock.lock();
159 try {
160 notFull.signal();
161 } finally {
162 putLock.unlock();
163 }
164 }
165
166 /**
167 * Links node at end of queue.
168 *
169 * @param node the node
170 */
171 private void enqueue(Node<E> node) {
172 // assert putLock.isHeldByCurrentThread();
173 // assert last.next == null;
174 last = last.next = node;
175 }
176
177 /**
178 * Removes a node from head of queue.
179 *
180 * @return the node
181 */
182 private E dequeue() {
183 // assert takeLock.isHeldByCurrentThread();
184 // assert head.item == null;
185 Node<E> h = head;
186 Node<E> first = h.next;
187 h.next = h; // help GC
188 head = first;
189 E x = first.item;
190 first.item = null;
191 return x;
192 }
193
194 /**
195 * Locks to prevent both puts and takes.
196 */
197 void fullyLock() {
198 putLock.lock();
199 takeLock.lock();
200 }
201
202 /**
203 * Unlocks to allow both puts and takes.
204 */
205 void fullyUnlock() {
206 takeLock.unlock();
207 putLock.unlock();
208 }
209
210 // /**
211 // * Tells whether both locks are held by current thread.
212 // */
213 // boolean isFullyLocked() {
214 // return (putLock.isHeldByCurrentThread() &&
215 // takeLock.isHeldByCurrentThread());
216 // }
217
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegate to the capacity constructor with the largest bound.
    this(Integer.MAX_VALUE);
}
225
226 /**
227 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
228 *
229 * @param capacity the capacity of this queue
230 * @throws IllegalArgumentException if {@code capacity} is not greater
231 * than zero
232 */
233 public LinkedBlockingQueue(int capacity) {
234 if (capacity <= 0) throw new IllegalArgumentException();
235 this.capacity = capacity;
236 last = head = new Node<E>(null);
237 }
238
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE} that initially holds the elements of the
 * given collection, added in the traversal order of the collection's
 * iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E e = source.next();
            if (e == null) {
                throw new NullPointerException();
            }
            if (added == capacity) {
                throw new IllegalStateException("Queue full");
            }
            enqueue(new Node<E>(e));
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
268
269 // this doc comment is overridden to remove the reference to collections
270 // greater in size than Integer.MAX_VALUE
271 /**
272 * Returns the number of elements in this queue.
273 *
274 * @return the number of elements in this queue
275 */
276 public int size() {
277 return count.get();
278 }
279
280 // this doc comment is a modified copy of the inherited doc comment,
281 // without the reference to unlimited queues.
282 /**
283 * Returns the number of additional elements that this queue can ideally
284 * (in the absence of memory or resource constraints) accept without
285 * blocking. This is always equal to the initial capacity of this queue
286 * less the current {@code size} of this queue.
287 *
288 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
289 * an element will succeed by inspecting {@code remainingCapacity}
290 * because it may be the case that another thread is about to
291 * insert or remove an element.
292 */
293 public int remainingCapacity() {
294 return capacity - count.get();
295 }
296
297 /**
298 * Inserts the specified element at the tail of this queue, waiting if
299 * necessary for space to become available.
300 *
301 * @throws InterruptedException {@inheritDoc}
302 * @throws NullPointerException {@inheritDoc}
303 */
304 public void put(E e) throws InterruptedException {
305 if (e == null) throw new NullPointerException();
306 // Note: convention in all put/take/etc is to preset local var
307 // holding count negative to indicate failure unless set.
308 int c = -1;
309 Node<E> node = new Node<E>(e);
310 final ReentrantLock putLock = this.putLock;
311 final AtomicInteger count = this.count;
312 putLock.lockInterruptibly();
313 try {
314 /*
315 * Note that count is used in wait guard even though it is
316 * not protected by lock. This works because count can
317 * only decrease at this point (all other puts are shut
318 * out by lock), and we (or some other waiting put) are
319 * signalled if it ever changes from capacity. Similarly
320 * for all other uses of count in other wait guards.
321 */
322 while (count.get() == capacity) {
323 notFull.await();
324 }
325 enqueue(node);
326 c = count.getAndIncrement();
327 if (c + 1 < capacity)
328 notFull.signal();
329 } finally {
330 putLock.unlock();
331 }
332 if (c == 0)
333 signalNotEmpty();
334 }
335
336 /**
337 * Inserts the specified element at the tail of this queue, waiting if
338 * necessary up to the specified wait time for space to become available.
339 *
340 * @return {@code true} if successful, or {@code false} if
341 * the specified waiting time elapses before space is available
342 * @throws InterruptedException {@inheritDoc}
343 * @throws NullPointerException {@inheritDoc}
344 */
345 public boolean offer(E e, long timeout, TimeUnit unit)
346 throws InterruptedException {
347
348 if (e == null) throw new NullPointerException();
349 long nanos = unit.toNanos(timeout);
350 int c = -1;
351 final ReentrantLock putLock = this.putLock;
352 final AtomicInteger count = this.count;
353 putLock.lockInterruptibly();
354 try {
355 while (count.get() == capacity) {
356 if (nanos <= 0)
357 return false;
358 nanos = notFull.awaitNanos(nanos);
359 }
360 enqueue(new Node<E>(e));
361 c = count.getAndIncrement();
362 if (c + 1 < capacity)
363 notFull.signal();
364 } finally {
365 putLock.unlock();
366 }
367 if (c == 0)
368 signalNotEmpty();
369 return true;
370 }
371
372 /**
373 * Inserts the specified element at the tail of this queue if it is
374 * possible to do so immediately without exceeding the queue's capacity,
375 * returning {@code true} upon success and {@code false} if this queue
376 * is full.
377 * When using a capacity-restricted queue, this method is generally
378 * preferable to method {@link BlockingQueue#add add}, which can fail to
379 * insert an element only by throwing an exception.
380 *
381 * @throws NullPointerException if the specified element is null
382 */
383 public boolean offer(E e) {
384 if (e == null) throw new NullPointerException();
385 final AtomicInteger count = this.count;
386 if (count.get() == capacity)
387 return false;
388 int c = -1;
389 Node<E> node = new Node<E>(e);
390 final ReentrantLock putLock = this.putLock;
391 putLock.lock();
392 try {
393 if (count.get() < capacity) {
394 enqueue(node);
395 c = count.getAndIncrement();
396 if (c + 1 < capacity)
397 notFull.signal();
398 }
399 } finally {
400 putLock.unlock();
401 }
402 if (c == 0)
403 signalNotEmpty();
404 return c >= 0;
405 }
406
407 public E take() throws InterruptedException {
408 E x;
409 int c = -1;
410 final AtomicInteger count = this.count;
411 final ReentrantLock takeLock = this.takeLock;
412 takeLock.lockInterruptibly();
413 try {
414 while (count.get() == 0) {
415 notEmpty.await();
416 }
417 x = dequeue();
418 c = count.getAndDecrement();
419 if (c > 1)
420 notEmpty.signal();
421 } finally {
422 takeLock.unlock();
423 }
424 if (c == capacity)
425 signalNotFull();
426 return x;
427 }
428
429 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
430 E x = null;
431 int c = -1;
432 long nanos = unit.toNanos(timeout);
433 final AtomicInteger count = this.count;
434 final ReentrantLock takeLock = this.takeLock;
435 takeLock.lockInterruptibly();
436 try {
437 while (count.get() == 0) {
438 if (nanos <= 0)
439 return null;
440 nanos = notEmpty.awaitNanos(nanos);
441 }
442 x = dequeue();
443 c = count.getAndDecrement();
444 if (c > 1)
445 notEmpty.signal();
446 } finally {
447 takeLock.unlock();
448 }
449 if (c == capacity)
450 signalNotFull();
451 return x;
452 }
453
454 public E poll() {
455 final AtomicInteger count = this.count;
456 if (count.get() == 0)
457 return null;
458 E x = null;
459 int c = -1;
460 final ReentrantLock takeLock = this.takeLock;
461 takeLock.lock();
462 try {
463 if (count.get() > 0) {
464 x = dequeue();
465 c = count.getAndDecrement();
466 if (c > 1)
467 notEmpty.signal();
468 }
469 } finally {
470 takeLock.unlock();
471 }
472 if (c == capacity)
473 signalNotFull();
474 return x;
475 }
476
477 public E peek() {
478 if (count.get() == 0)
479 return null;
480 final ReentrantLock takeLock = this.takeLock;
481 takeLock.lock();
482 try {
483 Node<E> first = head.next;
484 if (first == null)
485 return null;
486 else
487 return first.item;
488 } finally {
489 takeLock.unlock();
490 }
491 }
492
493 /**
494 * Unlinks interior Node p with predecessor trail.
495 */
496 void unlink(Node<E> p, Node<E> trail) {
497 // assert isFullyLocked();
498 // p.next is not changed, to allow iterators that are
499 // traversing p to maintain their weak-consistency guarantee.
500 p.item = null;
501 trail.next = p.next;
502 if (last == p)
503 last = trail;
504 if (count.getAndDecrement() == capacity)
505 notFull.signal();
506 }
507
508 /**
509 * Removes a single instance of the specified element from this queue,
510 * if it is present. More formally, removes an element {@code e} such
511 * that {@code o.equals(e)}, if this queue contains one or more such
512 * elements.
513 * Returns {@code true} if this queue contained the specified element
514 * (or equivalently, if this queue changed as a result of the call).
515 *
516 * @param o element to be removed from this queue, if present
517 * @return {@code true} if this queue changed as a result of the call
518 */
519 public boolean remove(Object o) {
520 if (o == null) return false;
521 fullyLock();
522 try {
523 for (Node<E> trail = head, p = trail.next;
524 p != null;
525 trail = p, p = p.next) {
526 if (o.equals(p.item)) {
527 unlink(p, trail);
528 return true;
529 }
530 }
531 return false;
532 } finally {
533 fullyUnlock();
534 }
535 }
536
537 /**
538 * Returns {@code true} if this queue contains the specified element.
539 * More formally, returns {@code true} if and only if this queue contains
540 * at least one element {@code e} such that {@code o.equals(e)}.
541 *
542 * @param o object to be checked for containment in this queue
543 * @return {@code true} if this queue contains the specified element
544 */
545 public boolean contains(Object o) {
546 if (o == null) return false;
547 fullyLock();
548 try {
549 for (Node<E> p = head.next; p != null; p = p.next)
550 if (o.equals(p.item))
551 return true;
552 return false;
553 } finally {
554 fullyUnlock();
555 }
556 }
557
558 /**
559 * Returns an array containing all of the elements in this queue, in
560 * proper sequence.
561 *
562 * <p>The returned array will be "safe" in that no references to it are
563 * maintained by this queue. (In other words, this method must allocate
564 * a new array). The caller is thus free to modify the returned array.
565 *
566 * <p>This method acts as bridge between array-based and collection-based
567 * APIs.
568 *
569 * @return an array containing all of the elements in this queue
570 */
571 public Object[] toArray() {
572 fullyLock();
573 try {
574 int size = count.get();
575 Object[] a = new Object[size];
576 int k = 0;
577 for (Node<E> p = head.next; p != null; p = p.next)
578 a[k++] = p.item;
579 return a;
580 } finally {
581 fullyUnlock();
582 }
583 }
584
585 /**
586 * Returns an array containing all of the elements in this queue, in
587 * proper sequence; the runtime type of the returned array is that of
588 * the specified array. If the queue fits in the specified array, it
589 * is returned therein. Otherwise, a new array is allocated with the
590 * runtime type of the specified array and the size of this queue.
591 *
592 * <p>If this queue fits in the specified array with room to spare
593 * (i.e., the array has more elements than this queue), the element in
594 * the array immediately following the end of the queue is set to
595 * {@code null}.
596 *
597 * <p>Like the {@link #toArray()} method, this method acts as bridge between
598 * array-based and collection-based APIs. Further, this method allows
599 * precise control over the runtime type of the output array, and may,
600 * under certain circumstances, be used to save allocation costs.
601 *
602 * <p>Suppose {@code x} is a queue known to contain only strings.
603 * The following code can be used to dump the queue into a newly
604 * allocated array of {@code String}:
605 *
606 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
607 *
608 * Note that {@code toArray(new Object[0])} is identical in function to
609 * {@code toArray()}.
610 *
611 * @param a the array into which the elements of the queue are to
612 * be stored, if it is big enough; otherwise, a new array of the
613 * same runtime type is allocated for this purpose
614 * @return an array containing all of the elements in this queue
615 * @throws ArrayStoreException if the runtime type of the specified array
616 * is not a supertype of the runtime type of every element in
617 * this queue
618 * @throws NullPointerException if the specified array is null
619 */
620 @SuppressWarnings("unchecked")
621 public <T> T[] toArray(T[] a) {
622 fullyLock();
623 try {
624 int size = count.get();
625 if (a.length < size)
626 a = (T[])java.lang.reflect.Array.newInstance
627 (a.getClass().getComponentType(), size);
628
629 int k = 0;
630 for (Node<E> p = head.next; p != null; p = p.next)
631 a[k++] = (T)p.item;
632 if (a.length > k)
633 a[k] = null;
634 return a;
635 } finally {
636 fullyUnlock();
637 }
638 }
639
640 public String toString() {
641 fullyLock();
642 try {
643 Node<E> p = head.next;
644 if (p == null)
645 return "[]";
646
647 StringBuilder sb = new StringBuilder();
648 sb.append('[');
649 for (;;) {
650 E e = p.item;
651 sb.append(e == this ? "(this Collection)" : e);
652 p = p.next;
653 if (p == null)
654 return sb.append(']').toString();
655 sb.append(',').append(' ');
656 }
657 } finally {
658 fullyUnlock();
659 }
660 }
661
662 /**
663 * Atomically removes all of the elements from this queue.
664 * The queue will be empty after this call returns.
665 */
666 public void clear() {
667 fullyLock();
668 try {
669 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
670 h.next = h;
671 p.item = null;
672 }
673 head = last;
674 // assert head.item == null && head.next == null;
675 if (count.getAndSet(0) == capacity)
676 notFull.signal();
677 } finally {
678 fullyUnlock();
679 }
680 }
681
682 /**
683 * @throws UnsupportedOperationException {@inheritDoc}
684 * @throws ClassCastException {@inheritDoc}
685 * @throws NullPointerException {@inheritDoc}
686 * @throws IllegalArgumentException {@inheritDoc}
687 */
688 public int drainTo(Collection<? super E> c) {
689 return drainTo(c, Integer.MAX_VALUE);
690 }
691
692 /**
693 * @throws UnsupportedOperationException {@inheritDoc}
694 * @throws ClassCastException {@inheritDoc}
695 * @throws NullPointerException {@inheritDoc}
696 * @throws IllegalArgumentException {@inheritDoc}
697 */
698 public int drainTo(Collection<? super E> c, int maxElements) {
699 if (c == null)
700 throw new NullPointerException();
701 if (c == this)
702 throw new IllegalArgumentException();
703 if (maxElements <= 0)
704 return 0;
705 boolean signalNotFull = false;
706 final ReentrantLock takeLock = this.takeLock;
707 takeLock.lock();
708 try {
709 int n = Math.min(maxElements, count.get());
710 // count.get provides visibility to first n Nodes
711 Node<E> h = head;
712 int i = 0;
713 try {
714 while (i < n) {
715 Node<E> p = h.next;
716 c.add(p.item);
717 p.item = null;
718 h.next = h;
719 h = p;
720 ++i;
721 }
722 return n;
723 } finally {
724 // Restore invariants even if c.add() threw
725 if (i > 0) {
726 // assert h.item == null;
727 head = h;
728 signalNotFull = (count.getAndAdd(-i) == capacity);
729 }
730 }
731 } finally {
732 takeLock.unlock();
733 if (signalNotFull)
734 signalNotFull();
735 }
736 }
737
738 /**
739 * Returns an iterator over the elements in this queue in proper sequence.
740 * The elements will be returned in order from first (head) to last (tail).
741 *
742 * <p>The returned iterator is a "weakly consistent" iterator that
743 * will never throw {@link java.util.ConcurrentModificationException
744 * ConcurrentModificationException}, and guarantees to traverse
745 * elements as they existed upon construction of the iterator, and
746 * may (but is not guaranteed to) reflect any modifications
747 * subsequent to construction.
748 *
749 * @return an iterator over the elements in this queue in proper sequence
750 */
751 public Iterator<E> iterator() {
752 return new Itr();
753 }
754
755 private class Itr implements Iterator<E> {
756 /*
757 * Basic weakly-consistent iterator. At all times hold the next
758 * item to hand out so that if hasNext() reports true, we will
759 * still have it to return even if lost race with a take etc.
760 */
761
762 private Node<E> current;
763 private Node<E> lastRet;
764 private E currentElement;
765
766 Itr() {
767 fullyLock();
768 try {
769 current = head.next;
770 if (current != null)
771 currentElement = current.item;
772 } finally {
773 fullyUnlock();
774 }
775 }
776
777 public boolean hasNext() {
778 return current != null;
779 }
780
781 /**
782 * Returns the next live successor of p, or null if no such.
783 *
784 * Unlike other traversal methods, iterators need to handle both:
785 * - dequeued nodes (p.next == p)
786 * - (possibly multiple) interior removed nodes (p.item == null)
787 */
788 private Node<E> nextNode(Node<E> p) {
789 for (;;) {
790 Node<E> s = p.next;
791 if (s == p)
792 return head.next;
793 if (s == null || s.item != null)
794 return s;
795 p = s;
796 }
797 }
798
799 public E next() {
800 fullyLock();
801 try {
802 if (current == null)
803 throw new NoSuchElementException();
804 E x = currentElement;
805 lastRet = current;
806 current = nextNode(current);
807 currentElement = (current == null) ? null : current.item;
808 return x;
809 } finally {
810 fullyUnlock();
811 }
812 }
813
814 public void remove() {
815 if (lastRet == null)
816 throw new IllegalStateException();
817 fullyLock();
818 try {
819 Node<E> node = lastRet;
820 lastRet = null;
821 for (Node<E> trail = head, p = trail.next;
822 p != null;
823 trail = p, p = p.next) {
824 if (p == node) {
825 unlink(p, trail);
826 break;
827 }
828 }
829 } finally {
830 fullyUnlock();
831 }
832 }
833 }
834
835 static final class LBQSpliterator<E> implements Spliterator<E> {
836 // Similar idea to ConcurrentLinkedQueue spliterator
837 static final int MAX_BATCH = 1 << 11; // saturate batch size
838 final LinkedBlockingQueue<E> queue;
839 Node<E> current; // current node; null until initialized
840 int batch; // batch size for splits
841 boolean exhausted; // true when no more nodes
842 long est; // size estimate
843 LBQSpliterator(LinkedBlockingQueue<E> queue) {
844 this.queue = queue;
845 this.est = queue.size();
846 }
847
848 public long estimateSize() { return est; }
849
850 public Spliterator<E> trySplit() {
851 int n;
852 final LinkedBlockingQueue<E> q = this.queue;
853 if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) {
854 Object[] a = new Object[batch = n];
855 int i = 0;
856 Node<E> p = current;
857 q.fullyLock();
858 try {
859 if (p != null || (p = q.head.next) != null) {
860 do {
861 if ((a[i] = p.item) != null)
862 ++i;
863 } while ((p = p.next) != null && i < n);
864 }
865 } finally {
866 q.fullyUnlock();
867 }
868 if ((current = p) == null) {
869 est = 0L;
870 exhausted = true;
871 }
872 else if ((est -= i) <= 0L)
873 est = 1L;
874 return Collections.arraySnapshotSpliterator
875 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
876 Spliterator.CONCURRENT);
877 }
878 return null;
879 }
880
881 public void forEach(Consumer<? super E> action) {
882 if (action == null) throw new NullPointerException();
883 final LinkedBlockingQueue<E> q = this.queue;
884 if (!exhausted) {
885 exhausted = true;
886 Node<E> p = current;
887 do {
888 E e = null;
889 q.fullyLock();
890 try {
891 if (p == null)
892 p = q.head.next;
893 while (p != null) {
894 e = p.item;
895 p = p.next;
896 if (e != null)
897 break;
898 }
899 } finally {
900 q.fullyUnlock();
901 }
902 if (e != null)
903 action.accept(e);
904 } while (p != null);
905 }
906 }
907
908 public boolean tryAdvance(Consumer<? super E> action) {
909 if (action == null) throw new NullPointerException();
910 final LinkedBlockingQueue<E> q = this.queue;
911 if (!exhausted) {
912 E e = null;
913 q.fullyLock();
914 try {
915 if (current == null)
916 current = q.head.next;
917 while (current != null) {
918 e = current.item;
919 current = current.next;
920 if (e != null)
921 break;
922 }
923 } finally {
924 q.fullyUnlock();
925 }
926 if (e != null) {
927 action.accept(e);
928 return true;
929 }
930 exhausted = true;
931 }
932 return false;
933 }
934
935 public int characteristics() {
936 return Spliterator.ORDERED | Spliterator.NONNULL |
937 Spliterator.CONCURRENT;
938 }
939 }
940
941 Spliterator<E> spliterator() {
942 return new LBQSpliterator<E>(this);
943 }
944
945 public Stream<E> stream() {
946 return Streams.stream(spliterator());
947 }
948
949 public Stream<E> parallelStream() {
950 return Streams.parallelStream(spliterator());
951 }
952
953 /**
954 * Saves this queue to a stream (that is, serializes it).
955 *
956 * @serialData The capacity is emitted (int), followed by all of
957 * its elements (each an {@code Object}) in the proper order,
958 * followed by a null
959 */
960 private void writeObject(java.io.ObjectOutputStream s)
961 throws java.io.IOException {
962
963 fullyLock();
964 try {
965 // Write out any hidden stuff, plus capacity
966 s.defaultWriteObject();
967
968 // Write out all elements in the proper order.
969 for (Node<E> p = head.next; p != null; p = p.next)
970 s.writeObject(p.item);
971
972 // Use trailing null as sentinel
973 s.writeObject(null);
974 } finally {
975 fullyUnlock();
976 }
977 }
978
979 /**
980 * Reconstitutes this queue from a stream (that is, deserializes it).
981 */
982 private void readObject(java.io.ObjectInputStream s)
983 throws java.io.IOException, ClassNotFoundException {
984 // Read in capacity, and any hidden stuff
985 s.defaultReadObject();
986
987 count.set(0);
988 last = head = new Node<E>(null);
989
990 // Read in all elements and place in queue
991 for (;;) {
992 @SuppressWarnings("unchecked")
993 E item = (E)s.readObject();
994 if (item == null)
995 break;
996 add(item);
997 }
998 }
999 }