ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.2
Committed: Sat Dec 17 21:56:54 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +145 -92 lines
Log Message:
sync with src/main

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Iterator;
12 import java.util.NoSuchElementException;
13 import java.util.Objects;
14 import java.util.Spliterator;
15 import java.util.Spliterators;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20
21 /**
22 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
23 * linked nodes.
24 * This queue orders elements FIFO (first-in-first-out).
25 * The <em>head</em> of the queue is that element that has been on the
26 * queue the longest time.
27 * The <em>tail</em> of the queue is that element that has been on the
28 * queue the shortest time. New elements
29 * are inserted at the tail of the queue, and the queue retrieval
30 * operations obtain elements at the head of the queue.
31 * Linked queues typically have higher throughput than array-based queues but
32 * less predictable performance in most concurrent applications.
33 *
34 * <p>The optional capacity bound constructor argument serves as a
35 * way to prevent excessive queue expansion. The capacity, if unspecified,
36 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
37 * dynamically created upon each insertion unless this would bring the
38 * queue above capacity.
39 *
40 * <p>This class and its iterator implement all of the
41 * <em>optional</em> methods of the {@link Collection} and {@link
42 * Iterator} interfaces.
43 *
44 * <p>This class is a member of the
45 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
46 * Java Collections Framework</a>.
47 *
48 * @since 1.5
49 * @author Doug Lea
50 * @param <E> the type of elements held in this queue
51 */
52 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
53 implements BlockingQueue<E>, java.io.Serializable {
54 private static final long serialVersionUID = -6903933977591709194L;
55
56 /*
57 * A variant of the "two lock queue" algorithm. The putLock gates
58 * entry to put (and offer), and has an associated condition for
59 * waiting puts. Similarly for the takeLock. The "count" field
60 * that they both rely on is maintained as an atomic to avoid
61 * needing to get both locks in most cases. Also, to minimize need
62 * for puts to get takeLock and vice-versa, cascading notifies are
63 * used. When a put notices that it has enabled at least one take,
64 * it signals taker. That taker in turn signals others if more
65 * items have been entered since the signal. And symmetrically for
66 * takes signalling puts. Operations such as remove(Object) and
67 * iterators acquire both locks.
68 *
69 * Visibility between writers and readers is provided as follows:
70 *
71 * Whenever an element is enqueued, the putLock is acquired and
72 * count updated. A subsequent reader guarantees visibility to the
73 * enqueued Node by either acquiring the putLock (via fullyLock)
74 * or by acquiring the takeLock, and then reading n = count.get();
75 * this gives visibility to the first n items.
76 *
77 * To implement weakly consistent iterators, it appears we need to
78 * keep all Nodes GC-reachable from a predecessor dequeued Node.
79 * That would cause two problems:
80 * - allow a rogue Iterator to cause unbounded memory retention
81 * - cause cross-generational linking of old Nodes to new Nodes if
82 * a Node was tenured while live, which generational GCs have a
83 * hard time dealing with, causing repeated major collections.
84 * However, only non-deleted Nodes need to be reachable from
85 * dequeued Nodes, and reachability does not necessarily have to
86 * be of the kind understood by the GC. We use the trick of
87 * linking a Node that has just been dequeued to itself. Such a
88 * self-link implicitly means to advance to head.next.
89 */
90
91 /**
92 * Linked list node class.
93 */
94 static class Node<E> {
95 E item;
96
97 /**
98 * One of:
99 * - the real successor Node
100 * - this Node, meaning the successor is head.next
101 * - null, meaning there is no successor (this is the last node)
102 */
103 Node<E> next;
104
105 Node(E x) { item = x; }
106 }
107
108 /** The capacity bound, or Integer.MAX_VALUE if none */
109 private final int capacity;
110
111 /** Current number of elements */
112 private final AtomicInteger count = new AtomicInteger();
113
114 /**
115 * Head of linked list.
116 * Invariant: head.item == null
117 */
118 transient Node<E> head;
119
120 /**
121 * Tail of linked list.
122 * Invariant: last.next == null
123 */
124 private transient Node<E> last;
125
126 /** Lock held by take, poll, etc */
127 private final ReentrantLock takeLock = new ReentrantLock();
128
129 /** Wait queue for waiting takes */
130 private final Condition notEmpty = takeLock.newCondition();
131
132 /** Lock held by put, offer, etc */
133 private final ReentrantLock putLock = new ReentrantLock();
134
135 /** Wait queue for waiting puts */
136 private final Condition notFull = putLock.newCondition();
137
138 /**
139 * Signals a waiting take. Called only from put/offer (which do not
140 * otherwise ordinarily lock takeLock.)
141 */
142 private void signalNotEmpty() {
143 final ReentrantLock takeLock = this.takeLock;
144 takeLock.lock();
145 try {
146 notEmpty.signal();
147 } finally {
148 takeLock.unlock();
149 }
150 }
151
152 /**
153 * Signals a waiting put. Called only from take/poll.
154 */
155 private void signalNotFull() {
156 final ReentrantLock putLock = this.putLock;
157 putLock.lock();
158 try {
159 notFull.signal();
160 } finally {
161 putLock.unlock();
162 }
163 }
164
165 /**
166 * Links node at end of queue.
167 *
168 * @param node the node
169 */
170 private void enqueue(Node<E> node) {
171 // assert putLock.isHeldByCurrentThread();
172 // assert last.next == null;
173 last = last.next = node;
174 }
175
176 /**
177 * Removes a node from head of queue.
178 *
179 * @return the node
180 */
181 private E dequeue() {
182 // assert takeLock.isHeldByCurrentThread();
183 // assert head.item == null;
184 Node<E> h = head;
185 Node<E> first = h.next;
186 h.next = h; // help GC
187 head = first;
188 E x = first.item;
189 first.item = null;
190 return x;
191 }
192
193 /**
194 * Locks to prevent both puts and takes.
195 */
196 void fullyLock() {
197 putLock.lock();
198 takeLock.lock();
199 }
200
201 /**
202 * Unlocks to allow both puts and takes.
203 */
204 void fullyUnlock() {
205 takeLock.unlock();
206 putLock.unlock();
207 }
208
/**
 * Creates an empty {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegate to the capacity constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
216
217 /**
218 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
219 *
220 * @param capacity the capacity of this queue
221 * @throws IllegalArgumentException if {@code capacity} is not greater
222 * than zero
223 */
224 public LinkedBlockingQueue(int capacity) {
225 if (capacity <= 0) throw new IllegalArgumentException();
226 this.capacity = capacity;
227 last = head = new Node<E>(null);
228 }
229
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE} that initially holds the elements of the
 * given collection, in the order produced by the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E element = source.next();
            if (element == null) {
                throw new NullPointerException();
            }
            if (added == capacity) {
                throw new IllegalStateException("Queue full");
            }
            enqueue(new Node<E>(element));
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
259
260 // this doc comment is overridden to remove the reference to collections
261 // greater in size than Integer.MAX_VALUE
262 /**
263 * Returns the number of elements in this queue.
264 *
265 * @return the number of elements in this queue
266 */
267 public int size() {
268 return count.get();
269 }
270
271 // this doc comment is a modified copy of the inherited doc comment,
272 // without the reference to unlimited queues.
273 /**
274 * Returns the number of additional elements that this queue can ideally
275 * (in the absence of memory or resource constraints) accept without
276 * blocking. This is always equal to the initial capacity of this queue
277 * less the current {@code size} of this queue.
278 *
279 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
280 * an element will succeed by inspecting {@code remainingCapacity}
281 * because it may be the case that another thread is about to
282 * insert or remove an element.
283 */
284 public int remainingCapacity() {
285 return capacity - count.get();
286 }
287
288 /**
289 * Inserts the specified element at the tail of this queue, waiting if
290 * necessary for space to become available.
291 *
292 * @throws InterruptedException {@inheritDoc}
293 * @throws NullPointerException {@inheritDoc}
294 */
295 public void put(E e) throws InterruptedException {
296 if (e == null) throw new NullPointerException();
297 // Note: convention in all put/take/etc is to preset local var
298 // holding count negative to indicate failure unless set.
299 int c = -1;
300 Node<E> node = new Node<E>(e);
301 final ReentrantLock putLock = this.putLock;
302 final AtomicInteger count = this.count;
303 putLock.lockInterruptibly();
304 try {
305 /*
306 * Note that count is used in wait guard even though it is
307 * not protected by lock. This works because count can
308 * only decrease at this point (all other puts are shut
309 * out by lock), and we (or some other waiting put) are
310 * signalled if it ever changes from capacity. Similarly
311 * for all other uses of count in other wait guards.
312 */
313 while (count.get() == capacity) {
314 notFull.await();
315 }
316 enqueue(node);
317 c = count.getAndIncrement();
318 if (c + 1 < capacity)
319 notFull.signal();
320 } finally {
321 putLock.unlock();
322 }
323 if (c == 0)
324 signalNotEmpty();
325 }
326
327 /**
328 * Inserts the specified element at the tail of this queue, waiting if
329 * necessary up to the specified wait time for space to become available.
330 *
331 * @return {@code true} if successful, or {@code false} if
332 * the specified waiting time elapses before space is available
333 * @throws InterruptedException {@inheritDoc}
334 * @throws NullPointerException {@inheritDoc}
335 */
336 public boolean offer(E e, long timeout, TimeUnit unit)
337 throws InterruptedException {
338
339 if (e == null) throw new NullPointerException();
340 long nanos = unit.toNanos(timeout);
341 int c = -1;
342 final ReentrantLock putLock = this.putLock;
343 final AtomicInteger count = this.count;
344 putLock.lockInterruptibly();
345 try {
346 while (count.get() == capacity) {
347 if (nanos <= 0L)
348 return false;
349 nanos = notFull.awaitNanos(nanos);
350 }
351 enqueue(new Node<E>(e));
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
356 putLock.unlock();
357 }
358 if (c == 0)
359 signalNotEmpty();
360 return true;
361 }
362
363 /**
364 * Inserts the specified element at the tail of this queue if it is
365 * possible to do so immediately without exceeding the queue's capacity,
366 * returning {@code true} upon success and {@code false} if this queue
367 * is full.
368 * When using a capacity-restricted queue, this method is generally
369 * preferable to method {@link BlockingQueue#add add}, which can fail to
370 * insert an element only by throwing an exception.
371 *
372 * @throws NullPointerException if the specified element is null
373 */
374 public boolean offer(E e) {
375 if (e == null) throw new NullPointerException();
376 final AtomicInteger count = this.count;
377 if (count.get() == capacity)
378 return false;
379 int c = -1;
380 Node<E> node = new Node<E>(e);
381 final ReentrantLock putLock = this.putLock;
382 putLock.lock();
383 try {
384 if (count.get() < capacity) {
385 enqueue(node);
386 c = count.getAndIncrement();
387 if (c + 1 < capacity)
388 notFull.signal();
389 }
390 } finally {
391 putLock.unlock();
392 }
393 if (c == 0)
394 signalNotEmpty();
395 return c >= 0;
396 }
397
398 public E take() throws InterruptedException {
399 E x;
400 int c = -1;
401 final AtomicInteger count = this.count;
402 final ReentrantLock takeLock = this.takeLock;
403 takeLock.lockInterruptibly();
404 try {
405 while (count.get() == 0) {
406 notEmpty.await();
407 }
408 x = dequeue();
409 c = count.getAndDecrement();
410 if (c > 1)
411 notEmpty.signal();
412 } finally {
413 takeLock.unlock();
414 }
415 if (c == capacity)
416 signalNotFull();
417 return x;
418 }
419
420 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
421 E x = null;
422 int c = -1;
423 long nanos = unit.toNanos(timeout);
424 final AtomicInteger count = this.count;
425 final ReentrantLock takeLock = this.takeLock;
426 takeLock.lockInterruptibly();
427 try {
428 while (count.get() == 0) {
429 if (nanos <= 0L)
430 return null;
431 nanos = notEmpty.awaitNanos(nanos);
432 }
433 x = dequeue();
434 c = count.getAndDecrement();
435 if (c > 1)
436 notEmpty.signal();
437 } finally {
438 takeLock.unlock();
439 }
440 if (c == capacity)
441 signalNotFull();
442 return x;
443 }
444
445 public E poll() {
446 final AtomicInteger count = this.count;
447 if (count.get() == 0)
448 return null;
449 E x = null;
450 int c = -1;
451 final ReentrantLock takeLock = this.takeLock;
452 takeLock.lock();
453 try {
454 if (count.get() > 0) {
455 x = dequeue();
456 c = count.getAndDecrement();
457 if (c > 1)
458 notEmpty.signal();
459 }
460 } finally {
461 takeLock.unlock();
462 }
463 if (c == capacity)
464 signalNotFull();
465 return x;
466 }
467
468 public E peek() {
469 if (count.get() == 0)
470 return null;
471 final ReentrantLock takeLock = this.takeLock;
472 takeLock.lock();
473 try {
474 return (count.get() > 0) ? head.next.item : null;
475 } finally {
476 takeLock.unlock();
477 }
478 }
479
480 /**
481 * Unlinks interior Node p with predecessor trail.
482 */
483 void unlink(Node<E> p, Node<E> trail) {
484 // assert putLock.isHeldByCurrentThread();
485 // assert takeLock.isHeldByCurrentThread();
486 // p.next is not changed, to allow iterators that are
487 // traversing p to maintain their weak-consistency guarantee.
488 p.item = null;
489 trail.next = p.next;
490 if (last == p)
491 last = trail;
492 if (count.getAndDecrement() == capacity)
493 notFull.signal();
494 }
495
496 /**
497 * Removes a single instance of the specified element from this queue,
498 * if it is present. More formally, removes an element {@code e} such
499 * that {@code o.equals(e)}, if this queue contains one or more such
500 * elements.
501 * Returns {@code true} if this queue contained the specified element
502 * (or equivalently, if this queue changed as a result of the call).
503 *
504 * @param o element to be removed from this queue, if present
505 * @return {@code true} if this queue changed as a result of the call
506 */
507 public boolean remove(Object o) {
508 if (o == null) return false;
509 fullyLock();
510 try {
511 for (Node<E> trail = head, p = trail.next;
512 p != null;
513 trail = p, p = p.next) {
514 if (o.equals(p.item)) {
515 unlink(p, trail);
516 return true;
517 }
518 }
519 return false;
520 } finally {
521 fullyUnlock();
522 }
523 }
524
525 /**
526 * Returns {@code true} if this queue contains the specified element.
527 * More formally, returns {@code true} if and only if this queue contains
528 * at least one element {@code e} such that {@code o.equals(e)}.
529 *
530 * @param o object to be checked for containment in this queue
531 * @return {@code true} if this queue contains the specified element
532 */
533 public boolean contains(Object o) {
534 if (o == null) return false;
535 fullyLock();
536 try {
537 for (Node<E> p = head.next; p != null; p = p.next)
538 if (o.equals(p.item))
539 return true;
540 return false;
541 } finally {
542 fullyUnlock();
543 }
544 }
545
546 /**
547 * Returns an array containing all of the elements in this queue, in
548 * proper sequence.
549 *
550 * <p>The returned array will be "safe" in that no references to it are
551 * maintained by this queue. (In other words, this method must allocate
552 * a new array). The caller is thus free to modify the returned array.
553 *
554 * <p>This method acts as bridge between array-based and collection-based
555 * APIs.
556 *
557 * @return an array containing all of the elements in this queue
558 */
559 public Object[] toArray() {
560 fullyLock();
561 try {
562 int size = count.get();
563 Object[] a = new Object[size];
564 int k = 0;
565 for (Node<E> p = head.next; p != null; p = p.next)
566 a[k++] = p.item;
567 return a;
568 } finally {
569 fullyUnlock();
570 }
571 }
572
573 /**
574 * Returns an array containing all of the elements in this queue, in
575 * proper sequence; the runtime type of the returned array is that of
576 * the specified array. If the queue fits in the specified array, it
577 * is returned therein. Otherwise, a new array is allocated with the
578 * runtime type of the specified array and the size of this queue.
579 *
580 * <p>If this queue fits in the specified array with room to spare
581 * (i.e., the array has more elements than this queue), the element in
582 * the array immediately following the end of the queue is set to
583 * {@code null}.
584 *
585 * <p>Like the {@link #toArray()} method, this method acts as bridge between
586 * array-based and collection-based APIs. Further, this method allows
587 * precise control over the runtime type of the output array, and may,
588 * under certain circumstances, be used to save allocation costs.
589 *
590 * <p>Suppose {@code x} is a queue known to contain only strings.
591 * The following code can be used to dump the queue into a newly
592 * allocated array of {@code String}:
593 *
594 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
595 *
596 * Note that {@code toArray(new Object[0])} is identical in function to
597 * {@code toArray()}.
598 *
599 * @param a the array into which the elements of the queue are to
600 * be stored, if it is big enough; otherwise, a new array of the
601 * same runtime type is allocated for this purpose
602 * @return an array containing all of the elements in this queue
603 * @throws ArrayStoreException if the runtime type of the specified array
604 * is not a supertype of the runtime type of every element in
605 * this queue
606 * @throws NullPointerException if the specified array is null
607 */
608 @SuppressWarnings("unchecked")
609 public <T> T[] toArray(T[] a) {
610 fullyLock();
611 try {
612 int size = count.get();
613 if (a.length < size)
614 a = (T[])java.lang.reflect.Array.newInstance
615 (a.getClass().getComponentType(), size);
616
617 int k = 0;
618 for (Node<E> p = head.next; p != null; p = p.next)
619 a[k++] = (T)p.item;
620 if (a.length > k)
621 a[k] = null;
622 return a;
623 } finally {
624 fullyUnlock();
625 }
626 }
627
628 public String toString() {
629 return Helpers.collectionToString(this);
630 }
631
632 /**
633 * Atomically removes all of the elements from this queue.
634 * The queue will be empty after this call returns.
635 */
636 public void clear() {
637 fullyLock();
638 try {
639 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
640 h.next = h;
641 p.item = null;
642 }
643 head = last;
644 // assert head.item == null && head.next == null;
645 if (count.getAndSet(0) == capacity)
646 notFull.signal();
647 } finally {
648 fullyUnlock();
649 }
650 }
651
652 /**
653 * @throws UnsupportedOperationException {@inheritDoc}
654 * @throws ClassCastException {@inheritDoc}
655 * @throws NullPointerException {@inheritDoc}
656 * @throws IllegalArgumentException {@inheritDoc}
657 */
658 public int drainTo(Collection<? super E> c) {
659 return drainTo(c, Integer.MAX_VALUE);
660 }
661
662 /**
663 * @throws UnsupportedOperationException {@inheritDoc}
664 * @throws ClassCastException {@inheritDoc}
665 * @throws NullPointerException {@inheritDoc}
666 * @throws IllegalArgumentException {@inheritDoc}
667 */
668 public int drainTo(Collection<? super E> c, int maxElements) {
669 Objects.requireNonNull(c);
670 if (c == this)
671 throw new IllegalArgumentException();
672 if (maxElements <= 0)
673 return 0;
674 boolean signalNotFull = false;
675 final ReentrantLock takeLock = this.takeLock;
676 takeLock.lock();
677 try {
678 int n = Math.min(maxElements, count.get());
679 // count.get provides visibility to first n Nodes
680 Node<E> h = head;
681 int i = 0;
682 try {
683 while (i < n) {
684 Node<E> p = h.next;
685 c.add(p.item);
686 p.item = null;
687 h.next = h;
688 h = p;
689 ++i;
690 }
691 return n;
692 } finally {
693 // Restore invariants even if c.add() threw
694 if (i > 0) {
695 // assert h.item == null;
696 head = h;
697 signalNotFull = (count.getAndAdd(-i) == capacity);
698 }
699 }
700 } finally {
701 takeLock.unlock();
702 if (signalNotFull)
703 signalNotFull();
704 }
705 }
706
707 /**
708 * Used for any element traversal that is not entirely under lock.
709 * Such traversals must handle both:
710 * - dequeued nodes (p.next == p)
711 * - (possibly multiple) interior removed nodes (p.item == null)
712 */
713 Node<E> succ(Node<E> p) {
714 return (p == (p = p.next)) ? head.next : p;
715 }
716
717 /**
718 * Returns an iterator over the elements in this queue in proper sequence.
719 * The elements will be returned in order from first (head) to last (tail).
720 *
721 * <p>The returned iterator is
722 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
723 *
724 * @return an iterator over the elements in this queue in proper sequence
725 */
726 public Iterator<E> iterator() {
727 return new Itr();
728 }
729
730 private class Itr implements Iterator<E> {
731 /*
732 * Basic weakly-consistent iterator. At all times hold the next
733 * item to hand out so that if hasNext() reports true, we will
734 * still have it to return even if lost race with a take etc.
735 */
736
737 private Node<E> current;
738 private Node<E> lastRet;
739 private E currentElement;
740
741 Itr() {
742 fullyLock();
743 try {
744 if ((current = head.next) != null)
745 currentElement = current.item;
746 } finally {
747 fullyUnlock();
748 }
749 }
750
751 public boolean hasNext() {
752 return current != null;
753 }
754
755 public E next() {
756 Node<E> p;
757 if ((p = current) == null)
758 throw new NoSuchElementException();
759 E ret = currentElement, e = null;
760 lastRet = p;
761 fullyLock();
762 try {
763 for (p = p.next; p != null; p = succ(p))
764 if ((e = p.item) != null)
765 break;
766 } finally {
767 fullyUnlock();
768 }
769 current = p;
770 currentElement = e;
771 return ret;
772 }
773
774 public void forEachRemaining(Consumer<? super E> action) {
775 // A variant of forEachFrom
776 Objects.requireNonNull(action);
777 Node<E> p;
778 if ((p = current) == null) return;
779 lastRet = current;
780 current = null;
781 final int batchSize = 32;
782 Object[] es = null;
783 int n, len = 1;
784 do {
785 fullyLock();
786 try {
787 if (es == null) {
788 p = p.next;
789 for (Node<E> q = p; q != null; q = succ(q))
790 if (q.item != null && ++len == batchSize)
791 break;
792 es = new Object[len];
793 es[0] = currentElement;
794 currentElement = null;
795 n = 1;
796 } else
797 n = 0;
798 for (; p != null && n < len; p = succ(p))
799 if ((es[n] = p.item) != null) {
800 lastRet = p;
801 n++;
802 }
803 } finally {
804 fullyUnlock();
805 }
806 for (int i = 0; i < n; i++) {
807 @SuppressWarnings("unchecked") E e = (E) es[i];
808 action.accept(e);
809 }
810 } while (n > 0 && p != null);
811 }
812
813 public void remove() {
814 if (lastRet == null)
815 throw new IllegalStateException();
816 fullyLock();
817 try {
818 Node<E> node = lastRet;
819 lastRet = null;
820 for (Node<E> trail = head, p = trail.next;
821 p != null;
822 trail = p, p = p.next) {
823 if (p == node) {
824 unlink(p, trail);
825 break;
826 }
827 }
828 } finally {
829 fullyUnlock();
830 }
831 }
832 }
833
834 /**
835 * A customized variant of Spliterators.IteratorSpliterator.
836 * Keep this class in sync with (very similar) LBDSpliterator.
837 */
838 private final class LBQSpliterator implements Spliterator<E> {
839 static final int MAX_BATCH = 1 << 25; // max batch array size;
840 Node<E> current; // current node; null until initialized
841 int batch; // batch size for splits
842 boolean exhausted; // true when no more nodes
843 long est = size(); // size estimate
844
845 LBQSpliterator() {}
846
847 public long estimateSize() { return est; }
848
849 public Spliterator<E> trySplit() {
850 Node<E> h;
851 int b = batch;
852 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
853 if (!exhausted &&
854 ((h = current) != null || (h = head.next) != null)
855 && h.next != null) {
856 Object[] a = new Object[n];
857 int i = 0;
858 Node<E> p = current;
859 fullyLock();
860 try {
861 if (p != null || (p = head.next) != null)
862 for (; p != null && i < n; p = succ(p))
863 if ((a[i] = p.item) != null)
864 i++;
865 } finally {
866 fullyUnlock();
867 }
868 if ((current = p) == null) {
869 est = 0L;
870 exhausted = true;
871 }
872 else if ((est -= i) < 0L)
873 est = 0L;
874 if (i > 0) {
875 batch = i;
876 return Spliterators.spliterator
877 (a, 0, i, (Spliterator.ORDERED |
878 Spliterator.NONNULL |
879 Spliterator.CONCURRENT));
880 }
881 }
882 return null;
883 }
884
885 public boolean tryAdvance(Consumer<? super E> action) {
886 Objects.requireNonNull(action);
887 if (!exhausted) {
888 Node<E> p = current;
889 E e = null;
890 fullyLock();
891 try {
892 if (p != null || (p = head.next) != null)
893 do {
894 e = p.item;
895 p = succ(p);
896 } while (e == null && p != null);
897 } finally {
898 fullyUnlock();
899 }
900 exhausted = ((current = p) == null);
901 if (e != null) {
902 action.accept(e);
903 return true;
904 }
905 }
906 return false;
907 }
908
909 public void forEachRemaining(Consumer<? super E> action) {
910 Objects.requireNonNull(action);
911 if (!exhausted) {
912 exhausted = true;
913 Node<E> p = current;
914 current = null;
915 forEachFrom(action, p);
916 }
917 }
918
919 public int characteristics() {
920 return (Spliterator.ORDERED |
921 Spliterator.NONNULL |
922 Spliterator.CONCURRENT);
923 }
924 }
925
926 /**
927 * Returns a {@link Spliterator} over the elements in this queue.
928 *
929 * <p>The returned spliterator is
930 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
931 *
932 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
933 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
934 *
935 * @implNote
936 * The {@code Spliterator} implements {@code trySplit} to permit limited
937 * parallelism.
938 *
939 * @return a {@code Spliterator} over the elements in this queue
940 * @since 1.8
941 */
942 public Spliterator<E> spliterator() {
943 return new LBQSpliterator();
944 }
945
946 /**
947 * @throws NullPointerException {@inheritDoc}
948 */
949 public void forEach(Consumer<? super E> action) {
950 Objects.requireNonNull(action);
951 forEachFrom(action, null);
952 }
953
954 /**
955 * Runs action on each element found during a traversal starting at p.
956 * If p is null, traversal starts at head.
957 */
958 void forEachFrom(Consumer<? super E> action, Node<E> p) {
959 // Extract batches of elements while holding the lock; then
960 // run the action on the elements while not
961 final int batchSize = 32; // max number of elements per batch
962 Object[] es = null; // container for batch of elements
963 int n, len = 0;
964 do {
965 fullyLock();
966 try {
967 if (es == null) {
968 if (p == null) p = head.next;
969 for (Node<E> q = p; q != null; q = succ(q))
970 if (q.item != null && ++len == batchSize)
971 break;
972 es = new Object[len];
973 }
974 for (n = 0; p != null && n < len; p = succ(p))
975 if ((es[n] = p.item) != null)
976 n++;
977 } finally {
978 fullyUnlock();
979 }
980 for (int i = 0; i < n; i++) {
981 @SuppressWarnings("unchecked") E e = (E) es[i];
982 action.accept(e);
983 }
984 } while (n > 0 && p != null);
985 }
986
987 /**
988 * Saves this queue to a stream (that is, serializes it).
989 *
990 * @param s the stream
991 * @throws java.io.IOException if an I/O error occurs
992 * @serialData The capacity is emitted (int), followed by all of
993 * its elements (each an {@code Object}) in the proper order,
994 * followed by a null
995 */
996 private void writeObject(java.io.ObjectOutputStream s)
997 throws java.io.IOException {
998
999 fullyLock();
1000 try {
1001 // Write out any hidden stuff, plus capacity
1002 s.defaultWriteObject();
1003
1004 // Write out all elements in the proper order.
1005 for (Node<E> p = head.next; p != null; p = p.next)
1006 s.writeObject(p.item);
1007
1008 // Use trailing null as sentinel
1009 s.writeObject(null);
1010 } finally {
1011 fullyUnlock();
1012 }
1013 }
1014
1015 /**
1016 * Reconstitutes this queue from a stream (that is, deserializes it).
1017 * @param s the stream
1018 * @throws ClassNotFoundException if the class of a serialized object
1019 * could not be found
1020 * @throws java.io.IOException if an I/O error occurs
1021 */
1022 private void readObject(java.io.ObjectInputStream s)
1023 throws java.io.IOException, ClassNotFoundException {
1024 // Read in capacity, and any hidden stuff
1025 s.defaultReadObject();
1026
1027 count.set(0);
1028 last = head = new Node<E>(null);
1029
1030 // Read in all elements and place in queue
1031 for (;;) {
1032 @SuppressWarnings("unchecked")
1033 E item = (E)s.readObject();
1034 if (item == null)
1035 break;
1036 add(item);
1037 }
1038 }
1039 }