ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.115
Committed: Thu Oct 17 01:51:38 2019 UTC (4 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.114: +2 -0 lines
Log Message:
8232230: Suppress warnings on non-serializable non-transient instance fields in java.util.concurrent

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Iterator;
12 import java.util.NoSuchElementException;
13 import java.util.Objects;
14 import java.util.Spliterator;
15 import java.util.Spliterators;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20 import java.util.function.Predicate;
21
22 /**
23 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
24 * linked nodes.
25 * This queue orders elements FIFO (first-in-first-out).
26 * The <em>head</em> of the queue is that element that has been on the
27 * queue the longest time.
28 * The <em>tail</em> of the queue is that element that has been on the
29 * queue the shortest time. New elements
30 * are inserted at the tail of the queue, and the queue retrieval
31 * operations obtain elements at the head of the queue.
32 * Linked queues typically have higher throughput than array-based queues but
33 * less predictable performance in most concurrent applications.
34 *
35 * <p>The optional capacity bound constructor argument serves as a
36 * way to prevent excessive queue expansion. The capacity, if unspecified,
37 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
38 * dynamically created upon each insertion unless this would bring the
39 * queue above capacity.
40 *
41 * <p>This class and its iterator implement all of the <em>optional</em>
42 * methods of the {@link Collection} and {@link Iterator} interfaces.
43 *
44 * <p>This class is a member of the
45 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
46 * Java Collections Framework</a>.
47 *
48 * @since 1.5
49 * @author Doug Lea
50 * @param <E> the type of elements held in this queue
51 */
52 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
53 implements BlockingQueue<E>, java.io.Serializable {
54 private static final long serialVersionUID = -6903933977591709194L;
55
56 /*
57 * A variant of the "two lock queue" algorithm. The putLock gates
58 * entry to put (and offer), and has an associated condition for
59 * waiting puts. Similarly for the takeLock. The "count" field
60 * that they both rely on is maintained as an atomic to avoid
61 * needing to get both locks in most cases. Also, to minimize need
62 * for puts to get takeLock and vice-versa, cascading notifies are
63 * used. When a put notices that it has enabled at least one take,
64 * it signals taker. That taker in turn signals others if more
65 * items have been entered since the signal. And symmetrically for
66 * takes signalling puts. Operations such as remove(Object) and
67 * iterators acquire both locks.
68 *
69 * Visibility between writers and readers is provided as follows:
70 *
71 * Whenever an element is enqueued, the putLock is acquired and
72 * count updated. A subsequent reader guarantees visibility to the
73 * enqueued Node by either acquiring the putLock (via fullyLock)
74 * or by acquiring the takeLock, and then reading n = count.get();
75 * this gives visibility to the first n items.
76 *
77 * To implement weakly consistent iterators, it appears we need to
78 * keep all Nodes GC-reachable from a predecessor dequeued Node.
79 * That would cause two problems:
80 * - allow a rogue Iterator to cause unbounded memory retention
81 * - cause cross-generational linking of old Nodes to new Nodes if
82 * a Node was tenured while live, which generational GCs have a
83 * hard time dealing with, causing repeated major collections.
84 * However, only non-deleted Nodes need to be reachable from
85 * dequeued Nodes, and reachability does not necessarily have to
86 * be of the kind understood by the GC. We use the trick of
87 * linking a Node that has just been dequeued to itself. Such a
88 * self-link implicitly means to advance to head.next.
89 */
90
91 /**
92 * Linked list node class.
93 */
94 static class Node<E> {
95 E item;
96
97 /**
98 * One of:
99 * - the real successor Node
100 * - this Node, meaning the successor is head.next
101 * - null, meaning there is no successor (this is the last node)
102 */
103 Node<E> next;
104
105 Node(E x) { item = x; }
106 }
107
108 /** The capacity bound, or Integer.MAX_VALUE if none */
109 private final int capacity;
110
111 /** Current number of elements */
112 private final AtomicInteger count = new AtomicInteger();
113
114 /**
115 * Head of linked list.
116 * Invariant: head.item == null
117 */
118 transient Node<E> head;
119
120 /**
121 * Tail of linked list.
122 * Invariant: last.next == null
123 */
124 private transient Node<E> last;
125
126 /** Lock held by take, poll, etc */
127 private final ReentrantLock takeLock = new ReentrantLock();
128
129 /** Wait queue for waiting takes */
130 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
131 private final Condition notEmpty = takeLock.newCondition();
132
133 /** Lock held by put, offer, etc */
134 private final ReentrantLock putLock = new ReentrantLock();
135
136 /** Wait queue for waiting puts */
137 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
138 private final Condition notFull = putLock.newCondition();
139
140 /**
141 * Signals a waiting take. Called only from put/offer (which do not
142 * otherwise ordinarily lock takeLock.)
143 */
144 private void signalNotEmpty() {
145 final ReentrantLock takeLock = this.takeLock;
146 takeLock.lock();
147 try {
148 notEmpty.signal();
149 } finally {
150 takeLock.unlock();
151 }
152 }
153
154 /**
155 * Signals a waiting put. Called only from take/poll.
156 */
157 private void signalNotFull() {
158 final ReentrantLock putLock = this.putLock;
159 putLock.lock();
160 try {
161 notFull.signal();
162 } finally {
163 putLock.unlock();
164 }
165 }
166
167 /**
168 * Links node at end of queue.
169 *
170 * @param node the node
171 */
172 private void enqueue(Node<E> node) {
173 // assert putLock.isHeldByCurrentThread();
174 // assert last.next == null;
175 last = last.next = node;
176 }
177
178 /**
179 * Removes a node from head of queue.
180 *
181 * @return the node
182 */
183 private E dequeue() {
184 // assert takeLock.isHeldByCurrentThread();
185 // assert head.item == null;
186 Node<E> h = head;
187 Node<E> first = h.next;
188 h.next = h; // help GC
189 head = first;
190 E x = first.item;
191 first.item = null;
192 return x;
193 }
194
195 /**
196 * Locks to prevent both puts and takes.
197 */
198 void fullyLock() {
199 putLock.lock();
200 takeLock.lock();
201 }
202
203 /**
204 * Unlocks to allow both puts and takes.
205 */
206 void fullyUnlock() {
207 takeLock.unlock();
208 putLock.unlock();
209 }
210
211 /**
212 * Creates a {@code LinkedBlockingQueue} with a capacity of
213 * {@link Integer#MAX_VALUE}.
214 */
215 public LinkedBlockingQueue() {
216 this(Integer.MAX_VALUE);
217 }
218
219 /**
220 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
221 *
222 * @param capacity the capacity of this queue
223 * @throws IllegalArgumentException if {@code capacity} is not greater
224 * than zero
225 */
226 public LinkedBlockingQueue(int capacity) {
227 if (capacity <= 0) throw new IllegalArgumentException();
228 this.capacity = capacity;
229 last = head = new Node<E>(null);
230 }
231
232 /**
233 * Creates a {@code LinkedBlockingQueue} with a capacity of
234 * {@link Integer#MAX_VALUE}, initially containing the elements of the
235 * given collection,
236 * added in traversal order of the collection's iterator.
237 *
238 * @param c the collection of elements to initially contain
239 * @throws NullPointerException if the specified collection or any
240 * of its elements are null
241 */
242 public LinkedBlockingQueue(Collection<? extends E> c) {
243 this(Integer.MAX_VALUE);
244 final ReentrantLock putLock = this.putLock;
245 putLock.lock(); // Never contended, but necessary for visibility
246 try {
247 int n = 0;
248 for (E e : c) {
249 if (e == null)
250 throw new NullPointerException();
251 if (n == capacity)
252 throw new IllegalStateException("Queue full");
253 enqueue(new Node<E>(e));
254 ++n;
255 }
256 count.set(n);
257 } finally {
258 putLock.unlock();
259 }
260 }
261
262 // this doc comment is overridden to remove the reference to collections
263 // greater in size than Integer.MAX_VALUE
264 /**
265 * Returns the number of elements in this queue.
266 *
267 * @return the number of elements in this queue
268 */
269 public int size() {
270 return count.get();
271 }
272
273 // this doc comment is a modified copy of the inherited doc comment,
274 // without the reference to unlimited queues.
275 /**
276 * Returns the number of additional elements that this queue can ideally
277 * (in the absence of memory or resource constraints) accept without
278 * blocking. This is always equal to the initial capacity of this queue
279 * less the current {@code size} of this queue.
280 *
281 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
282 * an element will succeed by inspecting {@code remainingCapacity}
283 * because it may be the case that another thread is about to
284 * insert or remove an element.
285 */
286 public int remainingCapacity() {
287 return capacity - count.get();
288 }
289
290 /**
291 * Inserts the specified element at the tail of this queue, waiting if
292 * necessary for space to become available.
293 *
294 * @throws InterruptedException {@inheritDoc}
295 * @throws NullPointerException {@inheritDoc}
296 */
297 public void put(E e) throws InterruptedException {
298 if (e == null) throw new NullPointerException();
299 final int c;
300 final Node<E> node = new Node<E>(e);
301 final ReentrantLock putLock = this.putLock;
302 final AtomicInteger count = this.count;
303 putLock.lockInterruptibly();
304 try {
305 /*
306 * Note that count is used in wait guard even though it is
307 * not protected by lock. This works because count can
308 * only decrease at this point (all other puts are shut
309 * out by lock), and we (or some other waiting put) are
310 * signalled if it ever changes from capacity. Similarly
311 * for all other uses of count in other wait guards.
312 */
313 while (count.get() == capacity) {
314 notFull.await();
315 }
316 enqueue(node);
317 c = count.getAndIncrement();
318 if (c + 1 < capacity)
319 notFull.signal();
320 } finally {
321 putLock.unlock();
322 }
323 if (c == 0)
324 signalNotEmpty();
325 }
326
327 /**
328 * Inserts the specified element at the tail of this queue, waiting if
329 * necessary up to the specified wait time for space to become available.
330 *
331 * @return {@code true} if successful, or {@code false} if
332 * the specified waiting time elapses before space is available
333 * @throws InterruptedException {@inheritDoc}
334 * @throws NullPointerException {@inheritDoc}
335 */
336 public boolean offer(E e, long timeout, TimeUnit unit)
337 throws InterruptedException {
338
339 if (e == null) throw new NullPointerException();
340 long nanos = unit.toNanos(timeout);
341 final int c;
342 final ReentrantLock putLock = this.putLock;
343 final AtomicInteger count = this.count;
344 putLock.lockInterruptibly();
345 try {
346 while (count.get() == capacity) {
347 if (nanos <= 0L)
348 return false;
349 nanos = notFull.awaitNanos(nanos);
350 }
351 enqueue(new Node<E>(e));
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
356 putLock.unlock();
357 }
358 if (c == 0)
359 signalNotEmpty();
360 return true;
361 }
362
363 /**
364 * Inserts the specified element at the tail of this queue if it is
365 * possible to do so immediately without exceeding the queue's capacity,
366 * returning {@code true} upon success and {@code false} if this queue
367 * is full.
368 * When using a capacity-restricted queue, this method is generally
369 * preferable to method {@link BlockingQueue#add add}, which can fail to
370 * insert an element only by throwing an exception.
371 *
372 * @throws NullPointerException if the specified element is null
373 */
374 public boolean offer(E e) {
375 if (e == null) throw new NullPointerException();
376 final AtomicInteger count = this.count;
377 if (count.get() == capacity)
378 return false;
379 final int c;
380 final Node<E> node = new Node<E>(e);
381 final ReentrantLock putLock = this.putLock;
382 putLock.lock();
383 try {
384 if (count.get() == capacity)
385 return false;
386 enqueue(node);
387 c = count.getAndIncrement();
388 if (c + 1 < capacity)
389 notFull.signal();
390 } finally {
391 putLock.unlock();
392 }
393 if (c == 0)
394 signalNotEmpty();
395 return true;
396 }
397
398 public E take() throws InterruptedException {
399 final E x;
400 final int c;
401 final AtomicInteger count = this.count;
402 final ReentrantLock takeLock = this.takeLock;
403 takeLock.lockInterruptibly();
404 try {
405 while (count.get() == 0) {
406 notEmpty.await();
407 }
408 x = dequeue();
409 c = count.getAndDecrement();
410 if (c > 1)
411 notEmpty.signal();
412 } finally {
413 takeLock.unlock();
414 }
415 if (c == capacity)
416 signalNotFull();
417 return x;
418 }
419
420 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
421 final E x;
422 final int c;
423 long nanos = unit.toNanos(timeout);
424 final AtomicInteger count = this.count;
425 final ReentrantLock takeLock = this.takeLock;
426 takeLock.lockInterruptibly();
427 try {
428 while (count.get() == 0) {
429 if (nanos <= 0L)
430 return null;
431 nanos = notEmpty.awaitNanos(nanos);
432 }
433 x = dequeue();
434 c = count.getAndDecrement();
435 if (c > 1)
436 notEmpty.signal();
437 } finally {
438 takeLock.unlock();
439 }
440 if (c == capacity)
441 signalNotFull();
442 return x;
443 }
444
445 public E poll() {
446 final AtomicInteger count = this.count;
447 if (count.get() == 0)
448 return null;
449 final E x;
450 final int c;
451 final ReentrantLock takeLock = this.takeLock;
452 takeLock.lock();
453 try {
454 if (count.get() == 0)
455 return null;
456 x = dequeue();
457 c = count.getAndDecrement();
458 if (c > 1)
459 notEmpty.signal();
460 } finally {
461 takeLock.unlock();
462 }
463 if (c == capacity)
464 signalNotFull();
465 return x;
466 }
467
468 public E peek() {
469 final AtomicInteger count = this.count;
470 if (count.get() == 0)
471 return null;
472 final ReentrantLock takeLock = this.takeLock;
473 takeLock.lock();
474 try {
475 return (count.get() > 0) ? head.next.item : null;
476 } finally {
477 takeLock.unlock();
478 }
479 }
480
481 /**
482 * Unlinks interior Node p with predecessor pred.
483 */
484 void unlink(Node<E> p, Node<E> pred) {
485 // assert putLock.isHeldByCurrentThread();
486 // assert takeLock.isHeldByCurrentThread();
487 // p.next is not changed, to allow iterators that are
488 // traversing p to maintain their weak-consistency guarantee.
489 p.item = null;
490 pred.next = p.next;
491 if (last == p)
492 last = pred;
493 if (count.getAndDecrement() == capacity)
494 notFull.signal();
495 }
496
497 /**
498 * Removes a single instance of the specified element from this queue,
499 * if it is present. More formally, removes an element {@code e} such
500 * that {@code o.equals(e)}, if this queue contains one or more such
501 * elements.
502 * Returns {@code true} if this queue contained the specified element
503 * (or equivalently, if this queue changed as a result of the call).
504 *
505 * @param o element to be removed from this queue, if present
506 * @return {@code true} if this queue changed as a result of the call
507 */
508 public boolean remove(Object o) {
509 if (o == null) return false;
510 fullyLock();
511 try {
512 for (Node<E> pred = head, p = pred.next;
513 p != null;
514 pred = p, p = p.next) {
515 if (o.equals(p.item)) {
516 unlink(p, pred);
517 return true;
518 }
519 }
520 return false;
521 } finally {
522 fullyUnlock();
523 }
524 }
525
526 /**
527 * Returns {@code true} if this queue contains the specified element.
528 * More formally, returns {@code true} if and only if this queue contains
529 * at least one element {@code e} such that {@code o.equals(e)}.
530 *
531 * @param o object to be checked for containment in this queue
532 * @return {@code true} if this queue contains the specified element
533 */
534 public boolean contains(Object o) {
535 if (o == null) return false;
536 fullyLock();
537 try {
538 for (Node<E> p = head.next; p != null; p = p.next)
539 if (o.equals(p.item))
540 return true;
541 return false;
542 } finally {
543 fullyUnlock();
544 }
545 }
546
547 /**
548 * Returns an array containing all of the elements in this queue, in
549 * proper sequence.
550 *
551 * <p>The returned array will be "safe" in that no references to it are
552 * maintained by this queue. (In other words, this method must allocate
553 * a new array). The caller is thus free to modify the returned array.
554 *
555 * <p>This method acts as bridge between array-based and collection-based
556 * APIs.
557 *
558 * @return an array containing all of the elements in this queue
559 */
560 public Object[] toArray() {
561 fullyLock();
562 try {
563 int size = count.get();
564 Object[] a = new Object[size];
565 int k = 0;
566 for (Node<E> p = head.next; p != null; p = p.next)
567 a[k++] = p.item;
568 return a;
569 } finally {
570 fullyUnlock();
571 }
572 }
573
574 /**
575 * Returns an array containing all of the elements in this queue, in
576 * proper sequence; the runtime type of the returned array is that of
577 * the specified array. If the queue fits in the specified array, it
578 * is returned therein. Otherwise, a new array is allocated with the
579 * runtime type of the specified array and the size of this queue.
580 *
581 * <p>If this queue fits in the specified array with room to spare
582 * (i.e., the array has more elements than this queue), the element in
583 * the array immediately following the end of the queue is set to
584 * {@code null}.
585 *
586 * <p>Like the {@link #toArray()} method, this method acts as bridge between
587 * array-based and collection-based APIs. Further, this method allows
588 * precise control over the runtime type of the output array, and may,
589 * under certain circumstances, be used to save allocation costs.
590 *
591 * <p>Suppose {@code x} is a queue known to contain only strings.
592 * The following code can be used to dump the queue into a newly
593 * allocated array of {@code String}:
594 *
595 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
596 *
597 * Note that {@code toArray(new Object[0])} is identical in function to
598 * {@code toArray()}.
599 *
600 * @param a the array into which the elements of the queue are to
601 * be stored, if it is big enough; otherwise, a new array of the
602 * same runtime type is allocated for this purpose
603 * @return an array containing all of the elements in this queue
604 * @throws ArrayStoreException if the runtime type of the specified array
605 * is not a supertype of the runtime type of every element in
606 * this queue
607 * @throws NullPointerException if the specified array is null
608 */
609 @SuppressWarnings("unchecked")
610 public <T> T[] toArray(T[] a) {
611 fullyLock();
612 try {
613 int size = count.get();
614 if (a.length < size)
615 a = (T[])java.lang.reflect.Array.newInstance
616 (a.getClass().getComponentType(), size);
617
618 int k = 0;
619 for (Node<E> p = head.next; p != null; p = p.next)
620 a[k++] = (T)p.item;
621 if (a.length > k)
622 a[k] = null;
623 return a;
624 } finally {
625 fullyUnlock();
626 }
627 }
628
629 public String toString() {
630 return Helpers.collectionToString(this);
631 }
632
633 /**
634 * Atomically removes all of the elements from this queue.
635 * The queue will be empty after this call returns.
636 */
637 public void clear() {
638 fullyLock();
639 try {
640 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
641 h.next = h;
642 p.item = null;
643 }
644 head = last;
645 // assert head.item == null && head.next == null;
646 if (count.getAndSet(0) == capacity)
647 notFull.signal();
648 } finally {
649 fullyUnlock();
650 }
651 }
652
653 /**
654 * @throws UnsupportedOperationException {@inheritDoc}
655 * @throws ClassCastException {@inheritDoc}
656 * @throws NullPointerException {@inheritDoc}
657 * @throws IllegalArgumentException {@inheritDoc}
658 */
659 public int drainTo(Collection<? super E> c) {
660 return drainTo(c, Integer.MAX_VALUE);
661 }
662
663 /**
664 * @throws UnsupportedOperationException {@inheritDoc}
665 * @throws ClassCastException {@inheritDoc}
666 * @throws NullPointerException {@inheritDoc}
667 * @throws IllegalArgumentException {@inheritDoc}
668 */
669 public int drainTo(Collection<? super E> c, int maxElements) {
670 Objects.requireNonNull(c);
671 if (c == this)
672 throw new IllegalArgumentException();
673 if (maxElements <= 0)
674 return 0;
675 boolean signalNotFull = false;
676 final ReentrantLock takeLock = this.takeLock;
677 takeLock.lock();
678 try {
679 int n = Math.min(maxElements, count.get());
680 // count.get provides visibility to first n Nodes
681 Node<E> h = head;
682 int i = 0;
683 try {
684 while (i < n) {
685 Node<E> p = h.next;
686 c.add(p.item);
687 p.item = null;
688 h.next = h;
689 h = p;
690 ++i;
691 }
692 return n;
693 } finally {
694 // Restore invariants even if c.add() threw
695 if (i > 0) {
696 // assert h.item == null;
697 head = h;
698 signalNotFull = (count.getAndAdd(-i) == capacity);
699 }
700 }
701 } finally {
702 takeLock.unlock();
703 if (signalNotFull)
704 signalNotFull();
705 }
706 }
707
708 /**
709 * Used for any element traversal that is not entirely under lock.
710 * Such traversals must handle both:
711 * - dequeued nodes (p.next == p)
712 * - (possibly multiple) interior removed nodes (p.item == null)
713 */
714 Node<E> succ(Node<E> p) {
715 if (p == (p = p.next))
716 p = head.next;
717 return p;
718 }
719
720 /**
721 * Returns an iterator over the elements in this queue in proper sequence.
722 * The elements will be returned in order from first (head) to last (tail).
723 *
724 * <p>The returned iterator is
725 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
726 *
727 * @return an iterator over the elements in this queue in proper sequence
728 */
729 public Iterator<E> iterator() {
730 return new Itr();
731 }
732
733 /**
734 * Weakly-consistent iterator.
735 *
736 * Lazily updated ancestor field provides expected O(1) remove(),
737 * but still O(n) in the worst case, whenever the saved ancestor
738 * is concurrently deleted.
739 */
740 private class Itr implements Iterator<E> {
741 private Node<E> next; // Node holding nextItem
742 private E nextItem; // next item to hand out
743 private Node<E> lastRet;
744 private Node<E> ancestor; // Helps unlink lastRet on remove()
745
746 Itr() {
747 fullyLock();
748 try {
749 if ((next = head.next) != null)
750 nextItem = next.item;
751 } finally {
752 fullyUnlock();
753 }
754 }
755
756 public boolean hasNext() {
757 return next != null;
758 }
759
760 public E next() {
761 Node<E> p;
762 if ((p = next) == null)
763 throw new NoSuchElementException();
764 lastRet = p;
765 E x = nextItem;
766 fullyLock();
767 try {
768 E e = null;
769 for (p = p.next; p != null && (e = p.item) == null; )
770 p = succ(p);
771 next = p;
772 nextItem = e;
773 } finally {
774 fullyUnlock();
775 }
776 return x;
777 }
778
779 public void forEachRemaining(Consumer<? super E> action) {
780 // A variant of forEachFrom
781 Objects.requireNonNull(action);
782 Node<E> p;
783 if ((p = next) == null) return;
784 lastRet = p;
785 next = null;
786 final int batchSize = 64;
787 Object[] es = null;
788 int n, len = 1;
789 do {
790 fullyLock();
791 try {
792 if (es == null) {
793 p = p.next;
794 for (Node<E> q = p; q != null; q = succ(q))
795 if (q.item != null && ++len == batchSize)
796 break;
797 es = new Object[len];
798 es[0] = nextItem;
799 nextItem = null;
800 n = 1;
801 } else
802 n = 0;
803 for (; p != null && n < len; p = succ(p))
804 if ((es[n] = p.item) != null) {
805 lastRet = p;
806 n++;
807 }
808 } finally {
809 fullyUnlock();
810 }
811 for (int i = 0; i < n; i++) {
812 @SuppressWarnings("unchecked") E e = (E) es[i];
813 action.accept(e);
814 }
815 } while (n > 0 && p != null);
816 }
817
818 public void remove() {
819 Node<E> p = lastRet;
820 if (p == null)
821 throw new IllegalStateException();
822 lastRet = null;
823 fullyLock();
824 try {
825 if (p.item != null) {
826 if (ancestor == null)
827 ancestor = head;
828 ancestor = findPred(p, ancestor);
829 unlink(p, ancestor);
830 }
831 } finally {
832 fullyUnlock();
833 }
834 }
835 }
836
837 /**
838 * A customized variant of Spliterators.IteratorSpliterator.
839 * Keep this class in sync with (very similar) LBDSpliterator.
840 */
841 private final class LBQSpliterator implements Spliterator<E> {
842 static final int MAX_BATCH = 1 << 25; // max batch array size;
843 Node<E> current; // current node; null until initialized
844 int batch; // batch size for splits
845 boolean exhausted; // true when no more nodes
846 long est = size(); // size estimate
847
848 LBQSpliterator() {}
849
850 public long estimateSize() { return est; }
851
852 public Spliterator<E> trySplit() {
853 Node<E> h;
854 if (!exhausted &&
855 ((h = current) != null || (h = head.next) != null)
856 && h.next != null) {
857 int n = batch = Math.min(batch + 1, MAX_BATCH);
858 Object[] a = new Object[n];
859 int i = 0;
860 Node<E> p = current;
861 fullyLock();
862 try {
863 if (p != null || (p = head.next) != null)
864 for (; p != null && i < n; p = succ(p))
865 if ((a[i] = p.item) != null)
866 i++;
867 } finally {
868 fullyUnlock();
869 }
870 if ((current = p) == null) {
871 est = 0L;
872 exhausted = true;
873 }
874 else if ((est -= i) < 0L)
875 est = 0L;
876 if (i > 0)
877 return Spliterators.spliterator
878 (a, 0, i, (Spliterator.ORDERED |
879 Spliterator.NONNULL |
880 Spliterator.CONCURRENT));
881 }
882 return null;
883 }
884
885 public boolean tryAdvance(Consumer<? super E> action) {
886 Objects.requireNonNull(action);
887 if (!exhausted) {
888 E e = null;
889 fullyLock();
890 try {
891 Node<E> p;
892 if ((p = current) != null || (p = head.next) != null)
893 do {
894 e = p.item;
895 p = succ(p);
896 } while (e == null && p != null);
897 if ((current = p) == null)
898 exhausted = true;
899 } finally {
900 fullyUnlock();
901 }
902 if (e != null) {
903 action.accept(e);
904 return true;
905 }
906 }
907 return false;
908 }
909
910 public void forEachRemaining(Consumer<? super E> action) {
911 Objects.requireNonNull(action);
912 if (!exhausted) {
913 exhausted = true;
914 Node<E> p = current;
915 current = null;
916 forEachFrom(action, p);
917 }
918 }
919
920 public int characteristics() {
921 return (Spliterator.ORDERED |
922 Spliterator.NONNULL |
923 Spliterator.CONCURRENT);
924 }
925 }
926
927 /**
928 * Returns a {@link Spliterator} over the elements in this queue.
929 *
930 * <p>The returned spliterator is
931 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
932 *
933 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
934 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
935 *
936 * @implNote
937 * The {@code Spliterator} implements {@code trySplit} to permit limited
938 * parallelism.
939 *
940 * @return a {@code Spliterator} over the elements in this queue
941 * @since 1.8
942 */
943 public Spliterator<E> spliterator() {
944 return new LBQSpliterator();
945 }
946
947 /**
948 * @throws NullPointerException {@inheritDoc}
949 */
950 public void forEach(Consumer<? super E> action) {
951 Objects.requireNonNull(action);
952 forEachFrom(action, null);
953 }
954
955 /**
956 * Runs action on each element found during a traversal starting at p.
957 * If p is null, traversal starts at head.
958 */
959 void forEachFrom(Consumer<? super E> action, Node<E> p) {
960 // Extract batches of elements while holding the lock; then
961 // run the action on the elements while not
962 final int batchSize = 64; // max number of elements per batch
963 Object[] es = null; // container for batch of elements
964 int n, len = 0;
965 do {
966 fullyLock();
967 try {
968 if (es == null) {
969 if (p == null) p = head.next;
970 for (Node<E> q = p; q != null; q = succ(q))
971 if (q.item != null && ++len == batchSize)
972 break;
973 es = new Object[len];
974 }
975 for (n = 0; p != null && n < len; p = succ(p))
976 if ((es[n] = p.item) != null)
977 n++;
978 } finally {
979 fullyUnlock();
980 }
981 for (int i = 0; i < n; i++) {
982 @SuppressWarnings("unchecked") E e = (E) es[i];
983 action.accept(e);
984 }
985 } while (n > 0 && p != null);
986 }
987
988 /**
989 * @throws NullPointerException {@inheritDoc}
990 */
991 public boolean removeIf(Predicate<? super E> filter) {
992 Objects.requireNonNull(filter);
993 return bulkRemove(filter);
994 }
995
996 /**
997 * @throws NullPointerException {@inheritDoc}
998 */
999 public boolean removeAll(Collection<?> c) {
1000 Objects.requireNonNull(c);
1001 return bulkRemove(e -> c.contains(e));
1002 }
1003
1004 /**
1005 * @throws NullPointerException {@inheritDoc}
1006 */
1007 public boolean retainAll(Collection<?> c) {
1008 Objects.requireNonNull(c);
1009 return bulkRemove(e -> !c.contains(e));
1010 }
1011
1012 /**
1013 * Returns the predecessor of live node p, given a node that was
1014 * once a live ancestor of p (or head); allows unlinking of p.
1015 */
1016 Node<E> findPred(Node<E> p, Node<E> ancestor) {
1017 // assert p.item != null;
1018 if (ancestor.item == null)
1019 ancestor = head;
1020 // Fails with NPE if precondition not satisfied
1021 for (Node<E> q; (q = ancestor.next) != p; )
1022 ancestor = q;
1023 return ancestor;
1024 }
1025
1026 /** Implementation of bulk remove methods. */
1027 @SuppressWarnings("unchecked")
1028 private boolean bulkRemove(Predicate<? super E> filter) {
1029 boolean removed = false;
1030 Node<E> p = null, ancestor = head;
1031 Node<E>[] nodes = null;
1032 int n, len = 0;
1033 do {
1034 // 1. Extract batch of up to 64 elements while holding the lock.
1035 fullyLock();
1036 try {
1037 if (nodes == null) { // first batch; initialize
1038 p = head.next;
1039 for (Node<E> q = p; q != null; q = succ(q))
1040 if (q.item != null && ++len == 64)
1041 break;
1042 nodes = (Node<E>[]) new Node<?>[len];
1043 }
1044 for (n = 0; p != null && n < len; p = succ(p))
1045 nodes[n++] = p;
1046 } finally {
1047 fullyUnlock();
1048 }
1049
1050 // 2. Run the filter on the elements while lock is free.
1051 long deathRow = 0L; // "bitset" of size 64
1052 for (int i = 0; i < n; i++) {
1053 final E e;
1054 if ((e = nodes[i].item) != null && filter.test(e))
1055 deathRow |= 1L << i;
1056 }
1057
1058 // 3. Remove any filtered elements while holding the lock.
1059 if (deathRow != 0) {
1060 fullyLock();
1061 try {
1062 for (int i = 0; i < n; i++) {
1063 final Node<E> q;
1064 if ((deathRow & (1L << i)) != 0L
1065 && (q = nodes[i]).item != null) {
1066 ancestor = findPred(q, ancestor);
1067 unlink(q, ancestor);
1068 removed = true;
1069 }
1070 nodes[i] = null; // help GC
1071 }
1072 } finally {
1073 fullyUnlock();
1074 }
1075 }
1076 } while (n > 0 && p != null);
1077 return removed;
1078 }
1079
1080 /**
1081 * Saves this queue to a stream (that is, serializes it).
1082 *
1083 * @param s the stream
1084 * @throws java.io.IOException if an I/O error occurs
1085 * @serialData The capacity is emitted (int), followed by all of
1086 * its elements (each an {@code Object}) in the proper order,
1087 * followed by a null
1088 */
1089 private void writeObject(java.io.ObjectOutputStream s)
1090 throws java.io.IOException {
1091
1092 fullyLock();
1093 try {
1094 // Write out any hidden stuff, plus capacity
1095 s.defaultWriteObject();
1096
1097 // Write out all elements in the proper order.
1098 for (Node<E> p = head.next; p != null; p = p.next)
1099 s.writeObject(p.item);
1100
1101 // Use trailing null as sentinel
1102 s.writeObject(null);
1103 } finally {
1104 fullyUnlock();
1105 }
1106 }
1107
1108 /**
1109 * Reconstitutes this queue from a stream (that is, deserializes it).
1110 * @param s the stream
1111 * @throws ClassNotFoundException if the class of a serialized object
1112 * could not be found
1113 * @throws java.io.IOException if an I/O error occurs
1114 */
1115 private void readObject(java.io.ObjectInputStream s)
1116 throws java.io.IOException, ClassNotFoundException {
1117 // Read in capacity, and any hidden stuff
1118 s.defaultReadObject();
1119
1120 count.set(0);
1121 last = head = new Node<E>(null);
1122
1123 // Read in all elements and place in queue
1124 for (;;) {
1125 @SuppressWarnings("unchecked")
1126 E item = (E)s.readObject();
1127 if (item == null)
1128 break;
1129 add(item);
1130 }
1131 }
1132 }