ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.111
Committed: Sat May 6 06:49:46 2017 UTC (7 years ago) by jsr166
Branch: MAIN
Changes since 1.110: +1 -1 lines
Log Message:
8177789: fix collections framework links to point to java.util package doc

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Iterator;
12 import java.util.NoSuchElementException;
13 import java.util.Objects;
14 import java.util.Spliterator;
15 import java.util.Spliterators;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20 import java.util.function.Predicate;
21
22 /**
23 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
24 * linked nodes.
25 * This queue orders elements FIFO (first-in-first-out).
26 * The <em>head</em> of the queue is that element that has been on the
27 * queue the longest time.
28 * The <em>tail</em> of the queue is that element that has been on the
29 * queue the shortest time. New elements
30 * are inserted at the tail of the queue, and the queue retrieval
31 * operations obtain elements at the head of the queue.
32 * Linked queues typically have higher throughput than array-based queues but
33 * less predictable performance in most concurrent applications.
34 *
35 * <p>The optional capacity bound constructor argument serves as a
36 * way to prevent excessive queue expansion. The capacity, if unspecified,
37 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
38 * dynamically created upon each insertion unless this would bring the
39 * queue above capacity.
40 *
41 * <p>This class and its iterator implement all of the <em>optional</em>
42 * methods of the {@link Collection} and {@link Iterator} interfaces.
43 *
44 * <p>This class is a member of the
45 * <a href="{@docRoot}/java/util/package-summary.html#CollectionsFramework">
46 * Java Collections Framework</a>.
47 *
48 * @since 1.5
49 * @author Doug Lea
50 * @param <E> the type of elements held in this queue
51 */
52 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
53 implements BlockingQueue<E>, java.io.Serializable {
54 private static final long serialVersionUID = -6903933977591709194L;
55
56 /*
57 * A variant of the "two lock queue" algorithm. The putLock gates
58 * entry to put (and offer), and has an associated condition for
59 * waiting puts. Similarly for the takeLock. The "count" field
60 * that they both rely on is maintained as an atomic to avoid
61 * needing to get both locks in most cases. Also, to minimize need
62 * for puts to get takeLock and vice-versa, cascading notifies are
63 * used. When a put notices that it has enabled at least one take,
64 * it signals taker. That taker in turn signals others if more
65 * items have been entered since the signal. And symmetrically for
66 * takes signalling puts. Operations such as remove(Object) and
67 * iterators acquire both locks.
68 *
69 * Visibility between writers and readers is provided as follows:
70 *
71 * Whenever an element is enqueued, the putLock is acquired and
72 * count updated. A subsequent reader guarantees visibility to the
73 * enqueued Node by either acquiring the putLock (via fullyLock)
74 * or by acquiring the takeLock, and then reading n = count.get();
75 * this gives visibility to the first n items.
76 *
77 * To implement weakly consistent iterators, it appears we need to
78 * keep all Nodes GC-reachable from a predecessor dequeued Node.
79 * That would cause two problems:
80 * - allow a rogue Iterator to cause unbounded memory retention
81 * - cause cross-generational linking of old Nodes to new Nodes if
82 * a Node was tenured while live, which generational GCs have a
83 * hard time dealing with, causing repeated major collections.
84 * However, only non-deleted Nodes need to be reachable from
85 * dequeued Nodes, and reachability does not necessarily have to
86 * be of the kind understood by the GC. We use the trick of
87 * linking a Node that has just been dequeued to itself. Such a
88 * self-link implicitly means to advance to head.next.
89 */
90
91 /**
92 * Linked list node class.
93 */
94 static class Node<E> {
95 E item;
96
97 /**
98 * One of:
99 * - the real successor Node
100 * - this Node, meaning the successor is head.next
101 * - null, meaning there is no successor (this is the last node)
102 */
103 Node<E> next;
104
105 Node(E x) { item = x; }
106 }
107
108 /** The capacity bound, or Integer.MAX_VALUE if none */
109 private final int capacity;
110
111 /** Current number of elements */
112 private final AtomicInteger count = new AtomicInteger();
113
114 /**
115 * Head of linked list.
116 * Invariant: head.item == null
117 */
118 transient Node<E> head;
119
120 /**
121 * Tail of linked list.
122 * Invariant: last.next == null
123 */
124 private transient Node<E> last;
125
126 /** Lock held by take, poll, etc */
127 private final ReentrantLock takeLock = new ReentrantLock();
128
129 /** Wait queue for waiting takes */
130 private final Condition notEmpty = takeLock.newCondition();
131
132 /** Lock held by put, offer, etc */
133 private final ReentrantLock putLock = new ReentrantLock();
134
135 /** Wait queue for waiting puts */
136 private final Condition notFull = putLock.newCondition();
137
138 /**
139 * Signals a waiting take. Called only from put/offer (which do not
140 * otherwise ordinarily lock takeLock.)
141 */
142 private void signalNotEmpty() {
143 final ReentrantLock takeLock = this.takeLock;
144 takeLock.lock();
145 try {
146 notEmpty.signal();
147 } finally {
148 takeLock.unlock();
149 }
150 }
151
152 /**
153 * Signals a waiting put. Called only from take/poll.
154 */
155 private void signalNotFull() {
156 final ReentrantLock putLock = this.putLock;
157 putLock.lock();
158 try {
159 notFull.signal();
160 } finally {
161 putLock.unlock();
162 }
163 }
164
165 /**
166 * Links node at end of queue.
167 *
168 * @param node the node
169 */
170 private void enqueue(Node<E> node) {
171 // assert putLock.isHeldByCurrentThread();
172 // assert last.next == null;
173 last = last.next = node;
174 }
175
176 /**
177 * Removes a node from head of queue.
178 *
179 * @return the element removed from the head of the queue
180 */
181 private E dequeue() {
182 // assert takeLock.isHeldByCurrentThread();
183 // assert head.item == null;
184 Node<E> h = head;
185 Node<E> first = h.next;
186 h.next = h; // help GC
187 head = first;
188 E x = first.item;
189 first.item = null;
190 return x;
191 }
192
193 /**
194 * Locks to prevent both puts and takes.
195 */
196 void fullyLock() {
197 putLock.lock();
198 takeLock.lock();
199 }
200
201 /**
202 * Unlocks to allow both puts and takes.
203 */
204 void fullyUnlock() {
205 takeLock.unlock();
206 putLock.unlock();
207 }
208
209 /**
210 * Creates a {@code LinkedBlockingQueue} with a capacity of
211 * {@link Integer#MAX_VALUE}.
212 */
public LinkedBlockingQueue() {
    // Delegate to the capacity constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
216
217 /**
218 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
219 *
220 * @param capacity the capacity of this queue
221 * @throws IllegalArgumentException if {@code capacity} is not greater
222 * than zero
223 */
224 public LinkedBlockingQueue(int capacity) {
225 if (capacity <= 0) throw new IllegalArgumentException();
226 this.capacity = capacity;
227 last = head = new Node<E>(null);
228 }
229
230 /**
231 * Creates a {@code LinkedBlockingQueue} with a capacity of
232 * {@link Integer#MAX_VALUE}, initially containing the elements of the
233 * given collection,
234 * added in traversal order of the collection's iterator.
235 *
236 * @param c the collection of elements to initially contain
237 * @throws NullPointerException if the specified collection or any
238 * of its elements are null
239 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        for (Iterator<? extends E> it = c.iterator(); it.hasNext(); ) {
            final E element = it.next();
            if (element == null) {
                throw new NullPointerException();
            }
            if (added == capacity) {
                throw new IllegalStateException("Queue full");
            }
            enqueue(new Node<E>(element));
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
259
260 // this doc comment is overridden to remove the reference to collections
261 // greater in size than Integer.MAX_VALUE
262 /**
263 * Returns the number of elements in this queue.
264 *
265 * @return the number of elements in this queue
266 */
267 public int size() {
268 return count.get();
269 }
270
271 // this doc comment is a modified copy of the inherited doc comment,
272 // without the reference to unlimited queues.
273 /**
274 * Returns the number of additional elements that this queue can ideally
275 * (in the absence of memory or resource constraints) accept without
276 * blocking. This is always equal to the initial capacity of this queue
277 * less the current {@code size} of this queue.
278 *
279 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
280 * an element will succeed by inspecting {@code remainingCapacity}
281 * because it may be the case that another thread is about to
282 * insert or remove an element.
283 */
284 public int remainingCapacity() {
285 return capacity - count.get();
286 }
287
288 /**
289 * Inserts the specified element at the tail of this queue, waiting if
290 * necessary for space to become available.
291 *
292 * @throws InterruptedException {@inheritDoc}
293 * @throws NullPointerException {@inheritDoc}
294 */
295 public void put(E e) throws InterruptedException {
296 if (e == null) throw new NullPointerException();
297 // Note: convention in all put/take/etc is to preset local var
298 // holding count negative to indicate failure unless set.
299 int c = -1;
300 Node<E> node = new Node<E>(e);
301 final ReentrantLock putLock = this.putLock;
302 final AtomicInteger count = this.count;
303 putLock.lockInterruptibly();
304 try {
305 /*
306 * Note that count is used in wait guard even though it is
307 * not protected by lock. This works because count can
308 * only decrease at this point (all other puts are shut
309 * out by lock), and we (or some other waiting put) are
310 * signalled if it ever changes from capacity. Similarly
311 * for all other uses of count in other wait guards.
312 */
313 while (count.get() == capacity) {
314 notFull.await();
315 }
316 enqueue(node);
317 c = count.getAndIncrement();
318 if (c + 1 < capacity)
319 notFull.signal();
320 } finally {
321 putLock.unlock();
322 }
323 if (c == 0)
324 signalNotEmpty();
325 }
326
327 /**
328 * Inserts the specified element at the tail of this queue, waiting if
329 * necessary up to the specified wait time for space to become available.
330 *
331 * @return {@code true} if successful, or {@code false} if
332 * the specified waiting time elapses before space is available
333 * @throws InterruptedException {@inheritDoc}
334 * @throws NullPointerException {@inheritDoc}
335 */
336 public boolean offer(E e, long timeout, TimeUnit unit)
337 throws InterruptedException {
338
339 if (e == null) throw new NullPointerException();
340 long nanos = unit.toNanos(timeout);
341 int c = -1;
342 final ReentrantLock putLock = this.putLock;
343 final AtomicInteger count = this.count;
344 putLock.lockInterruptibly();
345 try {
346 while (count.get() == capacity) {
347 if (nanos <= 0L)
348 return false;
349 nanos = notFull.awaitNanos(nanos);
350 }
351 enqueue(new Node<E>(e));
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
356 putLock.unlock();
357 }
358 if (c == 0)
359 signalNotEmpty();
360 return true;
361 }
362
363 /**
364 * Inserts the specified element at the tail of this queue if it is
365 * possible to do so immediately without exceeding the queue's capacity,
366 * returning {@code true} upon success and {@code false} if this queue
367 * is full.
368 * When using a capacity-restricted queue, this method is generally
369 * preferable to method {@link BlockingQueue#add add}, which can fail to
370 * insert an element only by throwing an exception.
371 *
372 * @throws NullPointerException if the specified element is null
373 */
374 public boolean offer(E e) {
375 if (e == null) throw new NullPointerException();
376 final AtomicInteger count = this.count;
377 if (count.get() == capacity)
378 return false;
379 int c = -1;
380 Node<E> node = new Node<E>(e);
381 final ReentrantLock putLock = this.putLock;
382 putLock.lock();
383 try {
384 if (count.get() < capacity) {
385 enqueue(node);
386 c = count.getAndIncrement();
387 if (c + 1 < capacity)
388 notFull.signal();
389 }
390 } finally {
391 putLock.unlock();
392 }
393 if (c == 0)
394 signalNotEmpty();
395 return c >= 0;
396 }
397
398 public E take() throws InterruptedException {
399 E x;
400 int c = -1;
401 final AtomicInteger count = this.count;
402 final ReentrantLock takeLock = this.takeLock;
403 takeLock.lockInterruptibly();
404 try {
405 while (count.get() == 0) {
406 notEmpty.await();
407 }
408 x = dequeue();
409 c = count.getAndDecrement();
410 if (c > 1)
411 notEmpty.signal();
412 } finally {
413 takeLock.unlock();
414 }
415 if (c == capacity)
416 signalNotFull();
417 return x;
418 }
419
420 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
421 E x = null;
422 int c = -1;
423 long nanos = unit.toNanos(timeout);
424 final AtomicInteger count = this.count;
425 final ReentrantLock takeLock = this.takeLock;
426 takeLock.lockInterruptibly();
427 try {
428 while (count.get() == 0) {
429 if (nanos <= 0L)
430 return null;
431 nanos = notEmpty.awaitNanos(nanos);
432 }
433 x = dequeue();
434 c = count.getAndDecrement();
435 if (c > 1)
436 notEmpty.signal();
437 } finally {
438 takeLock.unlock();
439 }
440 if (c == capacity)
441 signalNotFull();
442 return x;
443 }
444
445 public E poll() {
446 final AtomicInteger count = this.count;
447 if (count.get() == 0)
448 return null;
449 E x = null;
450 int c = -1;
451 final ReentrantLock takeLock = this.takeLock;
452 takeLock.lock();
453 try {
454 if (count.get() > 0) {
455 x = dequeue();
456 c = count.getAndDecrement();
457 if (c > 1)
458 notEmpty.signal();
459 }
460 } finally {
461 takeLock.unlock();
462 }
463 if (c == capacity)
464 signalNotFull();
465 return x;
466 }
467
468 public E peek() {
469 if (count.get() == 0)
470 return null;
471 final ReentrantLock takeLock = this.takeLock;
472 takeLock.lock();
473 try {
474 return (count.get() > 0) ? head.next.item : null;
475 } finally {
476 takeLock.unlock();
477 }
478 }
479
480 /**
481 * Unlinks interior Node p with predecessor pred.
482 */
483 void unlink(Node<E> p, Node<E> pred) {
484 // assert putLock.isHeldByCurrentThread();
485 // assert takeLock.isHeldByCurrentThread();
486 // p.next is not changed, to allow iterators that are
487 // traversing p to maintain their weak-consistency guarantee.
488 p.item = null;
489 pred.next = p.next;
490 if (last == p)
491 last = pred;
492 if (count.getAndDecrement() == capacity)
493 notFull.signal();
494 }
495
496 /**
497 * Removes a single instance of the specified element from this queue,
498 * if it is present. More formally, removes an element {@code e} such
499 * that {@code o.equals(e)}, if this queue contains one or more such
500 * elements.
501 * Returns {@code true} if this queue contained the specified element
502 * (or equivalently, if this queue changed as a result of the call).
503 *
504 * @param o element to be removed from this queue, if present
505 * @return {@code true} if this queue changed as a result of the call
506 */
507 public boolean remove(Object o) {
508 if (o == null) return false;
509 fullyLock();
510 try {
511 for (Node<E> pred = head, p = pred.next;
512 p != null;
513 pred = p, p = p.next) {
514 if (o.equals(p.item)) {
515 unlink(p, pred);
516 return true;
517 }
518 }
519 return false;
520 } finally {
521 fullyUnlock();
522 }
523 }
524
525 /**
526 * Returns {@code true} if this queue contains the specified element.
527 * More formally, returns {@code true} if and only if this queue contains
528 * at least one element {@code e} such that {@code o.equals(e)}.
529 *
530 * @param o object to be checked for containment in this queue
531 * @return {@code true} if this queue contains the specified element
532 */
533 public boolean contains(Object o) {
534 if (o == null) return false;
535 fullyLock();
536 try {
537 for (Node<E> p = head.next; p != null; p = p.next)
538 if (o.equals(p.item))
539 return true;
540 return false;
541 } finally {
542 fullyUnlock();
543 }
544 }
545
546 /**
547 * Returns an array containing all of the elements in this queue, in
548 * proper sequence.
549 *
550 * <p>The returned array will be "safe" in that no references to it are
551 * maintained by this queue. (In other words, this method must allocate
552 * a new array). The caller is thus free to modify the returned array.
553 *
554 * <p>This method acts as bridge between array-based and collection-based
555 * APIs.
556 *
557 * @return an array containing all of the elements in this queue
558 */
559 public Object[] toArray() {
560 fullyLock();
561 try {
562 int size = count.get();
563 Object[] a = new Object[size];
564 int k = 0;
565 for (Node<E> p = head.next; p != null; p = p.next)
566 a[k++] = p.item;
567 return a;
568 } finally {
569 fullyUnlock();
570 }
571 }
572
573 /**
574 * Returns an array containing all of the elements in this queue, in
575 * proper sequence; the runtime type of the returned array is that of
576 * the specified array. If the queue fits in the specified array, it
577 * is returned therein. Otherwise, a new array is allocated with the
578 * runtime type of the specified array and the size of this queue.
579 *
580 * <p>If this queue fits in the specified array with room to spare
581 * (i.e., the array has more elements than this queue), the element in
582 * the array immediately following the end of the queue is set to
583 * {@code null}.
584 *
585 * <p>Like the {@link #toArray()} method, this method acts as bridge between
586 * array-based and collection-based APIs. Further, this method allows
587 * precise control over the runtime type of the output array, and may,
588 * under certain circumstances, be used to save allocation costs.
589 *
590 * <p>Suppose {@code x} is a queue known to contain only strings.
591 * The following code can be used to dump the queue into a newly
592 * allocated array of {@code String}:
593 *
594 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
595 *
596 * Note that {@code toArray(new Object[0])} is identical in function to
597 * {@code toArray()}.
598 *
599 * @param a the array into which the elements of the queue are to
600 * be stored, if it is big enough; otherwise, a new array of the
601 * same runtime type is allocated for this purpose
602 * @return an array containing all of the elements in this queue
603 * @throws ArrayStoreException if the runtime type of the specified array
604 * is not a supertype of the runtime type of every element in
605 * this queue
606 * @throws NullPointerException if the specified array is null
607 */
608 @SuppressWarnings("unchecked")
609 public <T> T[] toArray(T[] a) {
610 fullyLock();
611 try {
612 int size = count.get();
613 if (a.length < size)
614 a = (T[])java.lang.reflect.Array.newInstance
615 (a.getClass().getComponentType(), size);
616
617 int k = 0;
618 for (Node<E> p = head.next; p != null; p = p.next)
619 a[k++] = (T)p.item;
620 if (a.length > k)
621 a[k] = null;
622 return a;
623 } finally {
624 fullyUnlock();
625 }
626 }
627
628 public String toString() {
629 return Helpers.collectionToString(this);
630 }
631
632 /**
633 * Atomically removes all of the elements from this queue.
634 * The queue will be empty after this call returns.
635 */
636 public void clear() {
637 fullyLock();
638 try {
639 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
640 h.next = h;
641 p.item = null;
642 }
643 head = last;
644 // assert head.item == null && head.next == null;
645 if (count.getAndSet(0) == capacity)
646 notFull.signal();
647 } finally {
648 fullyUnlock();
649 }
650 }
651
652 /**
653 * @throws UnsupportedOperationException {@inheritDoc}
654 * @throws ClassCastException {@inheritDoc}
655 * @throws NullPointerException {@inheritDoc}
656 * @throws IllegalArgumentException {@inheritDoc}
657 */
658 public int drainTo(Collection<? super E> c) {
659 return drainTo(c, Integer.MAX_VALUE);
660 }
661
662 /**
663 * @throws UnsupportedOperationException {@inheritDoc}
664 * @throws ClassCastException {@inheritDoc}
665 * @throws NullPointerException {@inheritDoc}
666 * @throws IllegalArgumentException {@inheritDoc}
667 */
668 public int drainTo(Collection<? super E> c, int maxElements) {
669 Objects.requireNonNull(c);
670 if (c == this)
671 throw new IllegalArgumentException();
672 if (maxElements <= 0)
673 return 0;
674 boolean signalNotFull = false;
675 final ReentrantLock takeLock = this.takeLock;
676 takeLock.lock();
677 try {
678 int n = Math.min(maxElements, count.get());
679 // count.get provides visibility to first n Nodes
680 Node<E> h = head;
681 int i = 0;
682 try {
683 while (i < n) {
684 Node<E> p = h.next;
685 c.add(p.item);
686 p.item = null;
687 h.next = h;
688 h = p;
689 ++i;
690 }
691 return n;
692 } finally {
693 // Restore invariants even if c.add() threw
694 if (i > 0) {
695 // assert h.item == null;
696 head = h;
697 signalNotFull = (count.getAndAdd(-i) == capacity);
698 }
699 }
700 } finally {
701 takeLock.unlock();
702 if (signalNotFull)
703 signalNotFull();
704 }
705 }
706
707 /**
708 * Used for any element traversal that is not entirely under lock.
709 * Such traversals must handle both:
710 * - dequeued nodes (p.next == p)
711 * - (possibly multiple) interior removed nodes (p.item == null)
712 */
713 Node<E> succ(Node<E> p) {
714 if (p == (p = p.next))
715 p = head.next;
716 return p;
717 }
718
719 /**
720 * Returns an iterator over the elements in this queue in proper sequence.
721 * The elements will be returned in order from first (head) to last (tail).
722 *
723 * <p>The returned iterator is
724 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
725 *
726 * @return an iterator over the elements in this queue in proper sequence
727 */
728 public Iterator<E> iterator() {
729 return new Itr();
730 }
731
732 /**
733 * Weakly-consistent iterator.
734 *
735 * Lazily updated ancestor field provides expected O(1) remove(),
736 * but still O(n) in the worst case, whenever the saved ancestor
737 * is concurrently deleted.
738 */
739 private class Itr implements Iterator<E> {
740 private Node<E> next; // Node holding nextItem
741 private E nextItem; // next item to hand out
742 private Node<E> lastRet;
743 private Node<E> ancestor; // Helps unlink lastRet on remove()
744
745 Itr() {
746 fullyLock();
747 try {
748 if ((next = head.next) != null)
749 nextItem = next.item;
750 } finally {
751 fullyUnlock();
752 }
753 }
754
755 public boolean hasNext() {
756 return next != null;
757 }
758
759 public E next() {
760 Node<E> p;
761 if ((p = next) == null)
762 throw new NoSuchElementException();
763 lastRet = p;
764 E x = nextItem;
765 fullyLock();
766 try {
767 E e = null;
768 for (p = p.next; p != null && (e = p.item) == null; )
769 p = succ(p);
770 next = p;
771 nextItem = e;
772 } finally {
773 fullyUnlock();
774 }
775 return x;
776 }
777
778 public void forEachRemaining(Consumer<? super E> action) {
779 // A variant of forEachFrom
780 Objects.requireNonNull(action);
781 Node<E> p;
782 if ((p = next) == null) return;
783 lastRet = p;
784 next = null;
785 final int batchSize = 64;
786 Object[] es = null;
787 int n, len = 1;
788 do {
789 fullyLock();
790 try {
791 if (es == null) {
792 p = p.next;
793 for (Node<E> q = p; q != null; q = succ(q))
794 if (q.item != null && ++len == batchSize)
795 break;
796 es = new Object[len];
797 es[0] = nextItem;
798 nextItem = null;
799 n = 1;
800 } else
801 n = 0;
802 for (; p != null && n < len; p = succ(p))
803 if ((es[n] = p.item) != null) {
804 lastRet = p;
805 n++;
806 }
807 } finally {
808 fullyUnlock();
809 }
810 for (int i = 0; i < n; i++) {
811 @SuppressWarnings("unchecked") E e = (E) es[i];
812 action.accept(e);
813 }
814 } while (n > 0 && p != null);
815 }
816
817 public void remove() {
818 Node<E> p = lastRet;
819 if (p == null)
820 throw new IllegalStateException();
821 lastRet = null;
822 fullyLock();
823 try {
824 if (p.item != null) {
825 if (ancestor == null)
826 ancestor = head;
827 ancestor = findPred(p, ancestor);
828 unlink(p, ancestor);
829 }
830 } finally {
831 fullyUnlock();
832 }
833 }
834 }
835
836 /**
837 * A customized variant of Spliterators.IteratorSpliterator.
838 * Keep this class in sync with (very similar) LBDSpliterator.
839 */
840 private final class LBQSpliterator implements Spliterator<E> {
841 static final int MAX_BATCH = 1 << 25; // max batch array size;
842 Node<E> current; // current node; null until initialized
843 int batch; // batch size for splits
844 boolean exhausted; // true when no more nodes
845 long est = size(); // size estimate
846
847 LBQSpliterator() {}
848
849 public long estimateSize() { return est; }
850
851 public Spliterator<E> trySplit() {
852 Node<E> h;
853 if (!exhausted &&
854 ((h = current) != null || (h = head.next) != null)
855 && h.next != null) {
856 int n = batch = Math.min(batch + 1, MAX_BATCH);
857 Object[] a = new Object[n];
858 int i = 0;
859 Node<E> p = current;
860 fullyLock();
861 try {
862 if (p != null || (p = head.next) != null)
863 for (; p != null && i < n; p = succ(p))
864 if ((a[i] = p.item) != null)
865 i++;
866 } finally {
867 fullyUnlock();
868 }
869 if ((current = p) == null) {
870 est = 0L;
871 exhausted = true;
872 }
873 else if ((est -= i) < 0L)
874 est = 0L;
875 if (i > 0)
876 return Spliterators.spliterator
877 (a, 0, i, (Spliterator.ORDERED |
878 Spliterator.NONNULL |
879 Spliterator.CONCURRENT));
880 }
881 return null;
882 }
883
884 public boolean tryAdvance(Consumer<? super E> action) {
885 Objects.requireNonNull(action);
886 if (!exhausted) {
887 E e = null;
888 fullyLock();
889 try {
890 Node<E> p;
891 if ((p = current) != null || (p = head.next) != null)
892 do {
893 e = p.item;
894 p = succ(p);
895 } while (e == null && p != null);
896 if ((current = p) == null)
897 exhausted = true;
898 } finally {
899 fullyUnlock();
900 }
901 if (e != null) {
902 action.accept(e);
903 return true;
904 }
905 }
906 return false;
907 }
908
909 public void forEachRemaining(Consumer<? super E> action) {
910 Objects.requireNonNull(action);
911 if (!exhausted) {
912 exhausted = true;
913 Node<E> p = current;
914 current = null;
915 forEachFrom(action, p);
916 }
917 }
918
919 public int characteristics() {
920 return (Spliterator.ORDERED |
921 Spliterator.NONNULL |
922 Spliterator.CONCURRENT);
923 }
924 }
925
926 /**
927 * Returns a {@link Spliterator} over the elements in this queue.
928 *
929 * <p>The returned spliterator is
930 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
931 *
932 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
933 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
934 *
935 * @implNote
936 * The {@code Spliterator} implements {@code trySplit} to permit limited
937 * parallelism.
938 *
939 * @return a {@code Spliterator} over the elements in this queue
940 * @since 1.8
941 */
942 public Spliterator<E> spliterator() {
943 return new LBQSpliterator();
944 }
945
946 /**
947 * @throws NullPointerException {@inheritDoc}
948 */
949 public void forEach(Consumer<? super E> action) {
950 Objects.requireNonNull(action);
951 forEachFrom(action, null);
952 }
953
954 /**
955 * Runs action on each element found during a traversal starting at p.
956 * If p is null, traversal starts at head.
957 */
958 void forEachFrom(Consumer<? super E> action, Node<E> p) {
959 // Extract batches of elements while holding the lock; then
960 // run the action on the elements while not
961 final int batchSize = 64; // max number of elements per batch
962 Object[] es = null; // container for batch of elements
963 int n, len = 0;
964 do {
965 fullyLock();
966 try {
967 if (es == null) {
968 if (p == null) p = head.next;
969 for (Node<E> q = p; q != null; q = succ(q))
970 if (q.item != null && ++len == batchSize)
971 break;
972 es = new Object[len];
973 }
974 for (n = 0; p != null && n < len; p = succ(p))
975 if ((es[n] = p.item) != null)
976 n++;
977 } finally {
978 fullyUnlock();
979 }
980 for (int i = 0; i < n; i++) {
981 @SuppressWarnings("unchecked") E e = (E) es[i];
982 action.accept(e);
983 }
984 } while (n > 0 && p != null);
985 }
986
987 /**
988 * @throws NullPointerException {@inheritDoc}
989 */
990 public boolean removeIf(Predicate<? super E> filter) {
991 Objects.requireNonNull(filter);
992 return bulkRemove(filter);
993 }
994
995 /**
996 * @throws NullPointerException {@inheritDoc}
997 */
998 public boolean removeAll(Collection<?> c) {
999 Objects.requireNonNull(c);
1000 return bulkRemove(e -> c.contains(e));
1001 }
1002
1003 /**
1004 * @throws NullPointerException {@inheritDoc}
1005 */
1006 public boolean retainAll(Collection<?> c) {
1007 Objects.requireNonNull(c);
1008 return bulkRemove(e -> !c.contains(e));
1009 }
1010
1011 /**
1012 * Returns the predecessor of live node p, given a node that was
1013 * once a live ancestor of p (or head); allows unlinking of p.
1014 */
1015 Node<E> findPred(Node<E> p, Node<E> ancestor) {
1016 // assert p.item != null;
1017 if (ancestor.item == null)
1018 ancestor = head;
1019 // Fails with NPE if precondition not satisfied
1020 for (Node<E> q; (q = ancestor.next) != p; )
1021 ancestor = q;
1022 return ancestor;
1023 }
1024
1025 /** Implementation of bulk remove methods. */
1026 @SuppressWarnings("unchecked")
1027 private boolean bulkRemove(Predicate<? super E> filter) {
1028 boolean removed = false;
1029 Node<E> p = null, ancestor = head;
1030 Node<E>[] nodes = null;
1031 int n, len = 0;
1032 do {
1033 // 1. Extract batch of up to 64 elements while holding the lock.
1034 long deathRow = 0; // "bitset" of size 64
1035 fullyLock();
1036 try {
1037 if (nodes == null) {
1038 if (p == null) p = head.next;
1039 for (Node<E> q = p; q != null; q = succ(q))
1040 if (q.item != null && ++len == 64)
1041 break;
1042 nodes = (Node<E>[]) new Node<?>[len];
1043 }
1044 for (n = 0; p != null && n < len; p = succ(p))
1045 nodes[n++] = p;
1046 } finally {
1047 fullyUnlock();
1048 }
1049
1050 // 2. Run the filter on the elements while lock is free.
1051 for (int i = 0; i < n; i++) {
1052 final E e;
1053 if ((e = nodes[i].item) != null && filter.test(e))
1054 deathRow |= 1L << i;
1055 }
1056
1057 // 3. Remove any filtered elements while holding the lock.
1058 if (deathRow != 0) {
1059 fullyLock();
1060 try {
1061 for (int i = 0; i < n; i++) {
1062 final Node<E> q;
1063 if ((deathRow & (1L << i)) != 0L
1064 && (q = nodes[i]).item != null) {
1065 ancestor = findPred(q, ancestor);
1066 unlink(q, ancestor);
1067 removed = true;
1068 }
1069 }
1070 } finally {
1071 fullyUnlock();
1072 }
1073 }
1074 } while (n > 0 && p != null);
1075 return removed;
1076 }
1077
1078 /**
1079 * Saves this queue to a stream (that is, serializes it).
1080 *
1081 * @param s the stream
1082 * @throws java.io.IOException if an I/O error occurs
1083 * @serialData The capacity is emitted (int), followed by all of
1084 * its elements (each an {@code Object}) in the proper order,
1085 * followed by a null
1086 */
1087 private void writeObject(java.io.ObjectOutputStream s)
1088 throws java.io.IOException {
1089
1090 fullyLock();
1091 try {
1092 // Write out any hidden stuff, plus capacity
1093 s.defaultWriteObject();
1094
1095 // Write out all elements in the proper order.
1096 for (Node<E> p = head.next; p != null; p = p.next)
1097 s.writeObject(p.item);
1098
1099 // Use trailing null as sentinel
1100 s.writeObject(null);
1101 } finally {
1102 fullyUnlock();
1103 }
1104 }
1105
1106 /**
1107 * Reconstitutes this queue from a stream (that is, deserializes it).
1108 * @param s the stream
1109 * @throws ClassNotFoundException if the class of a serialized object
1110 * could not be found
1111 * @throws java.io.IOException if an I/O error occurs
1112 */
1113 private void readObject(java.io.ObjectInputStream s)
1114 throws java.io.IOException, ClassNotFoundException {
1115 // Read in capacity, and any hidden stuff
1116 s.defaultReadObject();
1117
1118 count.set(0);
1119 last = head = new Node<E>(null);
1120
1121 // Read in all elements and place in queue
1122 for (;;) {
1123 @SuppressWarnings("unchecked")
1124 E item = (E)s.readObject();
1125 if (item == null)
1126 break;
1127 add(item);
1128 }
1129 }
1130 }