ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.61
Committed: Sat Jun 4 01:29:29 2011 UTC (13 years ago) by jsr166
Branch: MAIN
Changes since 1.60: +1 -1 lines
Log Message:
use nullary AtomicFoo constructors when possible

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16 tim 1.1
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The element held by this node; null for the head sentinel and for removed nodes. */
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the item that was held by the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC; a self-link tells iterators to restart at head.next
        head = first;
        E x = first.item;
        first.item = null; // first becomes the new sentinel (head.item == null)
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity of this queue less its current size
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // still room: cascade to another waiting put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // queue was empty before this put: wake a taker
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false; // fast path: full, no need to take the lock
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0; // c stays -1 only if nothing was enqueued
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); // more items remain: cascade to another taker
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // queue was full before this take: wake a putter
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null; // fast path: empty, no need to take the lock
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have emptied the queue.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        // Callers hold both locks, so notFull may be signalled directly.
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string listing the elements of this queue from head to
     * tail, enclosed in square brackets and separated by {@code ", "}.
     * An element that is this queue itself is rendered as
     * {@code "(this Collection)"}.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            // Self-link every discarded node and null out its successor's item.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements from the head of this
     * queue and adds them to the given collection. A non-positive
     * {@code maxElements} transfers nothing and returns {@code 0}.
     *
     * @return the number of elements transferred (never negative)
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        // Nothing to transfer; without this guard a negative maxElements
        // would flow into Math.min below and be returned as a negative count.
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            // Signal only after releasing takeLock; signalNotFull() takes putLock.
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node holding the element that the next call to next() returns, or null. */
        private Node<E> current;

        /** Node most recently returned by next(); null if none or already removed. */
        private Node<E> lastRet;

        /** Element captured from {@code current} while both locks were held. */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        /** Returns {@code true} if an element has been captured for next(). */
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        /**
         * Returns the element captured earlier and advances to the next
         * live node.
         *
         * @throws NoSuchElementException if there is no captured element
         */
        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Unlinks the node last returned by next(), if it is still
         * reachable from head.
         *
         * @throws IllegalStateException if next() has not been called
         *         since construction or since the last remove()
         */
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Reset to an empty queue; the elements are re-added below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break; // trailing null sentinel written by writeObject
            add(item);
        }
    }
}