ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.2
Committed: Sat Dec 17 21:56:54 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +145 -92 lines
Log Message:
sync with src/main

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.AbstractQueue;
10     import java.util.Collection;
11     import java.util.Iterator;
12     import java.util.NoSuchElementException;
13 jsr166 1.2 import java.util.Objects;
14 jsr166 1.1 import java.util.Spliterator;
15     import java.util.Spliterators;
16     import java.util.concurrent.atomic.AtomicInteger;
17     import java.util.concurrent.locks.Condition;
18     import java.util.concurrent.locks.ReentrantLock;
19     import java.util.function.Consumer;
20    
21     /**
22     * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
23     * linked nodes.
24     * This queue orders elements FIFO (first-in-first-out).
25     * The <em>head</em> of the queue is that element that has been on the
26     * queue the longest time.
27     * The <em>tail</em> of the queue is that element that has been on the
28     * queue the shortest time. New elements
29     * are inserted at the tail of the queue, and the queue retrieval
30     * operations obtain elements at the head of the queue.
31     * Linked queues typically have higher throughput than array-based queues but
32     * less predictable performance in most concurrent applications.
33     *
34     * <p>The optional capacity bound constructor argument serves as a
35     * way to prevent excessive queue expansion. The capacity, if unspecified,
36     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
37     * dynamically created upon each insertion unless this would bring the
38     * queue above capacity.
39     *
40     * <p>This class and its iterator implement all of the
41     * <em>optional</em> methods of the {@link Collection} and {@link
42     * Iterator} interfaces.
43     *
44     * <p>This class is a member of the
45     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
46     * Java Collections Framework</a>.
47     *
48     * @since 1.5
49     * @author Doug Lea
50     * @param <E> the type of elements held in this queue
51     */
52     public class LinkedBlockingQueue<E> extends AbstractQueue<E>
53     implements BlockingQueue<E>, java.io.Serializable {
54     private static final long serialVersionUID = -6903933977591709194L;
55    
56     /*
57     * A variant of the "two lock queue" algorithm. The putLock gates
58     * entry to put (and offer), and has an associated condition for
59     * waiting puts. Similarly for the takeLock. The "count" field
60     * that they both rely on is maintained as an atomic to avoid
61     * needing to get both locks in most cases. Also, to minimize need
62     * for puts to get takeLock and vice-versa, cascading notifies are
63     * used. When a put notices that it has enabled at least one take,
64     * it signals taker. That taker in turn signals others if more
65     * items have been entered since the signal. And symmetrically for
66     * takes signalling puts. Operations such as remove(Object) and
67     * iterators acquire both locks.
68     *
69     * Visibility between writers and readers is provided as follows:
70     *
71     * Whenever an element is enqueued, the putLock is acquired and
72     * count updated. A subsequent reader guarantees visibility to the
73     * enqueued Node by either acquiring the putLock (via fullyLock)
74     * or by acquiring the takeLock, and then reading n = count.get();
75     * this gives visibility to the first n items.
76     *
77     * To implement weakly consistent iterators, it appears we need to
78     * keep all Nodes GC-reachable from a predecessor dequeued Node.
79     * That would cause two problems:
80     * - allow a rogue Iterator to cause unbounded memory retention
81     * - cause cross-generational linking of old Nodes to new Nodes if
82     * a Node was tenured while live, which generational GCs have a
83     * hard time dealing with, causing repeated major collections.
84     * However, only non-deleted Nodes need to be reachable from
85     * dequeued Nodes, and reachability does not necessarily have to
86     * be of the kind understood by the GC. We use the trick of
87     * linking a Node that has just been dequeued to itself. Such a
88     * self-link implicitly means to advance to head.next.
89     */
90    
91     /**
92     * Linked list node class.
93     */
94     static class Node<E> {
95     E item;
96    
97     /**
98     * One of:
99     * - the real successor Node
100     * - this Node, meaning the successor is head.next
101     * - null, meaning there is no successor (this is the last node)
102     */
103     Node<E> next;
104    
105     Node(E x) { item = x; }
106     }
107    
108     /** The capacity bound, or Integer.MAX_VALUE if none */
109     private final int capacity;
110    
111     /** Current number of elements */
112     private final AtomicInteger count = new AtomicInteger();
113    
114     /**
115     * Head of linked list.
116     * Invariant: head.item == null
117     */
118     transient Node<E> head;
119    
120     /**
121     * Tail of linked list.
122     * Invariant: last.next == null
123     */
124     private transient Node<E> last;
125    
126     /** Lock held by take, poll, etc */
127     private final ReentrantLock takeLock = new ReentrantLock();
128    
129     /** Wait queue for waiting takes */
130     private final Condition notEmpty = takeLock.newCondition();
131    
132     /** Lock held by put, offer, etc */
133     private final ReentrantLock putLock = new ReentrantLock();
134    
135     /** Wait queue for waiting puts */
136     private final Condition notFull = putLock.newCondition();
137    
138     /**
139     * Signals a waiting take. Called only from put/offer (which do not
140     * otherwise ordinarily lock takeLock.)
141     */
142     private void signalNotEmpty() {
143     final ReentrantLock takeLock = this.takeLock;
144     takeLock.lock();
145     try {
146     notEmpty.signal();
147     } finally {
148     takeLock.unlock();
149     }
150     }
151    
152     /**
153     * Signals a waiting put. Called only from take/poll.
154     */
155     private void signalNotFull() {
156     final ReentrantLock putLock = this.putLock;
157     putLock.lock();
158     try {
159     notFull.signal();
160     } finally {
161     putLock.unlock();
162     }
163     }
164    
165     /**
166     * Links node at end of queue.
167     *
168     * @param node the node
169     */
170     private void enqueue(Node<E> node) {
171     // assert putLock.isHeldByCurrentThread();
172     // assert last.next == null;
173     last = last.next = node;
174     }
175    
176     /**
177     * Removes a node from head of queue.
178     *
179     * @return the node
180     */
181     private E dequeue() {
182     // assert takeLock.isHeldByCurrentThread();
183     // assert head.item == null;
184     Node<E> h = head;
185     Node<E> first = h.next;
186     h.next = h; // help GC
187     head = first;
188     E x = first.item;
189     first.item = null;
190     return x;
191     }
192    
193     /**
194     * Locks to prevent both puts and takes.
195     */
196     void fullyLock() {
197     putLock.lock();
198     takeLock.lock();
199     }
200    
201     /**
202     * Unlocks to allow both puts and takes.
203     */
204     void fullyUnlock() {
205     takeLock.unlock();
206     putLock.unlock();
207     }
208    
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegate to the bounded constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
216    
217     /**
218     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
219     *
220     * @param capacity the capacity of this queue
221     * @throws IllegalArgumentException if {@code capacity} is not greater
222     * than zero
223     */
224     public LinkedBlockingQueue(int capacity) {
225     if (capacity <= 0) throw new IllegalArgumentException();
226     this.capacity = capacity;
227     last = head = new Node<E>(null);
228     }
229    
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, pre-populated with the elements of the
 * given collection in the traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        for (E element : c) {
            if (element == null)
                throw new NullPointerException();
            if (added == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(element));
            ++added;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
259    
260     // this doc comment is overridden to remove the reference to collections
261     // greater in size than Integer.MAX_VALUE
262     /**
263     * Returns the number of elements in this queue.
264     *
265     * @return the number of elements in this queue
266     */
267     public int size() {
268     return count.get();
269     }
270    
271     // this doc comment is a modified copy of the inherited doc comment,
272     // without the reference to unlimited queues.
273     /**
274     * Returns the number of additional elements that this queue can ideally
275     * (in the absence of memory or resource constraints) accept without
276     * blocking. This is always equal to the initial capacity of this queue
277     * less the current {@code size} of this queue.
278     *
279     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
280     * an element will succeed by inspecting {@code remainingCapacity}
281     * because it may be the case that another thread is about to
282     * insert or remove an element.
283     */
284     public int remainingCapacity() {
285     return capacity - count.get();
286     }
287    
288     /**
289     * Inserts the specified element at the tail of this queue, waiting if
290     * necessary for space to become available.
291     *
292     * @throws InterruptedException {@inheritDoc}
293     * @throws NullPointerException {@inheritDoc}
294     */
295     public void put(E e) throws InterruptedException {
296     if (e == null) throw new NullPointerException();
297     // Note: convention in all put/take/etc is to preset local var
298     // holding count negative to indicate failure unless set.
299     int c = -1;
300     Node<E> node = new Node<E>(e);
301     final ReentrantLock putLock = this.putLock;
302     final AtomicInteger count = this.count;
303     putLock.lockInterruptibly();
304     try {
305     /*
306     * Note that count is used in wait guard even though it is
307     * not protected by lock. This works because count can
308     * only decrease at this point (all other puts are shut
309     * out by lock), and we (or some other waiting put) are
310     * signalled if it ever changes from capacity. Similarly
311     * for all other uses of count in other wait guards.
312     */
313     while (count.get() == capacity) {
314     notFull.await();
315     }
316     enqueue(node);
317     c = count.getAndIncrement();
318     if (c + 1 < capacity)
319     notFull.signal();
320     } finally {
321     putLock.unlock();
322     }
323     if (c == 0)
324     signalNotEmpty();
325     }
326    
327     /**
328     * Inserts the specified element at the tail of this queue, waiting if
329     * necessary up to the specified wait time for space to become available.
330     *
331     * @return {@code true} if successful, or {@code false} if
332     * the specified waiting time elapses before space is available
333     * @throws InterruptedException {@inheritDoc}
334     * @throws NullPointerException {@inheritDoc}
335     */
336     public boolean offer(E e, long timeout, TimeUnit unit)
337     throws InterruptedException {
338    
339     if (e == null) throw new NullPointerException();
340     long nanos = unit.toNanos(timeout);
341     int c = -1;
342     final ReentrantLock putLock = this.putLock;
343     final AtomicInteger count = this.count;
344     putLock.lockInterruptibly();
345     try {
346     while (count.get() == capacity) {
347     if (nanos <= 0L)
348     return false;
349     nanos = notFull.awaitNanos(nanos);
350     }
351     enqueue(new Node<E>(e));
352     c = count.getAndIncrement();
353     if (c + 1 < capacity)
354     notFull.signal();
355     } finally {
356     putLock.unlock();
357     }
358     if (c == 0)
359     signalNotEmpty();
360     return true;
361     }
362    
363     /**
364     * Inserts the specified element at the tail of this queue if it is
365     * possible to do so immediately without exceeding the queue's capacity,
366     * returning {@code true} upon success and {@code false} if this queue
367     * is full.
368     * When using a capacity-restricted queue, this method is generally
369     * preferable to method {@link BlockingQueue#add add}, which can fail to
370     * insert an element only by throwing an exception.
371     *
372     * @throws NullPointerException if the specified element is null
373     */
374     public boolean offer(E e) {
375     if (e == null) throw new NullPointerException();
376     final AtomicInteger count = this.count;
377     if (count.get() == capacity)
378     return false;
379     int c = -1;
380     Node<E> node = new Node<E>(e);
381     final ReentrantLock putLock = this.putLock;
382     putLock.lock();
383     try {
384     if (count.get() < capacity) {
385     enqueue(node);
386     c = count.getAndIncrement();
387     if (c + 1 < capacity)
388     notFull.signal();
389     }
390     } finally {
391     putLock.unlock();
392     }
393     if (c == 0)
394     signalNotEmpty();
395     return c >= 0;
396     }
397    
398     public E take() throws InterruptedException {
399     E x;
400     int c = -1;
401     final AtomicInteger count = this.count;
402     final ReentrantLock takeLock = this.takeLock;
403     takeLock.lockInterruptibly();
404     try {
405     while (count.get() == 0) {
406     notEmpty.await();
407     }
408     x = dequeue();
409     c = count.getAndDecrement();
410     if (c > 1)
411     notEmpty.signal();
412     } finally {
413     takeLock.unlock();
414     }
415     if (c == capacity)
416     signalNotFull();
417     return x;
418     }
419    
420     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
421     E x = null;
422     int c = -1;
423     long nanos = unit.toNanos(timeout);
424     final AtomicInteger count = this.count;
425     final ReentrantLock takeLock = this.takeLock;
426     takeLock.lockInterruptibly();
427     try {
428     while (count.get() == 0) {
429     if (nanos <= 0L)
430     return null;
431     nanos = notEmpty.awaitNanos(nanos);
432     }
433     x = dequeue();
434     c = count.getAndDecrement();
435     if (c > 1)
436     notEmpty.signal();
437     } finally {
438     takeLock.unlock();
439     }
440     if (c == capacity)
441     signalNotFull();
442     return x;
443     }
444    
445     public E poll() {
446     final AtomicInteger count = this.count;
447     if (count.get() == 0)
448     return null;
449     E x = null;
450     int c = -1;
451     final ReentrantLock takeLock = this.takeLock;
452     takeLock.lock();
453     try {
454     if (count.get() > 0) {
455     x = dequeue();
456     c = count.getAndDecrement();
457     if (c > 1)
458     notEmpty.signal();
459     }
460     } finally {
461     takeLock.unlock();
462     }
463     if (c == capacity)
464     signalNotFull();
465     return x;
466     }
467    
468     public E peek() {
469     if (count.get() == 0)
470     return null;
471     final ReentrantLock takeLock = this.takeLock;
472     takeLock.lock();
473     try {
474     return (count.get() > 0) ? head.next.item : null;
475     } finally {
476     takeLock.unlock();
477     }
478     }
479    
480     /**
481     * Unlinks interior Node p with predecessor trail.
482     */
483     void unlink(Node<E> p, Node<E> trail) {
484 jsr166 1.2 // assert putLock.isHeldByCurrentThread();
485     // assert takeLock.isHeldByCurrentThread();
486 jsr166 1.1 // p.next is not changed, to allow iterators that are
487     // traversing p to maintain their weak-consistency guarantee.
488     p.item = null;
489     trail.next = p.next;
490     if (last == p)
491     last = trail;
492     if (count.getAndDecrement() == capacity)
493     notFull.signal();
494     }
495    
496     /**
497     * Removes a single instance of the specified element from this queue,
498     * if it is present. More formally, removes an element {@code e} such
499     * that {@code o.equals(e)}, if this queue contains one or more such
500     * elements.
501     * Returns {@code true} if this queue contained the specified element
502     * (or equivalently, if this queue changed as a result of the call).
503     *
504     * @param o element to be removed from this queue, if present
505     * @return {@code true} if this queue changed as a result of the call
506     */
507     public boolean remove(Object o) {
508     if (o == null) return false;
509     fullyLock();
510     try {
511     for (Node<E> trail = head, p = trail.next;
512     p != null;
513     trail = p, p = p.next) {
514     if (o.equals(p.item)) {
515     unlink(p, trail);
516     return true;
517     }
518     }
519     return false;
520     } finally {
521     fullyUnlock();
522     }
523     }
524    
525     /**
526     * Returns {@code true} if this queue contains the specified element.
527     * More formally, returns {@code true} if and only if this queue contains
528     * at least one element {@code e} such that {@code o.equals(e)}.
529     *
530     * @param o object to be checked for containment in this queue
531     * @return {@code true} if this queue contains the specified element
532     */
533     public boolean contains(Object o) {
534     if (o == null) return false;
535     fullyLock();
536     try {
537     for (Node<E> p = head.next; p != null; p = p.next)
538     if (o.equals(p.item))
539     return true;
540     return false;
541     } finally {
542     fullyUnlock();
543     }
544     }
545    
546     /**
547     * Returns an array containing all of the elements in this queue, in
548     * proper sequence.
549     *
550     * <p>The returned array will be "safe" in that no references to it are
551     * maintained by this queue. (In other words, this method must allocate
552     * a new array). The caller is thus free to modify the returned array.
553     *
554     * <p>This method acts as bridge between array-based and collection-based
555     * APIs.
556     *
557     * @return an array containing all of the elements in this queue
558     */
559     public Object[] toArray() {
560     fullyLock();
561     try {
562     int size = count.get();
563     Object[] a = new Object[size];
564     int k = 0;
565     for (Node<E> p = head.next; p != null; p = p.next)
566     a[k++] = p.item;
567     return a;
568     } finally {
569     fullyUnlock();
570     }
571     }
572    
573     /**
574     * Returns an array containing all of the elements in this queue, in
575     * proper sequence; the runtime type of the returned array is that of
576     * the specified array. If the queue fits in the specified array, it
577     * is returned therein. Otherwise, a new array is allocated with the
578     * runtime type of the specified array and the size of this queue.
579     *
580     * <p>If this queue fits in the specified array with room to spare
581     * (i.e., the array has more elements than this queue), the element in
582     * the array immediately following the end of the queue is set to
583     * {@code null}.
584     *
585     * <p>Like the {@link #toArray()} method, this method acts as bridge between
586     * array-based and collection-based APIs. Further, this method allows
587     * precise control over the runtime type of the output array, and may,
588     * under certain circumstances, be used to save allocation costs.
589     *
590     * <p>Suppose {@code x} is a queue known to contain only strings.
591     * The following code can be used to dump the queue into a newly
592     * allocated array of {@code String}:
593     *
594     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
595     *
596     * Note that {@code toArray(new Object[0])} is identical in function to
597     * {@code toArray()}.
598     *
599     * @param a the array into which the elements of the queue are to
600     * be stored, if it is big enough; otherwise, a new array of the
601     * same runtime type is allocated for this purpose
602     * @return an array containing all of the elements in this queue
603     * @throws ArrayStoreException if the runtime type of the specified array
604     * is not a supertype of the runtime type of every element in
605     * this queue
606     * @throws NullPointerException if the specified array is null
607     */
608     @SuppressWarnings("unchecked")
609     public <T> T[] toArray(T[] a) {
610     fullyLock();
611     try {
612     int size = count.get();
613     if (a.length < size)
614     a = (T[])java.lang.reflect.Array.newInstance
615     (a.getClass().getComponentType(), size);
616    
617     int k = 0;
618     for (Node<E> p = head.next; p != null; p = p.next)
619     a[k++] = (T)p.item;
620     if (a.length > k)
621     a[k] = null;
622     return a;
623     } finally {
624     fullyUnlock();
625     }
626     }
627    
628     public String toString() {
629     return Helpers.collectionToString(this);
630     }
631    
632     /**
633     * Atomically removes all of the elements from this queue.
634     * The queue will be empty after this call returns.
635     */
636     public void clear() {
637     fullyLock();
638     try {
639     for (Node<E> p, h = head; (p = h.next) != null; h = p) {
640     h.next = h;
641     p.item = null;
642     }
643     head = last;
644     // assert head.item == null && head.next == null;
645     if (count.getAndSet(0) == capacity)
646     notFull.signal();
647     } finally {
648     fullyUnlock();
649     }
650     }
651    
652     /**
653     * @throws UnsupportedOperationException {@inheritDoc}
654     * @throws ClassCastException {@inheritDoc}
655     * @throws NullPointerException {@inheritDoc}
656     * @throws IllegalArgumentException {@inheritDoc}
657     */
658     public int drainTo(Collection<? super E> c) {
659     return drainTo(c, Integer.MAX_VALUE);
660     }
661    
662     /**
663     * @throws UnsupportedOperationException {@inheritDoc}
664     * @throws ClassCastException {@inheritDoc}
665     * @throws NullPointerException {@inheritDoc}
666     * @throws IllegalArgumentException {@inheritDoc}
667     */
668     public int drainTo(Collection<? super E> c, int maxElements) {
669 jsr166 1.2 Objects.requireNonNull(c);
670 jsr166 1.1 if (c == this)
671     throw new IllegalArgumentException();
672     if (maxElements <= 0)
673     return 0;
674     boolean signalNotFull = false;
675     final ReentrantLock takeLock = this.takeLock;
676     takeLock.lock();
677     try {
678     int n = Math.min(maxElements, count.get());
679     // count.get provides visibility to first n Nodes
680     Node<E> h = head;
681     int i = 0;
682     try {
683     while (i < n) {
684     Node<E> p = h.next;
685     c.add(p.item);
686     p.item = null;
687     h.next = h;
688     h = p;
689     ++i;
690     }
691     return n;
692     } finally {
693     // Restore invariants even if c.add() threw
694     if (i > 0) {
695     // assert h.item == null;
696     head = h;
697     signalNotFull = (count.getAndAdd(-i) == capacity);
698     }
699     }
700     } finally {
701     takeLock.unlock();
702     if (signalNotFull)
703     signalNotFull();
704     }
705     }
706    
707     /**
708 jsr166 1.2 * Used for any element traversal that is not entirely under lock.
709     * Such traversals must handle both:
710     * - dequeued nodes (p.next == p)
711     * - (possibly multiple) interior removed nodes (p.item == null)
712     */
713     Node<E> succ(Node<E> p) {
714     return (p == (p = p.next)) ? head.next : p;
715     }
716    
717     /**
718 jsr166 1.1 * Returns an iterator over the elements in this queue in proper sequence.
719     * The elements will be returned in order from first (head) to last (tail).
720     *
721     * <p>The returned iterator is
722     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
723     *
724     * @return an iterator over the elements in this queue in proper sequence
725     */
726     public Iterator<E> iterator() {
727     return new Itr();
728     }
729    
730     private class Itr implements Iterator<E> {
731     /*
732     * Basic weakly-consistent iterator. At all times hold the next
733     * item to hand out so that if hasNext() reports true, we will
734     * still have it to return even if lost race with a take etc.
735     */
736    
737     private Node<E> current;
738     private Node<E> lastRet;
739     private E currentElement;
740    
741     Itr() {
742     fullyLock();
743     try {
744 jsr166 1.2 if ((current = head.next) != null)
745 jsr166 1.1 currentElement = current.item;
746     } finally {
747     fullyUnlock();
748     }
749     }
750    
751     public boolean hasNext() {
752     return current != null;
753     }
754    
755     public E next() {
756 jsr166 1.2 Node<E> p;
757     if ((p = current) == null)
758     throw new NoSuchElementException();
759     E ret = currentElement, e = null;
760     lastRet = p;
761 jsr166 1.1 fullyLock();
762     try {
763 jsr166 1.2 for (p = p.next; p != null; p = succ(p))
764     if ((e = p.item) != null)
765     break;
766 jsr166 1.1 } finally {
767     fullyUnlock();
768     }
769 jsr166 1.2 current = p;
770     currentElement = e;
771     return ret;
772     }
773    
774     public void forEachRemaining(Consumer<? super E> action) {
775     // A variant of forEachFrom
776     Objects.requireNonNull(action);
777     Node<E> p;
778     if ((p = current) == null) return;
779     lastRet = current;
780     current = null;
781     final int batchSize = 32;
782     Object[] es = null;
783     int n, len = 1;
784     do {
785     fullyLock();
786     try {
787     if (es == null) {
788     p = p.next;
789     for (Node<E> q = p; q != null; q = succ(q))
790     if (q.item != null && ++len == batchSize)
791     break;
792     es = new Object[len];
793     es[0] = currentElement;
794     currentElement = null;
795     n = 1;
796     } else
797     n = 0;
798     for (; p != null && n < len; p = succ(p))
799     if ((es[n] = p.item) != null) {
800     lastRet = p;
801     n++;
802     }
803     } finally {
804     fullyUnlock();
805     }
806     for (int i = 0; i < n; i++) {
807     @SuppressWarnings("unchecked") E e = (E) es[i];
808     action.accept(e);
809     }
810     } while (n > 0 && p != null);
811 jsr166 1.1 }
812    
813     public void remove() {
814     if (lastRet == null)
815     throw new IllegalStateException();
816     fullyLock();
817     try {
818     Node<E> node = lastRet;
819     lastRet = null;
820     for (Node<E> trail = head, p = trail.next;
821     p != null;
822     trail = p, p = p.next) {
823     if (p == node) {
824     unlink(p, trail);
825     break;
826     }
827     }
828     } finally {
829     fullyUnlock();
830     }
831     }
832     }
833    
834 jsr166 1.2 /**
835     * A customized variant of Spliterators.IteratorSpliterator.
836     * Keep this class in sync with (very similar) LBDSpliterator.
837     */
838     private final class LBQSpliterator implements Spliterator<E> {
839 jsr166 1.1 static final int MAX_BATCH = 1 << 25; // max batch array size;
840     Node<E> current; // current node; null until initialized
841     int batch; // batch size for splits
842     boolean exhausted; // true when no more nodes
843 jsr166 1.2 long est = size(); // size estimate
844    
845     LBQSpliterator() {}
846 jsr166 1.1
847     public long estimateSize() { return est; }
848    
849     public Spliterator<E> trySplit() {
850     Node<E> h;
851     int b = batch;
852     int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
853     if (!exhausted &&
854 jsr166 1.2 ((h = current) != null || (h = head.next) != null)
855     && h.next != null) {
856 jsr166 1.1 Object[] a = new Object[n];
857     int i = 0;
858     Node<E> p = current;
859 jsr166 1.2 fullyLock();
860 jsr166 1.1 try {
861 jsr166 1.2 if (p != null || (p = head.next) != null)
862     for (; p != null && i < n; p = succ(p))
863 jsr166 1.1 if ((a[i] = p.item) != null)
864 jsr166 1.2 i++;
865 jsr166 1.1 } finally {
866 jsr166 1.2 fullyUnlock();
867 jsr166 1.1 }
868     if ((current = p) == null) {
869     est = 0L;
870     exhausted = true;
871     }
872     else if ((est -= i) < 0L)
873     est = 0L;
874     if (i > 0) {
875     batch = i;
876     return Spliterators.spliterator
877     (a, 0, i, (Spliterator.ORDERED |
878     Spliterator.NONNULL |
879     Spliterator.CONCURRENT));
880     }
881     }
882     return null;
883     }
884    
885 jsr166 1.2 public boolean tryAdvance(Consumer<? super E> action) {
886     Objects.requireNonNull(action);
887 jsr166 1.1 if (!exhausted) {
888     Node<E> p = current;
889     E e = null;
890 jsr166 1.2 fullyLock();
891 jsr166 1.1 try {
892 jsr166 1.2 if (p != null || (p = head.next) != null)
893     do {
894     e = p.item;
895     p = succ(p);
896     } while (e == null && p != null);
897 jsr166 1.1 } finally {
898 jsr166 1.2 fullyUnlock();
899 jsr166 1.1 }
900 jsr166 1.2 exhausted = ((current = p) == null);
901 jsr166 1.1 if (e != null) {
902     action.accept(e);
903     return true;
904     }
905     }
906     return false;
907     }
908    
909 jsr166 1.2 public void forEachRemaining(Consumer<? super E> action) {
910     Objects.requireNonNull(action);
911     if (!exhausted) {
912     exhausted = true;
913     Node<E> p = current;
914     current = null;
915     forEachFrom(action, p);
916     }
917     }
918    
919 jsr166 1.1 public int characteristics() {
920 jsr166 1.2 return (Spliterator.ORDERED |
921     Spliterator.NONNULL |
922     Spliterator.CONCURRENT);
923 jsr166 1.1 }
924     }
925    
926     /**
927     * Returns a {@link Spliterator} over the elements in this queue.
928     *
929     * <p>The returned spliterator is
930     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
931     *
932     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
933     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
934     *
935     * @implNote
936     * The {@code Spliterator} implements {@code trySplit} to permit limited
937     * parallelism.
938     *
939     * @return a {@code Spliterator} over the elements in this queue
940     * @since 1.8
941     */
942     public Spliterator<E> spliterator() {
943 jsr166 1.2 return new LBQSpliterator();
944     }
945    
946     /**
947     * @throws NullPointerException {@inheritDoc}
948     */
949     public void forEach(Consumer<? super E> action) {
950     Objects.requireNonNull(action);
951     forEachFrom(action, null);
952     }
953    
954     /**
955     * Runs action on each element found during a traversal starting at p.
956     * If p is null, traversal starts at head.
957     */
958     void forEachFrom(Consumer<? super E> action, Node<E> p) {
959     // Extract batches of elements while holding the lock; then
960     // run the action on the elements while not
961     final int batchSize = 32; // max number of elements per batch
962     Object[] es = null; // container for batch of elements
963     int n, len = 0;
964     do {
965     fullyLock();
966     try {
967     if (es == null) {
968     if (p == null) p = head.next;
969     for (Node<E> q = p; q != null; q = succ(q))
970     if (q.item != null && ++len == batchSize)
971     break;
972     es = new Object[len];
973     }
974     for (n = 0; p != null && n < len; p = succ(p))
975     if ((es[n] = p.item) != null)
976     n++;
977     } finally {
978     fullyUnlock();
979     }
980     for (int i = 0; i < n; i++) {
981     @SuppressWarnings("unchecked") E e = (E) es[i];
982     action.accept(e);
983     }
984     } while (n > 0 && p != null);
985 jsr166 1.1 }
986    
987     /**
988     * Saves this queue to a stream (that is, serializes it).
989     *
990     * @param s the stream
991     * @throws java.io.IOException if an I/O error occurs
992     * @serialData The capacity is emitted (int), followed by all of
993     * its elements (each an {@code Object}) in the proper order,
994     * followed by a null
995     */
996     private void writeObject(java.io.ObjectOutputStream s)
997     throws java.io.IOException {
998    
999     fullyLock();
1000     try {
1001     // Write out any hidden stuff, plus capacity
1002     s.defaultWriteObject();
1003    
1004     // Write out all elements in the proper order.
1005     for (Node<E> p = head.next; p != null; p = p.next)
1006     s.writeObject(p.item);
1007    
1008     // Use trailing null as sentinel
1009     s.writeObject(null);
1010     } finally {
1011     fullyUnlock();
1012     }
1013     }
1014    
1015     /**
1016     * Reconstitutes this queue from a stream (that is, deserializes it).
1017     * @param s the stream
1018     * @throws ClassNotFoundException if the class of a serialized object
1019     * could not be found
1020     * @throws java.io.IOException if an I/O error occurs
1021     */
1022     private void readObject(java.io.ObjectInputStream s)
1023     throws java.io.IOException, ClassNotFoundException {
1024     // Read in capacity, and any hidden stuff
1025     s.defaultReadObject();
1026    
1027     count.set(0);
1028     last = head = new Node<E>(null);
1029    
1030     // Read in all elements and place in queue
1031     for (;;) {
1032     @SuppressWarnings("unchecked")
1033     E item = (E)s.readObject();
1034     if (item == null)
1035     break;
1036     add(item);
1037     }
1038     }
1039     }