ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.1
Committed: Sat Mar 26 06:22:50 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
Log Message:
fork jdk8 maintenance branch for source and jtreg tests

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.AbstractQueue;
10     import java.util.Collection;
11     import java.util.Iterator;
12     import java.util.NoSuchElementException;
13     import java.util.Spliterator;
14     import java.util.Spliterators;
15     import java.util.concurrent.atomic.AtomicInteger;
16     import java.util.concurrent.locks.Condition;
17     import java.util.concurrent.locks.ReentrantLock;
18     import java.util.function.Consumer;
19    
20     /**
21     * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
22     * linked nodes.
23     * This queue orders elements FIFO (first-in-first-out).
24     * The <em>head</em> of the queue is that element that has been on the
25     * queue the longest time.
26     * The <em>tail</em> of the queue is that element that has been on the
27     * queue the shortest time. New elements
28     * are inserted at the tail of the queue, and the queue retrieval
29     * operations obtain elements at the head of the queue.
30     * Linked queues typically have higher throughput than array-based queues but
31     * less predictable performance in most concurrent applications.
32     *
33     * <p>The optional capacity bound constructor argument serves as a
34     * way to prevent excessive queue expansion. The capacity, if unspecified,
35     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
36     * dynamically created upon each insertion unless this would bring the
37     * queue above capacity.
38     *
39     * <p>This class and its iterator implement all of the
40     * <em>optional</em> methods of the {@link Collection} and {@link
41     * Iterator} interfaces.
42     *
43     * <p>This class is a member of the
44     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
45     * Java Collections Framework</a>.
46     *
47     * @since 1.5
48     * @author Doug Lea
49     * @param <E> the type of elements held in this queue
50     */
51     public class LinkedBlockingQueue<E> extends AbstractQueue<E>
52     implements BlockingQueue<E>, java.io.Serializable {
53     private static final long serialVersionUID = -6903933977591709194L;
54    
55     /*
56     * A variant of the "two lock queue" algorithm. The putLock gates
57     * entry to put (and offer), and has an associated condition for
58     * waiting puts. Similarly for the takeLock. The "count" field
59     * that they both rely on is maintained as an atomic to avoid
60     * needing to get both locks in most cases. Also, to minimize need
61     * for puts to get takeLock and vice-versa, cascading notifies are
62     * used. When a put notices that it has enabled at least one take,
63     * it signals taker. That taker in turn signals others if more
64     * items have been entered since the signal. And symmetrically for
65     * takes signalling puts. Operations such as remove(Object) and
66     * iterators acquire both locks.
67     *
68     * Visibility between writers and readers is provided as follows:
69     *
70     * Whenever an element is enqueued, the putLock is acquired and
71     * count updated. A subsequent reader guarantees visibility to the
72     * enqueued Node by either acquiring the putLock (via fullyLock)
73     * or by acquiring the takeLock, and then reading n = count.get();
74     * this gives visibility to the first n items.
75     *
76     * To implement weakly consistent iterators, it appears we need to
77     * keep all Nodes GC-reachable from a predecessor dequeued Node.
78     * That would cause two problems:
79     * - allow a rogue Iterator to cause unbounded memory retention
80     * - cause cross-generational linking of old Nodes to new Nodes if
81     * a Node was tenured while live, which generational GCs have a
82     * hard time dealing with, causing repeated major collections.
83     * However, only non-deleted Nodes need to be reachable from
84     * dequeued Nodes, and reachability does not necessarily have to
85     * be of the kind understood by the GC. We use the trick of
86     * linking a Node that has just been dequeued to itself. Such a
87     * self-link implicitly means to advance to head.next.
88     */
89    
90     /**
91     * Linked list node class.
92     */
93     static class Node<E> {
94     E item;
95    
96     /**
97     * One of:
98     * - the real successor Node
99     * - this Node, meaning the successor is head.next
100     * - null, meaning there is no successor (this is the last node)
101     */
102     Node<E> next;
103    
104     Node(E x) { item = x; }
105     }
106    
107     /** The capacity bound, or Integer.MAX_VALUE if none */
108     private final int capacity;
109    
110     /** Current number of elements */
111     private final AtomicInteger count = new AtomicInteger();
112    
113     /**
114     * Head of linked list.
115     * Invariant: head.item == null
116     */
117     transient Node<E> head;
118    
119     /**
120     * Tail of linked list.
121     * Invariant: last.next == null
122     */
123     private transient Node<E> last;
124    
125     /** Lock held by take, poll, etc */
126     private final ReentrantLock takeLock = new ReentrantLock();
127    
128     /** Wait queue for waiting takes */
129     private final Condition notEmpty = takeLock.newCondition();
130    
131     /** Lock held by put, offer, etc */
132     private final ReentrantLock putLock = new ReentrantLock();
133    
134     /** Wait queue for waiting puts */
135     private final Condition notFull = putLock.newCondition();
136    
137     /**
138     * Signals a waiting take. Called only from put/offer (which do not
139     * otherwise ordinarily lock takeLock.)
140     */
141     private void signalNotEmpty() {
142     final ReentrantLock takeLock = this.takeLock;
143     takeLock.lock();
144     try {
145     notEmpty.signal();
146     } finally {
147     takeLock.unlock();
148     }
149     }
150    
151     /**
152     * Signals a waiting put. Called only from take/poll.
153     */
154     private void signalNotFull() {
155     final ReentrantLock putLock = this.putLock;
156     putLock.lock();
157     try {
158     notFull.signal();
159     } finally {
160     putLock.unlock();
161     }
162     }
163    
164     /**
165     * Links node at end of queue.
166     *
167     * @param node the node
168     */
169     private void enqueue(Node<E> node) {
170     // assert putLock.isHeldByCurrentThread();
171     // assert last.next == null;
172     last = last.next = node;
173     }
174    
175     /**
176     * Removes a node from head of queue.
177     *
178     * @return the node
179     */
180     private E dequeue() {
181     // assert takeLock.isHeldByCurrentThread();
182     // assert head.item == null;
183     Node<E> h = head;
184     Node<E> first = h.next;
185     h.next = h; // help GC
186     head = first;
187     E x = first.item;
188     first.item = null;
189     return x;
190     }
191    
192     /**
193     * Locks to prevent both puts and takes.
194     */
195     void fullyLock() {
196     putLock.lock();
197     takeLock.lock();
198     }
199    
200     /**
201     * Unlocks to allow both puts and takes.
202     */
203     void fullyUnlock() {
204     takeLock.unlock();
205     putLock.unlock();
206     }
207    
208     // /**
209     // * Tells whether both locks are held by current thread.
210     // */
211     // boolean isFullyLocked() {
212     // return (putLock.isHeldByCurrentThread() &&
213     // takeLock.isHeldByCurrentThread());
214     // }
215    
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegates to the capacity constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
223    
224     /**
225     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
226     *
227     * @param capacity the capacity of this queue
228     * @throws IllegalArgumentException if {@code capacity} is not greater
229     * than zero
230     */
231     public LinkedBlockingQueue(int capacity) {
232     if (capacity <= 0) throw new IllegalArgumentException();
233     this.capacity = capacity;
234     last = head = new Node<E>(null);
235     }
236    
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, pre-populated with the elements of the
 * given collection in the order its iterator returns them.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        for (Iterator<? extends E> it = c.iterator(); it.hasNext(); ++added) {
            final E element = it.next();
            if (element == null)
                throw new NullPointerException();
            if (added == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(element));
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
266    
267     // this doc comment is overridden to remove the reference to collections
268     // greater in size than Integer.MAX_VALUE
269     /**
270     * Returns the number of elements in this queue.
271     *
272     * @return the number of elements in this queue
273     */
274     public int size() {
275     return count.get();
276     }
277    
278     // this doc comment is a modified copy of the inherited doc comment,
279     // without the reference to unlimited queues.
280     /**
281     * Returns the number of additional elements that this queue can ideally
282     * (in the absence of memory or resource constraints) accept without
283     * blocking. This is always equal to the initial capacity of this queue
284     * less the current {@code size} of this queue.
285     *
286     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
287     * an element will succeed by inspecting {@code remainingCapacity}
288     * because it may be the case that another thread is about to
289     * insert or remove an element.
290     */
291     public int remainingCapacity() {
292     return capacity - count.get();
293     }
294    
295     /**
296     * Inserts the specified element at the tail of this queue, waiting if
297     * necessary for space to become available.
298     *
299     * @throws InterruptedException {@inheritDoc}
300     * @throws NullPointerException {@inheritDoc}
301     */
302     public void put(E e) throws InterruptedException {
303     if (e == null) throw new NullPointerException();
304     // Note: convention in all put/take/etc is to preset local var
305     // holding count negative to indicate failure unless set.
306     int c = -1;
307     Node<E> node = new Node<E>(e);
308     final ReentrantLock putLock = this.putLock;
309     final AtomicInteger count = this.count;
310     putLock.lockInterruptibly();
311     try {
312     /*
313     * Note that count is used in wait guard even though it is
314     * not protected by lock. This works because count can
315     * only decrease at this point (all other puts are shut
316     * out by lock), and we (or some other waiting put) are
317     * signalled if it ever changes from capacity. Similarly
318     * for all other uses of count in other wait guards.
319     */
320     while (count.get() == capacity) {
321     notFull.await();
322     }
323     enqueue(node);
324     c = count.getAndIncrement();
325     if (c + 1 < capacity)
326     notFull.signal();
327     } finally {
328     putLock.unlock();
329     }
330     if (c == 0)
331     signalNotEmpty();
332     }
333    
334     /**
335     * Inserts the specified element at the tail of this queue, waiting if
336     * necessary up to the specified wait time for space to become available.
337     *
338     * @return {@code true} if successful, or {@code false} if
339     * the specified waiting time elapses before space is available
340     * @throws InterruptedException {@inheritDoc}
341     * @throws NullPointerException {@inheritDoc}
342     */
343     public boolean offer(E e, long timeout, TimeUnit unit)
344     throws InterruptedException {
345    
346     if (e == null) throw new NullPointerException();
347     long nanos = unit.toNanos(timeout);
348     int c = -1;
349     final ReentrantLock putLock = this.putLock;
350     final AtomicInteger count = this.count;
351     putLock.lockInterruptibly();
352     try {
353     while (count.get() == capacity) {
354     if (nanos <= 0L)
355     return false;
356     nanos = notFull.awaitNanos(nanos);
357     }
358     enqueue(new Node<E>(e));
359     c = count.getAndIncrement();
360     if (c + 1 < capacity)
361     notFull.signal();
362     } finally {
363     putLock.unlock();
364     }
365     if (c == 0)
366     signalNotEmpty();
367     return true;
368     }
369    
370     /**
371     * Inserts the specified element at the tail of this queue if it is
372     * possible to do so immediately without exceeding the queue's capacity,
373     * returning {@code true} upon success and {@code false} if this queue
374     * is full.
375     * When using a capacity-restricted queue, this method is generally
376     * preferable to method {@link BlockingQueue#add add}, which can fail to
377     * insert an element only by throwing an exception.
378     *
379     * @throws NullPointerException if the specified element is null
380     */
381     public boolean offer(E e) {
382     if (e == null) throw new NullPointerException();
383     final AtomicInteger count = this.count;
384     if (count.get() == capacity)
385     return false;
386     int c = -1;
387     Node<E> node = new Node<E>(e);
388     final ReentrantLock putLock = this.putLock;
389     putLock.lock();
390     try {
391     if (count.get() < capacity) {
392     enqueue(node);
393     c = count.getAndIncrement();
394     if (c + 1 < capacity)
395     notFull.signal();
396     }
397     } finally {
398     putLock.unlock();
399     }
400     if (c == 0)
401     signalNotEmpty();
402     return c >= 0;
403     }
404    
405     public E take() throws InterruptedException {
406     E x;
407     int c = -1;
408     final AtomicInteger count = this.count;
409     final ReentrantLock takeLock = this.takeLock;
410     takeLock.lockInterruptibly();
411     try {
412     while (count.get() == 0) {
413     notEmpty.await();
414     }
415     x = dequeue();
416     c = count.getAndDecrement();
417     if (c > 1)
418     notEmpty.signal();
419     } finally {
420     takeLock.unlock();
421     }
422     if (c == capacity)
423     signalNotFull();
424     return x;
425     }
426    
427     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
428     E x = null;
429     int c = -1;
430     long nanos = unit.toNanos(timeout);
431     final AtomicInteger count = this.count;
432     final ReentrantLock takeLock = this.takeLock;
433     takeLock.lockInterruptibly();
434     try {
435     while (count.get() == 0) {
436     if (nanos <= 0L)
437     return null;
438     nanos = notEmpty.awaitNanos(nanos);
439     }
440     x = dequeue();
441     c = count.getAndDecrement();
442     if (c > 1)
443     notEmpty.signal();
444     } finally {
445     takeLock.unlock();
446     }
447     if (c == capacity)
448     signalNotFull();
449     return x;
450     }
451    
452     public E poll() {
453     final AtomicInteger count = this.count;
454     if (count.get() == 0)
455     return null;
456     E x = null;
457     int c = -1;
458     final ReentrantLock takeLock = this.takeLock;
459     takeLock.lock();
460     try {
461     if (count.get() > 0) {
462     x = dequeue();
463     c = count.getAndDecrement();
464     if (c > 1)
465     notEmpty.signal();
466     }
467     } finally {
468     takeLock.unlock();
469     }
470     if (c == capacity)
471     signalNotFull();
472     return x;
473     }
474    
475     public E peek() {
476     if (count.get() == 0)
477     return null;
478     final ReentrantLock takeLock = this.takeLock;
479     takeLock.lock();
480     try {
481     return (count.get() > 0) ? head.next.item : null;
482     } finally {
483     takeLock.unlock();
484     }
485     }
486    
487     /**
488     * Unlinks interior Node p with predecessor trail.
489     */
490     void unlink(Node<E> p, Node<E> trail) {
491     // assert isFullyLocked();
492     // p.next is not changed, to allow iterators that are
493     // traversing p to maintain their weak-consistency guarantee.
494     p.item = null;
495     trail.next = p.next;
496     if (last == p)
497     last = trail;
498     if (count.getAndDecrement() == capacity)
499     notFull.signal();
500     }
501    
502     /**
503     * Removes a single instance of the specified element from this queue,
504     * if it is present. More formally, removes an element {@code e} such
505     * that {@code o.equals(e)}, if this queue contains one or more such
506     * elements.
507     * Returns {@code true} if this queue contained the specified element
508     * (or equivalently, if this queue changed as a result of the call).
509     *
510     * @param o element to be removed from this queue, if present
511     * @return {@code true} if this queue changed as a result of the call
512     */
513     public boolean remove(Object o) {
514     if (o == null) return false;
515     fullyLock();
516     try {
517     for (Node<E> trail = head, p = trail.next;
518     p != null;
519     trail = p, p = p.next) {
520     if (o.equals(p.item)) {
521     unlink(p, trail);
522     return true;
523     }
524     }
525     return false;
526     } finally {
527     fullyUnlock();
528     }
529     }
530    
531     /**
532     * Returns {@code true} if this queue contains the specified element.
533     * More formally, returns {@code true} if and only if this queue contains
534     * at least one element {@code e} such that {@code o.equals(e)}.
535     *
536     * @param o object to be checked for containment in this queue
537     * @return {@code true} if this queue contains the specified element
538     */
539     public boolean contains(Object o) {
540     if (o == null) return false;
541     fullyLock();
542     try {
543     for (Node<E> p = head.next; p != null; p = p.next)
544     if (o.equals(p.item))
545     return true;
546     return false;
547     } finally {
548     fullyUnlock();
549     }
550     }
551    
552     /**
553     * Returns an array containing all of the elements in this queue, in
554     * proper sequence.
555     *
556     * <p>The returned array will be "safe" in that no references to it are
557     * maintained by this queue. (In other words, this method must allocate
558     * a new array). The caller is thus free to modify the returned array.
559     *
560     * <p>This method acts as bridge between array-based and collection-based
561     * APIs.
562     *
563     * @return an array containing all of the elements in this queue
564     */
565     public Object[] toArray() {
566     fullyLock();
567     try {
568     int size = count.get();
569     Object[] a = new Object[size];
570     int k = 0;
571     for (Node<E> p = head.next; p != null; p = p.next)
572     a[k++] = p.item;
573     return a;
574     } finally {
575     fullyUnlock();
576     }
577     }
578    
579     /**
580     * Returns an array containing all of the elements in this queue, in
581     * proper sequence; the runtime type of the returned array is that of
582     * the specified array. If the queue fits in the specified array, it
583     * is returned therein. Otherwise, a new array is allocated with the
584     * runtime type of the specified array and the size of this queue.
585     *
586     * <p>If this queue fits in the specified array with room to spare
587     * (i.e., the array has more elements than this queue), the element in
588     * the array immediately following the end of the queue is set to
589     * {@code null}.
590     *
591     * <p>Like the {@link #toArray()} method, this method acts as bridge between
592     * array-based and collection-based APIs. Further, this method allows
593     * precise control over the runtime type of the output array, and may,
594     * under certain circumstances, be used to save allocation costs.
595     *
596     * <p>Suppose {@code x} is a queue known to contain only strings.
597     * The following code can be used to dump the queue into a newly
598     * allocated array of {@code String}:
599     *
600     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
601     *
602     * Note that {@code toArray(new Object[0])} is identical in function to
603     * {@code toArray()}.
604     *
605     * @param a the array into which the elements of the queue are to
606     * be stored, if it is big enough; otherwise, a new array of the
607     * same runtime type is allocated for this purpose
608     * @return an array containing all of the elements in this queue
609     * @throws ArrayStoreException if the runtime type of the specified array
610     * is not a supertype of the runtime type of every element in
611     * this queue
612     * @throws NullPointerException if the specified array is null
613     */
614     @SuppressWarnings("unchecked")
615     public <T> T[] toArray(T[] a) {
616     fullyLock();
617     try {
618     int size = count.get();
619     if (a.length < size)
620     a = (T[])java.lang.reflect.Array.newInstance
621     (a.getClass().getComponentType(), size);
622    
623     int k = 0;
624     for (Node<E> p = head.next; p != null; p = p.next)
625     a[k++] = (T)p.item;
626     if (a.length > k)
627     a[k] = null;
628     return a;
629     } finally {
630     fullyUnlock();
631     }
632     }
633    
634     public String toString() {
635     return Helpers.collectionToString(this);
636     }
637    
638     /**
639     * Atomically removes all of the elements from this queue.
640     * The queue will be empty after this call returns.
641     */
642     public void clear() {
643     fullyLock();
644     try {
645     for (Node<E> p, h = head; (p = h.next) != null; h = p) {
646     h.next = h;
647     p.item = null;
648     }
649     head = last;
650     // assert head.item == null && head.next == null;
651     if (count.getAndSet(0) == capacity)
652     notFull.signal();
653     } finally {
654     fullyUnlock();
655     }
656     }
657    
658     /**
659     * @throws UnsupportedOperationException {@inheritDoc}
660     * @throws ClassCastException {@inheritDoc}
661     * @throws NullPointerException {@inheritDoc}
662     * @throws IllegalArgumentException {@inheritDoc}
663     */
664     public int drainTo(Collection<? super E> c) {
665     return drainTo(c, Integer.MAX_VALUE);
666     }
667    
668     /**
669     * @throws UnsupportedOperationException {@inheritDoc}
670     * @throws ClassCastException {@inheritDoc}
671     * @throws NullPointerException {@inheritDoc}
672     * @throws IllegalArgumentException {@inheritDoc}
673     */
674     public int drainTo(Collection<? super E> c, int maxElements) {
675     if (c == null)
676     throw new NullPointerException();
677     if (c == this)
678     throw new IllegalArgumentException();
679     if (maxElements <= 0)
680     return 0;
681     boolean signalNotFull = false;
682     final ReentrantLock takeLock = this.takeLock;
683     takeLock.lock();
684     try {
685     int n = Math.min(maxElements, count.get());
686     // count.get provides visibility to first n Nodes
687     Node<E> h = head;
688     int i = 0;
689     try {
690     while (i < n) {
691     Node<E> p = h.next;
692     c.add(p.item);
693     p.item = null;
694     h.next = h;
695     h = p;
696     ++i;
697     }
698     return n;
699     } finally {
700     // Restore invariants even if c.add() threw
701     if (i > 0) {
702     // assert h.item == null;
703     head = h;
704     signalNotFull = (count.getAndAdd(-i) == capacity);
705     }
706     }
707     } finally {
708     takeLock.unlock();
709     if (signalNotFull)
710     signalNotFull();
711     }
712     }
713    
714     /**
715     * Returns an iterator over the elements in this queue in proper sequence.
716     * The elements will be returned in order from first (head) to last (tail).
717     *
718     * <p>The returned iterator is
719     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
720     *
721     * @return an iterator over the elements in this queue in proper sequence
722     */
723     public Iterator<E> iterator() {
724     return new Itr();
725     }
726    
727     private class Itr implements Iterator<E> {
728     /*
729     * Basic weakly-consistent iterator. At all times hold the next
730     * item to hand out so that if hasNext() reports true, we will
731     * still have it to return even if lost race with a take etc.
732     */
733    
734     private Node<E> current;
735     private Node<E> lastRet;
736     private E currentElement;
737    
738     Itr() {
739     fullyLock();
740     try {
741     current = head.next;
742     if (current != null)
743     currentElement = current.item;
744     } finally {
745     fullyUnlock();
746     }
747     }
748    
749     public boolean hasNext() {
750     return current != null;
751     }
752    
753     public E next() {
754     fullyLock();
755     try {
756     if (current == null)
757     throw new NoSuchElementException();
758     lastRet = current;
759     E item = null;
760     // Unlike other traversal methods, iterators must handle both:
761     // - dequeued nodes (p.next == p)
762     // - (possibly multiple) interior removed nodes (p.item == null)
763     for (Node<E> p = current, q;; p = q) {
764     if ((q = p.next) == p)
765     q = head.next;
766     if (q == null || (item = q.item) != null) {
767     current = q;
768     E x = currentElement;
769     currentElement = item;
770     return x;
771     }
772     }
773     } finally {
774     fullyUnlock();
775     }
776     }
777    
778     public void remove() {
779     if (lastRet == null)
780     throw new IllegalStateException();
781     fullyLock();
782     try {
783     Node<E> node = lastRet;
784     lastRet = null;
785     for (Node<E> trail = head, p = trail.next;
786     p != null;
787     trail = p, p = p.next) {
788     if (p == node) {
789     unlink(p, trail);
790     break;
791     }
792     }
793     } finally {
794     fullyUnlock();
795     }
796     }
797     }
798    
799     /** A customized variant of Spliterators.IteratorSpliterator */
800     static final class LBQSpliterator<E> implements Spliterator<E> {
801     static final int MAX_BATCH = 1 << 25; // max batch array size;
802     final LinkedBlockingQueue<E> queue;
803     Node<E> current; // current node; null until initialized
804     int batch; // batch size for splits
805     boolean exhausted; // true when no more nodes
806     long est; // size estimate
807     LBQSpliterator(LinkedBlockingQueue<E> queue) {
808     this.queue = queue;
809     this.est = queue.size();
810     }
811    
812     public long estimateSize() { return est; }
813    
814     public Spliterator<E> trySplit() {
815     Node<E> h;
816     final LinkedBlockingQueue<E> q = this.queue;
817     int b = batch;
818     int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
819     if (!exhausted &&
820     ((h = current) != null || (h = q.head.next) != null) &&
821     h.next != null) {
822     Object[] a = new Object[n];
823     int i = 0;
824     Node<E> p = current;
825     q.fullyLock();
826     try {
827     if (p != null || (p = q.head.next) != null) {
828     do {
829     if ((a[i] = p.item) != null)
830     ++i;
831     } while ((p = p.next) != null && i < n);
832     }
833     } finally {
834     q.fullyUnlock();
835     }
836     if ((current = p) == null) {
837     est = 0L;
838     exhausted = true;
839     }
840     else if ((est -= i) < 0L)
841     est = 0L;
842     if (i > 0) {
843     batch = i;
844     return Spliterators.spliterator
845     (a, 0, i, (Spliterator.ORDERED |
846     Spliterator.NONNULL |
847     Spliterator.CONCURRENT));
848     }
849     }
850     return null;
851     }
852    
853     public void forEachRemaining(Consumer<? super E> action) {
854     if (action == null) throw new NullPointerException();
855     final LinkedBlockingQueue<E> q = this.queue;
856     if (!exhausted) {
857     exhausted = true;
858     Node<E> p = current;
859     do {
860     E e = null;
861     q.fullyLock();
862     try {
863     if (p == null)
864     p = q.head.next;
865     while (p != null) {
866     e = p.item;
867     p = p.next;
868     if (e != null)
869     break;
870     }
871     } finally {
872     q.fullyUnlock();
873     }
874     if (e != null)
875     action.accept(e);
876     } while (p != null);
877     }
878     }
879    
880     public boolean tryAdvance(Consumer<? super E> action) {
881     if (action == null) throw new NullPointerException();
882     final LinkedBlockingQueue<E> q = this.queue;
883     if (!exhausted) {
884     E e = null;
885     q.fullyLock();
886     try {
887     if (current == null)
888     current = q.head.next;
889     while (current != null) {
890     e = current.item;
891     current = current.next;
892     if (e != null)
893     break;
894     }
895     } finally {
896     q.fullyUnlock();
897     }
898     if (current == null)
899     exhausted = true;
900     if (e != null) {
901     action.accept(e);
902     return true;
903     }
904     }
905     return false;
906     }
907    
908     public int characteristics() {
909     return Spliterator.ORDERED | Spliterator.NONNULL |
910     Spliterator.CONCURRENT;
911     }
912     }
913    
914     /**
915     * Returns a {@link Spliterator} over the elements in this queue.
916     *
917     * <p>The returned spliterator is
918     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
919     *
920     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
921     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
922     *
923     * @implNote
924     * The {@code Spliterator} implements {@code trySplit} to permit limited
925     * parallelism.
926     *
927     * @return a {@code Spliterator} over the elements in this queue
928     * @since 1.8
929     */
930     public Spliterator<E> spliterator() {
931     return new LBQSpliterator<E>(this);
932     }
933    
934     /**
935     * Saves this queue to a stream (that is, serializes it).
936     *
937     * @param s the stream
938     * @throws java.io.IOException if an I/O error occurs
939     * @serialData The capacity is emitted (int), followed by all of
940     * its elements (each an {@code Object}) in the proper order,
941     * followed by a null
942     */
943     private void writeObject(java.io.ObjectOutputStream s)
944     throws java.io.IOException {
945    
946     fullyLock();
947     try {
948     // Write out any hidden stuff, plus capacity
949     s.defaultWriteObject();
950    
951     // Write out all elements in the proper order.
952     for (Node<E> p = head.next; p != null; p = p.next)
953     s.writeObject(p.item);
954    
955     // Use trailing null as sentinel
956     s.writeObject(null);
957     } finally {
958     fullyUnlock();
959     }
960     }
961    
962     /**
963     * Reconstitutes this queue from a stream (that is, deserializes it).
964     * @param s the stream
965     * @throws ClassNotFoundException if the class of a serialized object
966     * could not be found
967     * @throws java.io.IOException if an I/O error occurs
968     */
969     private void readObject(java.io.ObjectInputStream s)
970     throws java.io.IOException, ClassNotFoundException {
971     // Read in capacity, and any hidden stuff
972     s.defaultReadObject();
973    
974     count.set(0);
975     last = head = new Node<E>(null);
976    
977     // Read in all elements and place in queue
978     for (;;) {
979     @SuppressWarnings("unchecked")
980     E item = (E)s.readObject();
981     if (item == null)
982     break;
983     add(item);
984     }
985     }
986     }