ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.107
Committed: Thu Dec 29 17:42:05 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.106: +92 -0 lines
Log Message:
implement optimized bulk remove methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Iterator;
12 import java.util.NoSuchElementException;
13 import java.util.Objects;
14 import java.util.Spliterator;
15 import java.util.Spliterators;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20 import java.util.function.Predicate;
21
22 /**
23 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
24 * linked nodes.
25 * This queue orders elements FIFO (first-in-first-out).
26 * The <em>head</em> of the queue is that element that has been on the
27 * queue the longest time.
28 * The <em>tail</em> of the queue is that element that has been on the
29 * queue the shortest time. New elements
30 * are inserted at the tail of the queue, and the queue retrieval
31 * operations obtain elements at the head of the queue.
32 * Linked queues typically have higher throughput than array-based queues but
33 * less predictable performance in most concurrent applications.
34 *
35 * <p>The optional capacity bound constructor argument serves as a
36 * way to prevent excessive queue expansion. The capacity, if unspecified,
37 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
38 * dynamically created upon each insertion unless this would bring the
39 * queue above capacity.
40 *
41 * <p>This class and its iterator implement all of the
42 * <em>optional</em> methods of the {@link Collection} and {@link
43 * Iterator} interfaces.
44 *
45 * <p>This class is a member of the
46 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
47 * Java Collections Framework</a>.
48 *
49 * @since 1.5
50 * @author Doug Lea
51 * @param <E> the type of elements held in this queue
52 */
53 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
54 implements BlockingQueue<E>, java.io.Serializable {
55 private static final long serialVersionUID = -6903933977591709194L;
56
57 /*
58 * A variant of the "two lock queue" algorithm. The putLock gates
59 * entry to put (and offer), and has an associated condition for
60 * waiting puts. Similarly for the takeLock. The "count" field
61 * that they both rely on is maintained as an atomic to avoid
62 * needing to get both locks in most cases. Also, to minimize need
63 * for puts to get takeLock and vice-versa, cascading notifies are
64 * used. When a put notices that it has enabled at least one take,
65 * it signals taker. That taker in turn signals others if more
66 * items have been entered since the signal. And symmetrically for
67 * takes signalling puts. Operations such as remove(Object) and
68 * iterators acquire both locks.
69 *
70 * Visibility between writers and readers is provided as follows:
71 *
72 * Whenever an element is enqueued, the putLock is acquired and
73 * count updated. A subsequent reader guarantees visibility to the
74 * enqueued Node by either acquiring the putLock (via fullyLock)
75 * or by acquiring the takeLock, and then reading n = count.get();
76 * this gives visibility to the first n items.
77 *
78 * To implement weakly consistent iterators, it appears we need to
79 * keep all Nodes GC-reachable from a predecessor dequeued Node.
80 * That would cause two problems:
81 * - allow a rogue Iterator to cause unbounded memory retention
82 * - cause cross-generational linking of old Nodes to new Nodes if
83 * a Node was tenured while live, which generational GCs have a
84 * hard time dealing with, causing repeated major collections.
85 * However, only non-deleted Nodes need to be reachable from
86 * dequeued Nodes, and reachability does not necessarily have to
87 * be of the kind understood by the GC. We use the trick of
88 * linking a Node that has just been dequeued to itself. Such a
89 * self-link implicitly means to advance to head.next.
90 */
91
92 /**
93 * Linked list node class.
94 */
95 static class Node<E> {
96 E item;
97
98 /**
99 * One of:
100 * - the real successor Node
101 * - this Node, meaning the successor is head.next
102 * - null, meaning there is no successor (this is the last node)
103 */
104 Node<E> next;
105
106 Node(E x) { item = x; }
107 }
108
109 /** The capacity bound, or Integer.MAX_VALUE if none */
110 private final int capacity;
111
112 /** Current number of elements */
113 private final AtomicInteger count = new AtomicInteger();
114
115 /**
116 * Head of linked list.
117 * Invariant: head.item == null
118 */
119 transient Node<E> head;
120
121 /**
122 * Tail of linked list.
123 * Invariant: last.next == null
124 */
125 private transient Node<E> last;
126
127 /** Lock held by take, poll, etc */
128 private final ReentrantLock takeLock = new ReentrantLock();
129
130 /** Wait queue for waiting takes */
131 private final Condition notEmpty = takeLock.newCondition();
132
133 /** Lock held by put, offer, etc */
134 private final ReentrantLock putLock = new ReentrantLock();
135
136 /** Wait queue for waiting puts */
137 private final Condition notFull = putLock.newCondition();
138
139 /**
140 * Signals a waiting take. Called only from put/offer (which do not
141 * otherwise ordinarily lock takeLock.)
142 */
143 private void signalNotEmpty() {
144 final ReentrantLock takeLock = this.takeLock;
145 takeLock.lock();
146 try {
147 notEmpty.signal();
148 } finally {
149 takeLock.unlock();
150 }
151 }
152
153 /**
154 * Signals a waiting put. Called only from take/poll.
155 */
156 private void signalNotFull() {
157 final ReentrantLock putLock = this.putLock;
158 putLock.lock();
159 try {
160 notFull.signal();
161 } finally {
162 putLock.unlock();
163 }
164 }
165
166 /**
167 * Links node at end of queue.
168 *
169 * @param node the node
170 */
171 private void enqueue(Node<E> node) {
172 // assert putLock.isHeldByCurrentThread();
173 // assert last.next == null;
174 last = last.next = node;
175 }
176
177 /**
178 * Removes a node from head of queue.
179 *
180 * @return the node
181 */
182 private E dequeue() {
183 // assert takeLock.isHeldByCurrentThread();
184 // assert head.item == null;
185 Node<E> h = head;
186 Node<E> first = h.next;
187 h.next = h; // help GC
188 head = first;
189 E x = first.item;
190 first.item = null;
191 return x;
192 }
193
194 /**
195 * Locks to prevent both puts and takes.
196 */
197 void fullyLock() {
198 putLock.lock();
199 takeLock.lock();
200 }
201
202 /**
203 * Unlocks to allow both puts and takes.
204 */
205 void fullyUnlock() {
206 takeLock.unlock();
207 putLock.unlock();
208 }
209
/**
 * Creates an empty {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // delegate to the bounded constructor with the largest possible bound
    this(Integer.MAX_VALUE);
}
217
218 /**
219 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
220 *
221 * @param capacity the capacity of this queue
222 * @throws IllegalArgumentException if {@code capacity} is not greater
223 * than zero
224 */
225 public LinkedBlockingQueue(int capacity) {
226 if (capacity <= 0) throw new IllegalArgumentException();
227 this.capacity = capacity;
228 last = head = new Node<E>(null);
229 }
230
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE} and which initially holds the elements
 * of the given collection, in the order produced by the collection's
 * iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        for (Iterator<? extends E> it = c.iterator(); it.hasNext(); ) {
            final E e = it.next();
            if (e == null)
                throw new NullPointerException();
            if (added == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            added++;
        }
        // publish the element count once, after all nodes are linked
        count.set(added);
    } finally {
        lock.unlock();
    }
}
260
261 // this doc comment is overridden to remove the reference to collections
262 // greater in size than Integer.MAX_VALUE
263 /**
264 * Returns the number of elements in this queue.
265 *
266 * @return the number of elements in this queue
267 */
268 public int size() {
269 return count.get();
270 }
271
272 // this doc comment is a modified copy of the inherited doc comment,
273 // without the reference to unlimited queues.
274 /**
275 * Returns the number of additional elements that this queue can ideally
276 * (in the absence of memory or resource constraints) accept without
277 * blocking. This is always equal to the initial capacity of this queue
278 * less the current {@code size} of this queue.
279 *
280 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
281 * an element will succeed by inspecting {@code remainingCapacity}
282 * because it may be the case that another thread is about to
283 * insert or remove an element.
284 */
285 public int remainingCapacity() {
286 return capacity - count.get();
287 }
288
289 /**
290 * Inserts the specified element at the tail of this queue, waiting if
291 * necessary for space to become available.
292 *
293 * @throws InterruptedException {@inheritDoc}
294 * @throws NullPointerException {@inheritDoc}
295 */
296 public void put(E e) throws InterruptedException {
297 if (e == null) throw new NullPointerException();
298 // Note: convention in all put/take/etc is to preset local var
299 // holding count negative to indicate failure unless set.
300 int c = -1;
301 Node<E> node = new Node<E>(e);
302 final ReentrantLock putLock = this.putLock;
303 final AtomicInteger count = this.count;
304 putLock.lockInterruptibly();
305 try {
306 /*
307 * Note that count is used in wait guard even though it is
308 * not protected by lock. This works because count can
309 * only decrease at this point (all other puts are shut
310 * out by lock), and we (or some other waiting put) are
311 * signalled if it ever changes from capacity. Similarly
312 * for all other uses of count in other wait guards.
313 */
314 while (count.get() == capacity) {
315 notFull.await();
316 }
317 enqueue(node);
318 c = count.getAndIncrement();
319 if (c + 1 < capacity)
320 notFull.signal();
321 } finally {
322 putLock.unlock();
323 }
324 if (c == 0)
325 signalNotEmpty();
326 }
327
328 /**
329 * Inserts the specified element at the tail of this queue, waiting if
330 * necessary up to the specified wait time for space to become available.
331 *
332 * @return {@code true} if successful, or {@code false} if
333 * the specified waiting time elapses before space is available
334 * @throws InterruptedException {@inheritDoc}
335 * @throws NullPointerException {@inheritDoc}
336 */
337 public boolean offer(E e, long timeout, TimeUnit unit)
338 throws InterruptedException {
339
340 if (e == null) throw new NullPointerException();
341 long nanos = unit.toNanos(timeout);
342 int c = -1;
343 final ReentrantLock putLock = this.putLock;
344 final AtomicInteger count = this.count;
345 putLock.lockInterruptibly();
346 try {
347 while (count.get() == capacity) {
348 if (nanos <= 0L)
349 return false;
350 nanos = notFull.awaitNanos(nanos);
351 }
352 enqueue(new Node<E>(e));
353 c = count.getAndIncrement();
354 if (c + 1 < capacity)
355 notFull.signal();
356 } finally {
357 putLock.unlock();
358 }
359 if (c == 0)
360 signalNotEmpty();
361 return true;
362 }
363
364 /**
365 * Inserts the specified element at the tail of this queue if it is
366 * possible to do so immediately without exceeding the queue's capacity,
367 * returning {@code true} upon success and {@code false} if this queue
368 * is full.
369 * When using a capacity-restricted queue, this method is generally
370 * preferable to method {@link BlockingQueue#add add}, which can fail to
371 * insert an element only by throwing an exception.
372 *
373 * @throws NullPointerException if the specified element is null
374 */
375 public boolean offer(E e) {
376 if (e == null) throw new NullPointerException();
377 final AtomicInteger count = this.count;
378 if (count.get() == capacity)
379 return false;
380 int c = -1;
381 Node<E> node = new Node<E>(e);
382 final ReentrantLock putLock = this.putLock;
383 putLock.lock();
384 try {
385 if (count.get() < capacity) {
386 enqueue(node);
387 c = count.getAndIncrement();
388 if (c + 1 < capacity)
389 notFull.signal();
390 }
391 } finally {
392 putLock.unlock();
393 }
394 if (c == 0)
395 signalNotEmpty();
396 return c >= 0;
397 }
398
399 public E take() throws InterruptedException {
400 E x;
401 int c = -1;
402 final AtomicInteger count = this.count;
403 final ReentrantLock takeLock = this.takeLock;
404 takeLock.lockInterruptibly();
405 try {
406 while (count.get() == 0) {
407 notEmpty.await();
408 }
409 x = dequeue();
410 c = count.getAndDecrement();
411 if (c > 1)
412 notEmpty.signal();
413 } finally {
414 takeLock.unlock();
415 }
416 if (c == capacity)
417 signalNotFull();
418 return x;
419 }
420
421 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
422 E x = null;
423 int c = -1;
424 long nanos = unit.toNanos(timeout);
425 final AtomicInteger count = this.count;
426 final ReentrantLock takeLock = this.takeLock;
427 takeLock.lockInterruptibly();
428 try {
429 while (count.get() == 0) {
430 if (nanos <= 0L)
431 return null;
432 nanos = notEmpty.awaitNanos(nanos);
433 }
434 x = dequeue();
435 c = count.getAndDecrement();
436 if (c > 1)
437 notEmpty.signal();
438 } finally {
439 takeLock.unlock();
440 }
441 if (c == capacity)
442 signalNotFull();
443 return x;
444 }
445
446 public E poll() {
447 final AtomicInteger count = this.count;
448 if (count.get() == 0)
449 return null;
450 E x = null;
451 int c = -1;
452 final ReentrantLock takeLock = this.takeLock;
453 takeLock.lock();
454 try {
455 if (count.get() > 0) {
456 x = dequeue();
457 c = count.getAndDecrement();
458 if (c > 1)
459 notEmpty.signal();
460 }
461 } finally {
462 takeLock.unlock();
463 }
464 if (c == capacity)
465 signalNotFull();
466 return x;
467 }
468
469 public E peek() {
470 if (count.get() == 0)
471 return null;
472 final ReentrantLock takeLock = this.takeLock;
473 takeLock.lock();
474 try {
475 return (count.get() > 0) ? head.next.item : null;
476 } finally {
477 takeLock.unlock();
478 }
479 }
480
481 /**
482 * Unlinks interior Node p with predecessor pred.
483 */
484 void unlink(Node<E> p, Node<E> pred) {
485 // assert putLock.isHeldByCurrentThread();
486 // assert takeLock.isHeldByCurrentThread();
487 // p.next is not changed, to allow iterators that are
488 // traversing p to maintain their weak-consistency guarantee.
489 p.item = null;
490 pred.next = p.next;
491 if (last == p)
492 last = pred;
493 if (count.getAndDecrement() == capacity)
494 notFull.signal();
495 }
496
497 /**
498 * Removes a single instance of the specified element from this queue,
499 * if it is present. More formally, removes an element {@code e} such
500 * that {@code o.equals(e)}, if this queue contains one or more such
501 * elements.
502 * Returns {@code true} if this queue contained the specified element
503 * (or equivalently, if this queue changed as a result of the call).
504 *
505 * @param o element to be removed from this queue, if present
506 * @return {@code true} if this queue changed as a result of the call
507 */
508 public boolean remove(Object o) {
509 if (o == null) return false;
510 fullyLock();
511 try {
512 for (Node<E> pred = head, p = pred.next;
513 p != null;
514 pred = p, p = p.next) {
515 if (o.equals(p.item)) {
516 unlink(p, pred);
517 return true;
518 }
519 }
520 return false;
521 } finally {
522 fullyUnlock();
523 }
524 }
525
526 /**
527 * Returns {@code true} if this queue contains the specified element.
528 * More formally, returns {@code true} if and only if this queue contains
529 * at least one element {@code e} such that {@code o.equals(e)}.
530 *
531 * @param o object to be checked for containment in this queue
532 * @return {@code true} if this queue contains the specified element
533 */
534 public boolean contains(Object o) {
535 if (o == null) return false;
536 fullyLock();
537 try {
538 for (Node<E> p = head.next; p != null; p = p.next)
539 if (o.equals(p.item))
540 return true;
541 return false;
542 } finally {
543 fullyUnlock();
544 }
545 }
546
547 /**
548 * Returns an array containing all of the elements in this queue, in
549 * proper sequence.
550 *
551 * <p>The returned array will be "safe" in that no references to it are
552 * maintained by this queue. (In other words, this method must allocate
553 * a new array). The caller is thus free to modify the returned array.
554 *
555 * <p>This method acts as bridge between array-based and collection-based
556 * APIs.
557 *
558 * @return an array containing all of the elements in this queue
559 */
560 public Object[] toArray() {
561 fullyLock();
562 try {
563 int size = count.get();
564 Object[] a = new Object[size];
565 int k = 0;
566 for (Node<E> p = head.next; p != null; p = p.next)
567 a[k++] = p.item;
568 return a;
569 } finally {
570 fullyUnlock();
571 }
572 }
573
574 /**
575 * Returns an array containing all of the elements in this queue, in
576 * proper sequence; the runtime type of the returned array is that of
577 * the specified array. If the queue fits in the specified array, it
578 * is returned therein. Otherwise, a new array is allocated with the
579 * runtime type of the specified array and the size of this queue.
580 *
581 * <p>If this queue fits in the specified array with room to spare
582 * (i.e., the array has more elements than this queue), the element in
583 * the array immediately following the end of the queue is set to
584 * {@code null}.
585 *
586 * <p>Like the {@link #toArray()} method, this method acts as bridge between
587 * array-based and collection-based APIs. Further, this method allows
588 * precise control over the runtime type of the output array, and may,
589 * under certain circumstances, be used to save allocation costs.
590 *
591 * <p>Suppose {@code x} is a queue known to contain only strings.
592 * The following code can be used to dump the queue into a newly
593 * allocated array of {@code String}:
594 *
595 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
596 *
597 * Note that {@code toArray(new Object[0])} is identical in function to
598 * {@code toArray()}.
599 *
600 * @param a the array into which the elements of the queue are to
601 * be stored, if it is big enough; otherwise, a new array of the
602 * same runtime type is allocated for this purpose
603 * @return an array containing all of the elements in this queue
604 * @throws ArrayStoreException if the runtime type of the specified array
605 * is not a supertype of the runtime type of every element in
606 * this queue
607 * @throws NullPointerException if the specified array is null
608 */
609 @SuppressWarnings("unchecked")
610 public <T> T[] toArray(T[] a) {
611 fullyLock();
612 try {
613 int size = count.get();
614 if (a.length < size)
615 a = (T[])java.lang.reflect.Array.newInstance
616 (a.getClass().getComponentType(), size);
617
618 int k = 0;
619 for (Node<E> p = head.next; p != null; p = p.next)
620 a[k++] = (T)p.item;
621 if (a.length > k)
622 a[k] = null;
623 return a;
624 } finally {
625 fullyUnlock();
626 }
627 }
628
629 public String toString() {
630 return Helpers.collectionToString(this);
631 }
632
633 /**
634 * Atomically removes all of the elements from this queue.
635 * The queue will be empty after this call returns.
636 */
637 public void clear() {
638 fullyLock();
639 try {
640 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
641 h.next = h;
642 p.item = null;
643 }
644 head = last;
645 // assert head.item == null && head.next == null;
646 if (count.getAndSet(0) == capacity)
647 notFull.signal();
648 } finally {
649 fullyUnlock();
650 }
651 }
652
653 /**
654 * @throws UnsupportedOperationException {@inheritDoc}
655 * @throws ClassCastException {@inheritDoc}
656 * @throws NullPointerException {@inheritDoc}
657 * @throws IllegalArgumentException {@inheritDoc}
658 */
659 public int drainTo(Collection<? super E> c) {
660 return drainTo(c, Integer.MAX_VALUE);
661 }
662
663 /**
664 * @throws UnsupportedOperationException {@inheritDoc}
665 * @throws ClassCastException {@inheritDoc}
666 * @throws NullPointerException {@inheritDoc}
667 * @throws IllegalArgumentException {@inheritDoc}
668 */
669 public int drainTo(Collection<? super E> c, int maxElements) {
670 Objects.requireNonNull(c);
671 if (c == this)
672 throw new IllegalArgumentException();
673 if (maxElements <= 0)
674 return 0;
675 boolean signalNotFull = false;
676 final ReentrantLock takeLock = this.takeLock;
677 takeLock.lock();
678 try {
679 int n = Math.min(maxElements, count.get());
680 // count.get provides visibility to first n Nodes
681 Node<E> h = head;
682 int i = 0;
683 try {
684 while (i < n) {
685 Node<E> p = h.next;
686 c.add(p.item);
687 p.item = null;
688 h.next = h;
689 h = p;
690 ++i;
691 }
692 return n;
693 } finally {
694 // Restore invariants even if c.add() threw
695 if (i > 0) {
696 // assert h.item == null;
697 head = h;
698 signalNotFull = (count.getAndAdd(-i) == capacity);
699 }
700 }
701 } finally {
702 takeLock.unlock();
703 if (signalNotFull)
704 signalNotFull();
705 }
706 }
707
708 /**
709 * Used for any element traversal that is not entirely under lock.
710 * Such traversals must handle both:
711 * - dequeued nodes (p.next == p)
712 * - (possibly multiple) interior removed nodes (p.item == null)
713 */
714 Node<E> succ(Node<E> p) {
715 if (p == (p = p.next))
716 p = head.next;
717 return p;
718 }
719
720 /**
721 * Returns an iterator over the elements in this queue in proper sequence.
722 * The elements will be returned in order from first (head) to last (tail).
723 *
724 * <p>The returned iterator is
725 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
726 *
727 * @return an iterator over the elements in this queue in proper sequence
728 */
729 public Iterator<E> iterator() {
730 return new Itr();
731 }
732
733 private class Itr implements Iterator<E> {
734 /*
735 * Basic weakly-consistent iterator. At all times hold the next
736 * item to hand out so that if hasNext() reports true, we will
737 * still have it to return even if lost race with a take etc.
738 */
739
740 private Node<E> next;
741 private E nextItem;
742 private Node<E> lastRet;
743
744 Itr() {
745 fullyLock();
746 try {
747 if ((next = head.next) != null)
748 nextItem = next.item;
749 } finally {
750 fullyUnlock();
751 }
752 }
753
754 public boolean hasNext() {
755 return next != null;
756 }
757
758 public E next() {
759 Node<E> p;
760 if ((p = next) == null)
761 throw new NoSuchElementException();
762 lastRet = p;
763 E x = nextItem;
764 fullyLock();
765 try {
766 E e = null;
767 for (p = p.next; p != null && (e = p.item) == null; )
768 p = succ(p);
769 next = p;
770 nextItem = e;
771 } finally {
772 fullyUnlock();
773 }
774 return x;
775 }
776
777 public void forEachRemaining(Consumer<? super E> action) {
778 // A variant of forEachFrom
779 Objects.requireNonNull(action);
780 Node<E> p;
781 if ((p = next) == null) return;
782 lastRet = p;
783 next = null;
784 final int batchSize = 32;
785 Object[] es = null;
786 int n, len = 1;
787 do {
788 fullyLock();
789 try {
790 if (es == null) {
791 p = p.next;
792 for (Node<E> q = p; q != null; q = succ(q))
793 if (q.item != null && ++len == batchSize)
794 break;
795 es = new Object[len];
796 es[0] = nextItem;
797 nextItem = null;
798 n = 1;
799 } else
800 n = 0;
801 for (; p != null && n < len; p = succ(p))
802 if ((es[n] = p.item) != null) {
803 lastRet = p;
804 n++;
805 }
806 } finally {
807 fullyUnlock();
808 }
809 for (int i = 0; i < n; i++) {
810 @SuppressWarnings("unchecked") E e = (E) es[i];
811 action.accept(e);
812 }
813 } while (n > 0 && p != null);
814 }
815
816 public void remove() {
817 if (lastRet == null)
818 throw new IllegalStateException();
819 fullyLock();
820 try {
821 Node<E> node = lastRet;
822 lastRet = null;
823 for (Node<E> pred = head, p = pred.next;
824 p != null;
825 pred = p, p = p.next) {
826 if (p == node) {
827 unlink(p, pred);
828 break;
829 }
830 }
831 } finally {
832 fullyUnlock();
833 }
834 }
835 }
836
837 /**
838 * A customized variant of Spliterators.IteratorSpliterator.
839 * Keep this class in sync with (very similar) LBDSpliterator.
840 */
841 private final class LBQSpliterator implements Spliterator<E> {
842 static final int MAX_BATCH = 1 << 25; // max batch array size;
843 Node<E> current; // current node; null until initialized
844 int batch; // batch size for splits
845 boolean exhausted; // true when no more nodes
846 long est = size(); // size estimate
847
848 LBQSpliterator() {}
849
850 public long estimateSize() { return est; }
851
852 public Spliterator<E> trySplit() {
853 Node<E> h;
854 if (!exhausted &&
855 ((h = current) != null || (h = head.next) != null)
856 && h.next != null) {
857 int n = batch = Math.min(batch + 1, MAX_BATCH);
858 Object[] a = new Object[n];
859 int i = 0;
860 Node<E> p = current;
861 fullyLock();
862 try {
863 if (p != null || (p = head.next) != null)
864 for (; p != null && i < n; p = succ(p))
865 if ((a[i] = p.item) != null)
866 i++;
867 } finally {
868 fullyUnlock();
869 }
870 if ((current = p) == null) {
871 est = 0L;
872 exhausted = true;
873 }
874 else if ((est -= i) < 0L)
875 est = 0L;
876 if (i > 0)
877 return Spliterators.spliterator
878 (a, 0, i, (Spliterator.ORDERED |
879 Spliterator.NONNULL |
880 Spliterator.CONCURRENT));
881 }
882 return null;
883 }
884
885 public boolean tryAdvance(Consumer<? super E> action) {
886 Objects.requireNonNull(action);
887 if (!exhausted) {
888 E e = null;
889 fullyLock();
890 try {
891 Node<E> p;
892 if ((p = current) != null || (p = head.next) != null)
893 do {
894 e = p.item;
895 p = succ(p);
896 } while (e == null && p != null);
897 if ((current = p) == null)
898 exhausted = true;
899 } finally {
900 fullyUnlock();
901 }
902 if (e != null) {
903 action.accept(e);
904 return true;
905 }
906 }
907 return false;
908 }
909
910 public void forEachRemaining(Consumer<? super E> action) {
911 Objects.requireNonNull(action);
912 if (!exhausted) {
913 exhausted = true;
914 Node<E> p = current;
915 current = null;
916 forEachFrom(action, p);
917 }
918 }
919
920 public int characteristics() {
921 return (Spliterator.ORDERED |
922 Spliterator.NONNULL |
923 Spliterator.CONCURRENT);
924 }
925 }
926
927 /**
928 * Returns a {@link Spliterator} over the elements in this queue.
929 *
930 * <p>The returned spliterator is
931 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
932 *
933 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
934 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
935 *
936 * @implNote
937 * The {@code Spliterator} implements {@code trySplit} to permit limited
938 * parallelism.
939 *
940 * @return a {@code Spliterator} over the elements in this queue
941 * @since 1.8
942 */
943 public Spliterator<E> spliterator() {
944 return new LBQSpliterator();
945 }
946
947 /**
948 * @throws NullPointerException {@inheritDoc}
949 */
950 public void forEach(Consumer<? super E> action) {
951 Objects.requireNonNull(action);
952 forEachFrom(action, null);
953 }
954
955 /**
956 * Runs action on each element found during a traversal starting at p.
957 * If p is null, traversal starts at head.
958 */
959 void forEachFrom(Consumer<? super E> action, Node<E> p) {
960 // Extract batches of elements while holding the lock; then
961 // run the action on the elements while not
962 final int batchSize = 32; // max number of elements per batch
963 Object[] es = null; // container for batch of elements
964 int n, len = 0;
965 do {
966 fullyLock();
967 try {
968 if (es == null) {
969 if (p == null) p = head.next;
970 for (Node<E> q = p; q != null; q = succ(q))
971 if (q.item != null && ++len == batchSize)
972 break;
973 es = new Object[len];
974 }
975 for (n = 0; p != null && n < len; p = succ(p))
976 if ((es[n] = p.item) != null)
977 n++;
978 } finally {
979 fullyUnlock();
980 }
981 for (int i = 0; i < n; i++) {
982 @SuppressWarnings("unchecked") E e = (E) es[i];
983 action.accept(e);
984 }
985 } while (n > 0 && p != null);
986 }
987
988 /**
989 * @throws NullPointerException {@inheritDoc}
990 */
991 public boolean removeIf(Predicate<? super E> filter) {
992 Objects.requireNonNull(filter);
993 return bulkRemove(filter);
994 }
995
996 /**
997 * @throws NullPointerException {@inheritDoc}
998 */
999 public boolean removeAll(Collection<?> c) {
1000 Objects.requireNonNull(c);
1001 return bulkRemove(e -> c.contains(e));
1002 }
1003
1004 /**
1005 * @throws NullPointerException {@inheritDoc}
1006 */
1007 public boolean retainAll(Collection<?> c) {
1008 Objects.requireNonNull(c);
1009 return bulkRemove(e -> !c.contains(e));
1010 }
1011
1012 /**
1013 * Returns the predecessor of live node p, given a node that was
1014 * once a live ancestor of p (or head); allows unlinking of p.
1015 */
1016 private Node<E> findPred(Node<E> p, Node<E> ancestor) {
1017 // assert p.item != null;
1018 if (ancestor.item == null)
1019 ancestor = head;
1020 // Fails with NPE if precondition not satisfied
1021 for (Node<E> q; (q = ancestor.next) != p; )
1022 ancestor = q;
1023 return ancestor;
1024 }
1025
1026 /** Implementation of bulk remove methods. */
1027 @SuppressWarnings("unchecked")
1028 private boolean bulkRemove(Predicate<? super E> filter) {
1029 boolean removed = false;
1030 Node<E> p = null, ancestor = head;
1031 Node<E>[] nodes = null;
1032 int n, len = 0;
1033 do {
1034 // 1. Extract batch of up to 64 elements while holding the lock.
1035 long deathRow = 0; // "bitset" of size 64
1036 fullyLock();
1037 try {
1038 if (nodes == null) {
1039 if (p == null) p = head.next;
1040 for (Node<E> q = p; q != null; q = succ(q))
1041 if (q.item != null && ++len == 64)
1042 break;
1043 nodes = (Node<E>[]) new Node<?>[len];
1044 }
1045 for (n = 0; p != null && n < len; p = succ(p))
1046 nodes[n++] = p;
1047 } finally {
1048 fullyUnlock();
1049 }
1050
1051 // 2. Run the filter on the elements while lock is free.
1052 for (int i = 0; i < n; i++) {
1053 final E e;
1054 if ((e = nodes[i].item) != null && filter.test(e))
1055 deathRow |= 1L << i;
1056 }
1057
1058 // 3. Remove any filtered elements while holding the lock.
1059 if (deathRow != 0) {
1060 fullyLock();
1061 try {
1062 for (int i = 0; i < n; i++) {
1063 final Node<E> q;
1064 if ((deathRow & (1L << i)) != 0L
1065 && (q = nodes[i]).item != null) {
1066 ancestor = findPred(q, ancestor);
1067 unlink(q, ancestor);
1068 removed = true;
1069 }
1070 }
1071 } finally {
1072 fullyUnlock();
1073 }
1074 }
1075 } while (n > 0 && p != null);
1076 return removed;
1077 }
1078
1079 /**
1080 * Saves this queue to a stream (that is, serializes it).
1081 *
1082 * @param s the stream
1083 * @throws java.io.IOException if an I/O error occurs
1084 * @serialData The capacity is emitted (int), followed by all of
1085 * its elements (each an {@code Object}) in the proper order,
1086 * followed by a null
1087 */
1088 private void writeObject(java.io.ObjectOutputStream s)
1089 throws java.io.IOException {
1090
1091 fullyLock();
1092 try {
1093 // Write out any hidden stuff, plus capacity
1094 s.defaultWriteObject();
1095
1096 // Write out all elements in the proper order.
1097 for (Node<E> p = head.next; p != null; p = p.next)
1098 s.writeObject(p.item);
1099
1100 // Use trailing null as sentinel
1101 s.writeObject(null);
1102 } finally {
1103 fullyUnlock();
1104 }
1105 }
1106
1107 /**
1108 * Reconstitutes this queue from a stream (that is, deserializes it).
1109 * @param s the stream
1110 * @throws ClassNotFoundException if the class of a serialized object
1111 * could not be found
1112 * @throws java.io.IOException if an I/O error occurs
1113 */
1114 private void readObject(java.io.ObjectInputStream s)
1115 throws java.io.IOException, ClassNotFoundException {
1116 // Read in capacity, and any hidden stuff
1117 s.defaultReadObject();
1118
1119 count.set(0);
1120 last = head = new Node<E>(null);
1121
1122 // Read in all elements and place in queue
1123 for (;;) {
1124 @SuppressWarnings("unchecked")
1125 E item = (E)s.readObject();
1126 if (item == null)
1127 break;
1128 add(item);
1129 }
1130 }
1131 }