ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.76
Committed: Wed Mar 13 12:39:02 2013 UTC (11 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.75: +3 -10 lines
Log Message:
Synch with lambda Spliterator API

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14 dl 1.74 import java.util.Collections;
15 jsr166 1.51 import java.util.Iterator;
16     import java.util.NoSuchElementException;
17 dl 1.74 import java.util.Spliterator;
18 dl 1.75 import java.util.Spliterators;
19 dl 1.74 import java.util.stream.Stream;
20     import java.util.stream.Streams;
21     import java.util.function.Consumer;
22 tim 1.1
23     /**
24 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
25 dholmes 1.8 * linked nodes.
26     * This queue orders elements FIFO (first-in-first-out).
27 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
28 dholmes 1.8 * queue the longest time.
29     * The <em>tail</em> of the queue is that element that has been on the
30 dl 1.20 * queue the shortest time. New elements
31     * are inserted at the tail of the queue, and the queue retrieval
32     * operations obtain elements at the head of the queue.
33 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
34     * less predictable performance in most concurrent applications.
35 tim 1.12 *
36 jsr166 1.70 * <p>The optional capacity bound constructor argument serves as a
37 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
38     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
39 dl 1.3 * dynamically created upon each insertion unless this would bring the
40     * queue above capacity.
41 dholmes 1.8 *
42 dl 1.36 * <p>This class and its iterator implement all of the
43     * <em>optional</em> methods of the {@link Collection} and {@link
44 dl 1.38 * Iterator} interfaces.
45 dl 1.21 *
46 dl 1.34 * <p>This class is a member of the
47 jsr166 1.48 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
48 dl 1.34 * Java Collections Framework</a>.
49     *
50 dl 1.6 * @since 1.5
51     * @author Doug Lea
52 dl 1.27 * @param <E> the type of elements held in this collection
53 jsr166 1.40 */
54 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
55 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
56 dl 1.18 private static final long serialVersionUID = -6903933977591709194L;
57 tim 1.1
58 dl 1.2 /*
59     * A variant of the "two lock queue" algorithm. The putLock gates
60     * entry to put (and offer), and has an associated condition for
61     * waiting puts. Similarly for the takeLock. The "count" field
62     * that they both rely on is maintained as an atomic to avoid
63     * needing to get both locks in most cases. Also, to minimize need
64     * for puts to get takeLock and vice-versa, cascading notifies are
65     * used. When a put notices that it has enabled at least one take,
66     * it signals taker. That taker in turn signals others if more
67     * items have been entered since the signal. And symmetrically for
68 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
69 dl 1.2 * iterators acquire both locks.
70 jsr166 1.51 *
71     * Visibility between writers and readers is provided as follows:
72     *
73     * Whenever an element is enqueued, the putLock is acquired and
74     * count updated. A subsequent reader guarantees visibility to the
75     * enqueued Node by either acquiring the putLock (via fullyLock)
76     * or by acquiring the takeLock, and then reading n = count.get();
77     * this gives visibility to the first n items.
78     *
79     * To implement weakly consistent iterators, it appears we need to
80     * keep all Nodes GC-reachable from a predecessor dequeued Node.
81     * That would cause two problems:
82     * - allow a rogue Iterator to cause unbounded memory retention
83     * - cause cross-generational linking of old Nodes to new Nodes if
84     * a Node was tenured while live, which generational GCs have a
85     * hard time dealing with, causing repeated major collections.
86     * However, only non-deleted Nodes need to be reachable from
87     * dequeued Nodes, and reachability does not necessarily have to
88     * be of the kind understood by the GC. We use the trick of
89     * linking a Node that has just been dequeued to itself. Such a
90     * self-link implicitly means to advance to head.next.
91 dl 1.38 */
92 dl 1.2
93 dl 1.6 /**
94     * Linked list node class
95     */
96 dl 1.2 static class Node<E> {
97 jsr166 1.51 E item;
98    
99     /**
100     * One of:
101     * - the real successor Node
102     * - this Node, meaning the successor is head.next
103     * - null, meaning there is no successor (this is the last node)
104     */
105 dl 1.2 Node<E> next;
106 jsr166 1.51
107 dl 1.2 Node(E x) { item = x; }
108     }
109    
110 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
111 dl 1.2 private final int capacity;
112 dl 1.6
113     /** Current number of elements */
114 jsr166 1.61 private final AtomicInteger count = new AtomicInteger();
115 dl 1.2
116 jsr166 1.51 /**
117     * Head of linked list.
118     * Invariant: head.item == null
119     */
120 jsr166 1.64 transient Node<E> head;
121 dl 1.6
122 jsr166 1.51 /**
123     * Tail of linked list.
124     * Invariant: last.next == null
125     */
126 dl 1.6 private transient Node<E> last;
127 dl 1.2
128 dl 1.6 /** Lock held by take, poll, etc */
129 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
130 dl 1.6
131     /** Wait queue for waiting takes */
132 dl 1.32 private final Condition notEmpty = takeLock.newCondition();
133 dl 1.2
134 dl 1.6 /** Lock held by put, offer, etc */
135 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
136 dl 1.6
137     /** Wait queue for waiting puts */
138 dl 1.32 private final Condition notFull = putLock.newCondition();
139 dl 1.2
140     /**
141 jsr166 1.40 * Signals a waiting take. Called only from put/offer (which do not
142 dl 1.4 * otherwise ordinarily lock takeLock.)
143 dl 1.2 */
144     private void signalNotEmpty() {
145 dl 1.31 final ReentrantLock takeLock = this.takeLock;
146 dl 1.2 takeLock.lock();
147     try {
148     notEmpty.signal();
149 tim 1.17 } finally {
150 dl 1.2 takeLock.unlock();
151     }
152     }
153    
154     /**
155 jsr166 1.40 * Signals a waiting put. Called only from take/poll.
156 dl 1.2 */
157     private void signalNotFull() {
158 dl 1.31 final ReentrantLock putLock = this.putLock;
159 dl 1.2 putLock.lock();
160     try {
161     notFull.signal();
162 tim 1.17 } finally {
163 dl 1.2 putLock.unlock();
164     }
165     }
166    
167     /**
168 dl 1.54 * Links node at end of queue.
169 jsr166 1.51 *
170 dl 1.54 * @param node the node
171 dl 1.2 */
172 dl 1.54 private void enqueue(Node<E> node) {
173 jsr166 1.51 // assert putLock.isHeldByCurrentThread();
174     // assert last.next == null;
175 dl 1.54 last = last.next = node;
176 dl 1.2 }
177    
178     /**
179 jsr166 1.51 * Removes a node from head of queue.
180     *
181 dl 1.6 * @return the node
182 dl 1.2 */
183 jsr166 1.51 private E dequeue() {
184     // assert takeLock.isHeldByCurrentThread();
185     // assert head.item == null;
186 dl 1.50 Node<E> h = head;
187     Node<E> first = h.next;
188 jsr166 1.51 h.next = h; // help GC
189 dl 1.2 head = first;
190 dl 1.28 E x = first.item;
191 dl 1.2 first.item = null;
192     return x;
193     }
194    
195     /**
196 jsr166 1.71 * Locks to prevent both puts and takes.
197 dl 1.2 */
198 jsr166 1.51 void fullyLock() {
199 dl 1.2 putLock.lock();
200     takeLock.lock();
201 tim 1.1 }
202 dl 1.2
203     /**
204 jsr166 1.71 * Unlocks to allow both puts and takes.
205 dl 1.2 */
206 jsr166 1.51 void fullyUnlock() {
207 dl 1.2 takeLock.unlock();
208     putLock.unlock();
209     }
210    
211 jsr166 1.51 // /**
212     // * Tells whether both locks are held by current thread.
213     // */
214     // boolean isFullyLocked() {
215     // return (putLock.isHeldByCurrentThread() &&
216     // takeLock.isHeldByCurrentThread());
217     // }
218 dl 1.2
219     /**
220 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with a capacity of
221 dholmes 1.8 * {@link Integer#MAX_VALUE}.
222 dl 1.2 */
223     public LinkedBlockingQueue() {
224     this(Integer.MAX_VALUE);
225     }
226    
227     /**
228 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
229 tim 1.16 *
230 jsr166 1.43 * @param capacity the capacity of this queue
231 jsr166 1.51 * @throws IllegalArgumentException if {@code capacity} is not greater
232 jsr166 1.43 * than zero
233 dl 1.2 */
234     public LinkedBlockingQueue(int capacity) {
235 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
236 dl 1.2 this.capacity = capacity;
237 dl 1.6 last = head = new Node<E>(null);
238 dl 1.2 }
239    
240     /**
241 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with a capacity of
242 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
243 tim 1.12 * given collection,
244 dholmes 1.8 * added in traversal order of the collection's iterator.
245 jsr166 1.43 *
246 dholmes 1.9 * @param c the collection of elements to initially contain
247 jsr166 1.43 * @throws NullPointerException if the specified collection or any
248     * of its elements are null
249 dl 1.2 */
250 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
251 dl 1.2 this(Integer.MAX_VALUE);
252 jsr166 1.51 final ReentrantLock putLock = this.putLock;
253     putLock.lock(); // Never contended, but necessary for visibility
254     try {
255     int n = 0;
256     for (E e : c) {
257     if (e == null)
258     throw new NullPointerException();
259     if (n == capacity)
260     throw new IllegalStateException("Queue full");
261 dl 1.54 enqueue(new Node<E>(e));
262 jsr166 1.51 ++n;
263     }
264     count.set(n);
265     } finally {
266     putLock.unlock();
267     }
268 dl 1.2 }
269    
270 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
271     // greater in size than Integer.MAX_VALUE
272 tim 1.12 /**
273 dl 1.20 * Returns the number of elements in this queue.
274     *
275 jsr166 1.43 * @return the number of elements in this queue
276 dholmes 1.8 */
277 dl 1.2 public int size() {
278     return count.get();
279 tim 1.1 }
280 dl 1.2
281 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
282     // without the reference to unlimited queues.
283 tim 1.12 /**
284 jsr166 1.41 * Returns the number of additional elements that this queue can ideally
285     * (in the absence of memory or resource constraints) accept without
286 dholmes 1.8 * blocking. This is always equal to the initial capacity of this queue
287 jsr166 1.51 * less the current {@code size} of this queue.
288 jsr166 1.41 *
289     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
290 jsr166 1.51 * an element will succeed by inspecting {@code remainingCapacity}
291 jsr166 1.41 * because it may be the case that another thread is about to
292 jsr166 1.43 * insert or remove an element.
293 dholmes 1.8 */
294 dl 1.2 public int remainingCapacity() {
295     return capacity - count.get();
296     }
297    
298 dholmes 1.22 /**
299 jsr166 1.44 * Inserts the specified element at the tail of this queue, waiting if
300 dholmes 1.22 * necessary for space to become available.
301 jsr166 1.43 *
302     * @throws InterruptedException {@inheritDoc}
303     * @throws NullPointerException {@inheritDoc}
304 dholmes 1.22 */
305 jsr166 1.42 public void put(E e) throws InterruptedException {
306     if (e == null) throw new NullPointerException();
307 jsr166 1.51 // Note: convention in all put/take/etc is to preset local var
308     // holding count negative to indicate failure unless set.
309 tim 1.12 int c = -1;
310 jsr166 1.60 Node<E> node = new Node<E>(e);
311 dl 1.31 final ReentrantLock putLock = this.putLock;
312     final AtomicInteger count = this.count;
313 dl 1.2 putLock.lockInterruptibly();
314     try {
315     /*
316     * Note that count is used in wait guard even though it is
317     * not protected by lock. This works because count can
318     * only decrease at this point (all other puts are shut
319     * out by lock), and we (or some other waiting put) are
320 jsr166 1.51 * signalled if it ever changes from capacity. Similarly
321     * for all other uses of count in other wait guards.
322 dl 1.2 */
323 jsr166 1.51 while (count.get() == capacity) {
324     notFull.await();
325 dl 1.2 }
326 dl 1.54 enqueue(node);
327 dl 1.2 c = count.getAndIncrement();
328 dl 1.6 if (c + 1 < capacity)
329 dl 1.2 notFull.signal();
330 tim 1.17 } finally {
331 dl 1.2 putLock.unlock();
332     }
333 tim 1.12 if (c == 0)
334 dl 1.2 signalNotEmpty();
335 tim 1.1 }
336 dl 1.2
337 dholmes 1.22 /**
338     * Inserts the specified element at the tail of this queue, waiting if
339     * necessary up to the specified wait time for space to become available.
340 jsr166 1.43 *
341 jsr166 1.51 * @return {@code true} if successful, or {@code false} if
342 jsr166 1.73 * the specified waiting time elapses before space is available
343 jsr166 1.43 * @throws InterruptedException {@inheritDoc}
344     * @throws NullPointerException {@inheritDoc}
345 dholmes 1.22 */
346 jsr166 1.42 public boolean offer(E e, long timeout, TimeUnit unit)
347 dholmes 1.8 throws InterruptedException {
348 tim 1.12
349 jsr166 1.42 if (e == null) throw new NullPointerException();
350 dl 1.2 long nanos = unit.toNanos(timeout);
351     int c = -1;
352 dl 1.31 final ReentrantLock putLock = this.putLock;
353     final AtomicInteger count = this.count;
354 dholmes 1.8 putLock.lockInterruptibly();
355 dl 1.2 try {
356 jsr166 1.51 while (count.get() == capacity) {
357 dl 1.2 if (nanos <= 0)
358     return false;
359 jsr166 1.51 nanos = notFull.awaitNanos(nanos);
360 dl 1.2 }
361 dl 1.54 enqueue(new Node<E>(e));
362 jsr166 1.51 c = count.getAndIncrement();
363     if (c + 1 < capacity)
364     notFull.signal();
365 tim 1.17 } finally {
366 dl 1.2 putLock.unlock();
367     }
368 tim 1.12 if (c == 0)
369 dl 1.2 signalNotEmpty();
370     return true;
371 tim 1.1 }
372 dl 1.2
373 dl 1.23 /**
374 jsr166 1.44 * Inserts the specified element at the tail of this queue if it is
375     * possible to do so immediately without exceeding the queue's capacity,
376 jsr166 1.51 * returning {@code true} upon success and {@code false} if this queue
377 jsr166 1.44 * is full.
378     * When using a capacity-restricted queue, this method is generally
379     * preferable to method {@link BlockingQueue#add add}, which can fail to
380     * insert an element only by throwing an exception.
381 dl 1.23 *
382 jsr166 1.43 * @throws NullPointerException if the specified element is null
383 dl 1.23 */
384 jsr166 1.42 public boolean offer(E e) {
385     if (e == null) throw new NullPointerException();
386 dl 1.31 final AtomicInteger count = this.count;
387 dl 1.2 if (count.get() == capacity)
388     return false;
389 tim 1.12 int c = -1;
390 jsr166 1.60 Node<E> node = new Node<E>(e);
391 dl 1.31 final ReentrantLock putLock = this.putLock;
392 dholmes 1.8 putLock.lock();
393 dl 1.2 try {
394     if (count.get() < capacity) {
395 dl 1.54 enqueue(node);
396 dl 1.2 c = count.getAndIncrement();
397 dl 1.6 if (c + 1 < capacity)
398 dl 1.2 notFull.signal();
399     }
400 tim 1.17 } finally {
401 dl 1.2 putLock.unlock();
402     }
403 tim 1.12 if (c == 0)
404 dl 1.2 signalNotEmpty();
405     return c >= 0;
406 tim 1.1 }
407 dl 1.2
408     public E take() throws InterruptedException {
409     E x;
410     int c = -1;
411 dl 1.31 final AtomicInteger count = this.count;
412     final ReentrantLock takeLock = this.takeLock;
413 dl 1.2 takeLock.lockInterruptibly();
414     try {
415 jsr166 1.51 while (count.get() == 0) {
416     notEmpty.await();
417 dl 1.2 }
418 jsr166 1.51 x = dequeue();
419 dl 1.2 c = count.getAndDecrement();
420     if (c > 1)
421     notEmpty.signal();
422 tim 1.17 } finally {
423 dl 1.2 takeLock.unlock();
424     }
425 tim 1.12 if (c == capacity)
426 dl 1.2 signalNotFull();
427     return x;
428     }
429    
430     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
431     E x = null;
432     int c = -1;
433 dholmes 1.8 long nanos = unit.toNanos(timeout);
434 dl 1.31 final AtomicInteger count = this.count;
435     final ReentrantLock takeLock = this.takeLock;
436 dl 1.2 takeLock.lockInterruptibly();
437     try {
438 jsr166 1.51 while (count.get() == 0) {
439 dl 1.2 if (nanos <= 0)
440     return null;
441 jsr166 1.51 nanos = notEmpty.awaitNanos(nanos);
442 dl 1.2 }
443 jsr166 1.51 x = dequeue();
444     c = count.getAndDecrement();
445     if (c > 1)
446     notEmpty.signal();
447 tim 1.17 } finally {
448 dl 1.2 takeLock.unlock();
449     }
450 tim 1.12 if (c == capacity)
451 dl 1.2 signalNotFull();
452     return x;
453     }
454    
455     public E poll() {
456 dl 1.31 final AtomicInteger count = this.count;
457 dl 1.2 if (count.get() == 0)
458     return null;
459     E x = null;
460 tim 1.12 int c = -1;
461 dl 1.31 final ReentrantLock takeLock = this.takeLock;
462 dl 1.30 takeLock.lock();
463 dl 1.2 try {
464     if (count.get() > 0) {
465 jsr166 1.51 x = dequeue();
466 dl 1.2 c = count.getAndDecrement();
467     if (c > 1)
468     notEmpty.signal();
469     }
470 tim 1.17 } finally {
471 dl 1.2 takeLock.unlock();
472     }
473 tim 1.12 if (c == capacity)
474 dl 1.2 signalNotFull();
475     return x;
476 tim 1.1 }
477 dl 1.2
478     public E peek() {
479     if (count.get() == 0)
480     return null;
481 dl 1.31 final ReentrantLock takeLock = this.takeLock;
482 dholmes 1.8 takeLock.lock();
483 dl 1.2 try {
484     Node<E> first = head.next;
485     if (first == null)
486     return null;
487     else
488     return first.item;
489 tim 1.17 } finally {
490 dl 1.2 takeLock.unlock();
491     }
492 tim 1.1 }
493    
494 dl 1.35 /**
495 jsr166 1.51 * Unlinks interior Node p with predecessor trail.
496     */
497     void unlink(Node<E> p, Node<E> trail) {
498     // assert isFullyLocked();
499     // p.next is not changed, to allow iterators that are
500     // traversing p to maintain their weak-consistency guarantee.
501     p.item = null;
502     trail.next = p.next;
503     if (last == p)
504     last = trail;
505     if (count.getAndDecrement() == capacity)
506     notFull.signal();
507     }
508    
509     /**
510 jsr166 1.44 * Removes a single instance of the specified element from this queue,
511 jsr166 1.51 * if it is present. More formally, removes an element {@code e} such
512     * that {@code o.equals(e)}, if this queue contains one or more such
513 jsr166 1.44 * elements.
514 jsr166 1.51 * Returns {@code true} if this queue contained the specified element
515 jsr166 1.44 * (or equivalently, if this queue changed as a result of the call).
516     *
517     * @param o element to be removed from this queue, if present
518 jsr166 1.51 * @return {@code true} if this queue changed as a result of the call
519 dl 1.35 */
520 dholmes 1.9 public boolean remove(Object o) {
521     if (o == null) return false;
522 dl 1.2 fullyLock();
523     try {
524 jsr166 1.51 for (Node<E> trail = head, p = trail.next;
525     p != null;
526     trail = p, p = p.next) {
527 dholmes 1.9 if (o.equals(p.item)) {
528 jsr166 1.51 unlink(p, trail);
529     return true;
530 dl 1.2 }
531     }
532 jsr166 1.51 return false;
533 tim 1.17 } finally {
534 dl 1.2 fullyUnlock();
535     }
536 tim 1.1 }
537 dl 1.2
538 jsr166 1.43 /**
539 jsr166 1.56 * Returns {@code true} if this queue contains the specified element.
540     * More formally, returns {@code true} if and only if this queue contains
541     * at least one element {@code e} such that {@code o.equals(e)}.
542     *
543     * @param o object to be checked for containment in this queue
544     * @return {@code true} if this queue contains the specified element
545     */
546     public boolean contains(Object o) {
547     if (o == null) return false;
548     fullyLock();
549     try {
550     for (Node<E> p = head.next; p != null; p = p.next)
551     if (o.equals(p.item))
552     return true;
553     return false;
554     } finally {
555     fullyUnlock();
556     }
557     }
558    
559     /**
560 jsr166 1.43 * Returns an array containing all of the elements in this queue, in
561     * proper sequence.
562     *
563     * <p>The returned array will be "safe" in that no references to it are
564     * maintained by this queue. (In other words, this method must allocate
565     * a new array). The caller is thus free to modify the returned array.
566 jsr166 1.45 *
567 jsr166 1.43 * <p>This method acts as bridge between array-based and collection-based
568     * APIs.
569     *
570     * @return an array containing all of the elements in this queue
571     */
572 dl 1.2 public Object[] toArray() {
573     fullyLock();
574     try {
575     int size = count.get();
576 tim 1.12 Object[] a = new Object[size];
577 dl 1.2 int k = 0;
578 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
579 dl 1.2 a[k++] = p.item;
580     return a;
581 tim 1.17 } finally {
582 dl 1.2 fullyUnlock();
583     }
584 tim 1.1 }
585 dl 1.2
586 jsr166 1.43 /**
587     * Returns an array containing all of the elements in this queue, in
588     * proper sequence; the runtime type of the returned array is that of
589     * the specified array. If the queue fits in the specified array, it
590     * is returned therein. Otherwise, a new array is allocated with the
591     * runtime type of the specified array and the size of this queue.
592     *
593     * <p>If this queue fits in the specified array with room to spare
594     * (i.e., the array has more elements than this queue), the element in
595     * the array immediately following the end of the queue is set to
596 jsr166 1.51 * {@code null}.
597 jsr166 1.43 *
598     * <p>Like the {@link #toArray()} method, this method acts as bridge between
599     * array-based and collection-based APIs. Further, this method allows
600     * precise control over the runtime type of the output array, and may,
601     * under certain circumstances, be used to save allocation costs.
602     *
603 jsr166 1.51 * <p>Suppose {@code x} is a queue known to contain only strings.
604 jsr166 1.43 * The following code can be used to dump the queue into a newly
605 jsr166 1.51 * allocated array of {@code String}:
606 jsr166 1.43 *
607 jsr166 1.62 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
608 jsr166 1.43 *
609 jsr166 1.51 * Note that {@code toArray(new Object[0])} is identical in function to
610     * {@code toArray()}.
611 jsr166 1.43 *
612     * @param a the array into which the elements of the queue are to
613     * be stored, if it is big enough; otherwise, a new array of the
614     * same runtime type is allocated for this purpose
615     * @return an array containing all of the elements in this queue
616     * @throws ArrayStoreException if the runtime type of the specified array
617     * is not a supertype of the runtime type of every element in
618     * this queue
619     * @throws NullPointerException if the specified array is null
620     */
621 jsr166 1.51 @SuppressWarnings("unchecked")
622 dl 1.2 public <T> T[] toArray(T[] a) {
623     fullyLock();
624     try {
625     int size = count.get();
626     if (a.length < size)
627 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
628     (a.getClass().getComponentType(), size);
629 tim 1.12
630 dl 1.2 int k = 0;
631 jsr166 1.51 for (Node<E> p = head.next; p != null; p = p.next)
632 dl 1.2 a[k++] = (T)p.item;
633 jsr166 1.47 if (a.length > k)
634     a[k] = null;
635 dl 1.2 return a;
636 tim 1.17 } finally {
637 dl 1.2 fullyUnlock();
638     }
639 tim 1.1 }
640 dl 1.2
641     public String toString() {
642     fullyLock();
643     try {
644 jsr166 1.55 Node<E> p = head.next;
645     if (p == null)
646     return "[]";
647    
648     StringBuilder sb = new StringBuilder();
649     sb.append('[');
650     for (;;) {
651     E e = p.item;
652     sb.append(e == this ? "(this Collection)" : e);
653     p = p.next;
654     if (p == null)
655     return sb.append(']').toString();
656     sb.append(',').append(' ');
657     }
658 tim 1.17 } finally {
659 dl 1.2 fullyUnlock();
660     }
661 tim 1.1 }
662 dl 1.2
663 dl 1.35 /**
664     * Atomically removes all of the elements from this queue.
665     * The queue will be empty after this call returns.
666     */
667 dl 1.24 public void clear() {
668     fullyLock();
669     try {
670 jsr166 1.51 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
671     h.next = h;
672     p.item = null;
673     }
674     head = last;
675     // assert head.item == null && head.next == null;
676 dl 1.24 if (count.getAndSet(0) == capacity)
677 jsr166 1.51 notFull.signal();
678 dl 1.24 } finally {
679     fullyUnlock();
680     }
681     }
682    
683 jsr166 1.43 /**
684     * @throws UnsupportedOperationException {@inheritDoc}
685     * @throws ClassCastException {@inheritDoc}
686     * @throws NullPointerException {@inheritDoc}
687     * @throws IllegalArgumentException {@inheritDoc}
688     */
689 dl 1.24 public int drainTo(Collection<? super E> c) {
690 jsr166 1.51 return drainTo(c, Integer.MAX_VALUE);
691 dl 1.24 }
692 jsr166 1.40
693 jsr166 1.43 /**
694     * @throws UnsupportedOperationException {@inheritDoc}
695     * @throws ClassCastException {@inheritDoc}
696     * @throws NullPointerException {@inheritDoc}
697     * @throws IllegalArgumentException {@inheritDoc}
698     */
699 dl 1.24 public int drainTo(Collection<? super E> c, int maxElements) {
700     if (c == null)
701     throw new NullPointerException();
702     if (c == this)
703     throw new IllegalArgumentException();
704 jsr166 1.63 if (maxElements <= 0)
705     return 0;
706 jsr166 1.51 boolean signalNotFull = false;
707     final ReentrantLock takeLock = this.takeLock;
708     takeLock.lock();
709 dl 1.24 try {
710 jsr166 1.51 int n = Math.min(maxElements, count.get());
711     // count.get provides visibility to first n Nodes
712     Node<E> h = head;
713     int i = 0;
714     try {
715     while (i < n) {
716     Node<E> p = h.next;
717     c.add(p.item);
718     p.item = null;
719     h.next = h;
720     h = p;
721     ++i;
722     }
723     return n;
724     } finally {
725     // Restore invariants even if c.add() threw
726     if (i > 0) {
727     // assert h.item == null;
728     head = h;
729     signalNotFull = (count.getAndAdd(-i) == capacity);
730     }
731 dl 1.24 }
732     } finally {
733 jsr166 1.51 takeLock.unlock();
734     if (signalNotFull)
735     signalNotFull();
736 dl 1.24 }
737     }
738    
739 dholmes 1.14 /**
740     * Returns an iterator over the elements in this queue in proper sequence.
741 jsr166 1.57 * The elements will be returned in order from first (head) to last (tail).
742     *
743     * <p>The returned iterator is a "weakly consistent" iterator that
744 jsr166 1.52 * will never throw {@link java.util.ConcurrentModificationException
745 jsr166 1.57 * ConcurrentModificationException}, and guarantees to traverse
746     * elements as they existed upon construction of the iterator, and
747     * may (but is not guaranteed to) reflect any modifications
748     * subsequent to construction.
749 dholmes 1.14 *
750 jsr166 1.43 * @return an iterator over the elements in this queue in proper sequence
751 dholmes 1.14 */
752 dl 1.2 public Iterator<E> iterator() {
753 jsr166 1.59 return new Itr();
754 tim 1.1 }
755 dl 1.2
756     private class Itr implements Iterator<E> {
757 tim 1.12 /*
758 jsr166 1.51 * Basic weakly-consistent iterator. At all times hold the next
759 dl 1.4 * item to hand out so that if hasNext() reports true, we will
760     * still have it to return even if lost race with a take etc.
761     */
762 jsr166 1.72
763 dl 1.31 private Node<E> current;
764     private Node<E> lastRet;
765     private E currentElement;
766 tim 1.12
767 dl 1.2 Itr() {
768 jsr166 1.51 fullyLock();
769 dl 1.2 try {
770     current = head.next;
771 dl 1.4 if (current != null)
772     currentElement = current.item;
773 tim 1.17 } finally {
774 jsr166 1.51 fullyUnlock();
775 dl 1.2 }
776     }
777 tim 1.12
778     public boolean hasNext() {
779 dl 1.2 return current != null;
780     }
781    
782 jsr166 1.51 /**
783 jsr166 1.53 * Returns the next live successor of p, or null if no such.
784     *
785     * Unlike other traversal methods, iterators need to handle both:
786 jsr166 1.51 * - dequeued nodes (p.next == p)
787 jsr166 1.53 * - (possibly multiple) interior removed nodes (p.item == null)
788 jsr166 1.51 */
789     private Node<E> nextNode(Node<E> p) {
790 jsr166 1.53 for (;;) {
791     Node<E> s = p.next;
792     if (s == p)
793     return head.next;
794     if (s == null || s.item != null)
795     return s;
796     p = s;
797     }
798 jsr166 1.51 }
799    
800 tim 1.12 public E next() {
801 jsr166 1.51 fullyLock();
802 dl 1.2 try {
803     if (current == null)
804     throw new NoSuchElementException();
805 dl 1.4 E x = currentElement;
806 dl 1.2 lastRet = current;
807 jsr166 1.51 current = nextNode(current);
808     currentElement = (current == null) ? null : current.item;
809 dl 1.2 return x;
810 tim 1.17 } finally {
811 jsr166 1.51 fullyUnlock();
812 dl 1.2 }
813     }
814    
815 tim 1.12 public void remove() {
816 dl 1.2 if (lastRet == null)
817 tim 1.12 throw new IllegalStateException();
818 jsr166 1.51 fullyLock();
819 dl 1.2 try {
820     Node<E> node = lastRet;
821     lastRet = null;
822 jsr166 1.51 for (Node<E> trail = head, p = trail.next;
823     p != null;
824     trail = p, p = p.next) {
825     if (p == node) {
826     unlink(p, trail);
827     break;
828     }
829 dl 1.2 }
830 tim 1.17 } finally {
831 jsr166 1.51 fullyUnlock();
832 dl 1.2 }
833     }
834 tim 1.1 }
835 dl 1.2
836 dl 1.74 static final class LBQSpliterator<E> implements Spliterator<E> {
837     // Similar idea to ConcurrentLinkedQueue spliterator
838     static final int MAX_BATCH = 1 << 11; // saturate batch size
839     final LinkedBlockingQueue<E> queue;
840     Node<E> current; // current node; null until initialized
841     int batch; // batch size for splits
842     boolean exhausted; // true when no more nodes
843     long est; // size estimate
844     LBQSpliterator(LinkedBlockingQueue<E> queue) {
845     this.queue = queue;
846     this.est = queue.size();
847     }
848    
849     public long estimateSize() { return est; }
850    
851     public Spliterator<E> trySplit() {
852     int n;
853     final LinkedBlockingQueue<E> q = this.queue;
854     if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) {
855     Object[] a = new Object[batch = n];
856     int i = 0;
857     Node<E> p = current;
858     q.fullyLock();
859     try {
860     if (p != null || (p = q.head.next) != null) {
861     do {
862     if ((a[i] = p.item) != null)
863     ++i;
864     } while ((p = p.next) != null && i < n);
865     }
866     } finally {
867     q.fullyUnlock();
868     }
869     if ((current = p) == null) {
870     est = 0L;
871     exhausted = true;
872     }
873     else if ((est -= i) <= 0L)
874     est = 1L;
875 dl 1.75 return Spliterators.spliterator
876 dl 1.74 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
877     Spliterator.CONCURRENT);
878     }
879     return null;
880     }
881    
882     public void forEach(Consumer<? super E> action) {
883     if (action == null) throw new NullPointerException();
884     final LinkedBlockingQueue<E> q = this.queue;
885     if (!exhausted) {
886     exhausted = true;
887     Node<E> p = current;
888     do {
889     E e = null;
890     q.fullyLock();
891     try {
892     if (p == null)
893     p = q.head.next;
894     while (p != null) {
895     e = p.item;
896     p = p.next;
897     if (e != null)
898     break;
899     }
900     } finally {
901     q.fullyUnlock();
902     }
903     if (e != null)
904     action.accept(e);
905     } while (p != null);
906     }
907     }
908    
909     public boolean tryAdvance(Consumer<? super E> action) {
910     if (action == null) throw new NullPointerException();
911     final LinkedBlockingQueue<E> q = this.queue;
912     if (!exhausted) {
913     E e = null;
914     q.fullyLock();
915     try {
916     if (current == null)
917     current = q.head.next;
918     while (current != null) {
919     e = current.item;
920     current = current.next;
921     if (e != null)
922     break;
923     }
924     } finally {
925     q.fullyUnlock();
926     }
927 dl 1.76 if (current == null)
928     exhausted = true;
929 dl 1.74 if (e != null) {
930     action.accept(e);
931     return true;
932     }
933     }
934     return false;
935     }
936    
937     public int characteristics() {
938     return Spliterator.ORDERED | Spliterator.NONNULL |
939     Spliterator.CONCURRENT;
940     }
941     }
942    
943 dl 1.76 public Spliterator<E> spliterator() {
944 dl 1.74 return new LBQSpliterator<E>(this);
945     }
946    
947 dl 1.2 /**
948 jsr166 1.68 * Saves this queue to a stream (that is, serializes it).
949 dl 1.2 *
950     * @serialData The capacity is emitted (int), followed by all of
951 jsr166 1.51 * its elements (each an {@code Object}) in the proper order,
952 dl 1.2 * followed by a null
953     */
954     private void writeObject(java.io.ObjectOutputStream s)
955     throws java.io.IOException {
956    
957 tim 1.12 fullyLock();
958 dl 1.2 try {
959     // Write out any hidden stuff, plus capacity
960     s.defaultWriteObject();
961    
962     // Write out all elements in the proper order.
963 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
964 dl 1.2 s.writeObject(p.item);
965    
966     // Use trailing null as sentinel
967     s.writeObject(null);
968 tim 1.17 } finally {
969 dl 1.2 fullyUnlock();
970     }
971 tim 1.1 }
972    
973 dl 1.2 /**
974 jsr166 1.65 * Reconstitutes this queue from a stream (that is, deserializes it).
975 dl 1.2 */
976     private void readObject(java.io.ObjectInputStream s)
977     throws java.io.IOException, ClassNotFoundException {
978 tim 1.12 // Read in capacity, and any hidden stuff
979     s.defaultReadObject();
980 dl 1.2
981 dl 1.19 count.set(0);
982     last = head = new Node<E>(null);
983    
984 dl 1.6 // Read in all elements and place in queue
985 dl 1.2 for (;;) {
986 jsr166 1.51 @SuppressWarnings("unchecked")
987 dl 1.2 E item = (E)s.readObject();
988     if (item == null)
989     break;
990     add(item);
991     }
992 tim 1.1 }
993     }