ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.81
Committed: Wed Mar 27 19:46:34 2013 UTC (11 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.80: +1 -1 lines
Log Message:
conform to updated lambda Spliterator

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.AbstractQueue;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Spliterator;
18 import java.util.Spliterators;
19 import java.util.stream.Stream;
20 import java.util.stream.Streams;
21 import java.util.function.Consumer;
22
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     *
     * Any traversal that keeps a Node reference across lock releases
     * (the iterator and the spliterator) must therefore be prepared
     * to meet a self-linked Node; see nextNode() and succ().
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes the first real node from the queue, making it the new
     * (item-less) head, and self-links the old head.
     *
     * @return the element that was held by the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC; self-link means "successor is head.next"
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Returns the node to visit after p in a traversal that holds on to
     * Nodes across lock releases.  A self-linked p (p.next == p) has been
     * dequeued, so the traversal resumes at head.next; otherwise this is
     * simply p.next.  May return a node whose item is null (an interior
     * removed node, or the current head); callers skip those.
     *
     * @param p a non-null node that is, or once was, part of this queue
     * @return the successor node, or null if there is none
     */
    Node<E> succ(Node<E> p) {
        // assert isFullyLocked();
        Node<E> s = p.next;
        return (s == p) ? head.next : s;
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity of this queue minus its current size
     */
    @Override
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock: the queue may have filled up
            // since the unlocked fast-path test above.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @return the head of this queue, or {@code null} if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock: another taker may have emptied
            // the queue since the unlocked fast-path test above.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    @Override
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     * Both locks are expected to be held by the caller.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string listing the elements of this queue from head to
     * tail, enclosed in square brackets and separated by {@code ", "}.
     * An element that is this queue itself is rendered as
     * {@code (this Collection)}.
     *
     * @return a string representation of this queue
     */
    @Override
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            // Self-link every node passed over so that traversals still
            // holding one of them resume at head.next.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node holding the element that next() will return, or null. */
        private Node<E> current;
        /** Node most recently returned by next(); target of remove(). */
        private Node<E> lastRet;
        /** Element captured from current when it was selected. */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search by identity; if the node is no longer linked
                // (already removed elsewhere) this is a no-op.
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * A customized variant of Spliterators.IteratorSpliterator.
     *
     * <p>The spliterator remembers a Node between calls without holding
     * any lock, so that Node may have been dequeued (self-linked) or
     * removed from the interior (null item) by the time it is next
     * examined.  All advancing therefore goes through
     * {@link LinkedBlockingQueue#succ}, and null items are skipped.
     */
    static final class LBQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        final LinkedBlockingQueue<E> queue;
        Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        long est;           // size estimate
        LBQSpliterator(LinkedBlockingQueue<E> queue) {
            this.queue = queue;
            this.est = queue.size();
        }

        @Override
        public long estimateSize() { return est; }

        /**
         * Copies up to batch+1 (capped at MAX_BATCH) elements into an
         * array and returns an array spliterator over them, or returns
         * null if fewer than two nodes appear to remain or nothing
         * could be copied.
         */
        @Override
        public Spliterator<E> trySplit() {
            Node<E> h;
            final LinkedBlockingQueue<E> q = this.queue;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            // Unlocked, heuristic pre-check; the real traversal below
            // re-reads everything under both locks.
            if (!exhausted &&
                ((h = current) != null || (h = q.head.next) != null) &&
                h.next != null) {
                Object[] a;
                try {
                    a = new Object[n];
                } catch (OutOfMemoryError ignored) {
                    // Allocation failure is treated as "cannot split".
                    return null;
                }
                int i = 0;
                Node<E> p = current;
                q.fullyLock();
                try {
                    if (p != null || (p = q.head.next) != null) {
                        // succ() (not p.next) so that a dequeued,
                        // self-linked node cannot trap us in this loop.
                        do {
                            if ((a[i] = p.item) != null)
                                ++i;
                        } while ((p = q.succ(p)) != null && i < n);
                    }
                } finally {
                    q.fullyUnlock();
                }
                if ((current = p) == null) {
                    est = 0L;
                    exhausted = true;
                }
                else if ((est -= i) < 0L)
                    est = 0L;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                        (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
                         Spliterator.CONCURRENT);
                }
            }
            return null;
        }

        /**
         * Passes every remaining element to action.  The locks are
         * released while action runs, so each step re-acquires them.
         *
         * @throws NullPointerException if action is null
         */
        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final LinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                exhausted = true;
                Node<E> p = current;
                do {
                    E e = null;
                    q.fullyLock();
                    try {
                        if (p == null)
                            p = q.head.next;
                        while (p != null) {
                            e = p.item;
                            // p may have been dequeued while unlocked
                            p = q.succ(p);
                            if (e != null)
                                break;
                        }
                    } finally {
                        q.fullyUnlock();
                    }
                    if (e != null)
                        action.accept(e);
                } while (p != null);
            }
        }

        /**
         * Passes the next remaining element, if any, to action.
         *
         * @return true if an element was passed to action
         * @throws NullPointerException if action is null
         */
        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) throw new NullPointerException();
            final LinkedBlockingQueue<E> q = this.queue;
            if (!exhausted) {
                E e = null;
                q.fullyLock();
                try {
                    if (current == null)
                        current = q.head.next;
                    while (current != null) {
                        e = current.item;
                        // current may have been dequeued since last call
                        current = q.succ(current);
                        if (e != null)
                            break;
                    }
                } finally {
                    q.fullyUnlock();
                }
                if (current == null)
                    exhausted = true;
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        @Override
        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue,
     * reporting {@link Spliterator#ORDERED}, {@link Spliterator#NONNULL}
     * and {@link Spliterator#CONCURRENT}.
     *
     * @return a spliterator over the elements in this queue
     */
    @Override
    public Spliterator<E> spliterator() {
        return new LBQSpliterator<E>(this);
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}