ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.71
Committed: Sun Dec 30 05:23:05 2012 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.70: +2 -2 lines
Log Message:
javadoc style

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16 tim 1.1
17     /**
18 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
19 dholmes 1.8 * linked nodes.
20     * This queue orders elements FIFO (first-in-first-out).
21 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
22 dholmes 1.8 * queue the longest time.
23     * The <em>tail</em> of the queue is that element that has been on the
24 dl 1.20 * queue the shortest time. New elements
25     * are inserted at the tail of the queue, and the queue retrieval
26     * operations obtain elements at the head of the queue.
27 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
28     * less predictable performance in most concurrent applications.
29 tim 1.12 *
30 jsr166 1.70 * <p>The optional capacity bound constructor argument serves as a
31 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
32     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
33 dl 1.3 * dynamically created upon each insertion unless this would bring the
34     * queue above capacity.
35 dholmes 1.8 *
36 dl 1.36 * <p>This class and its iterator implement all of the
37     * <em>optional</em> methods of the {@link Collection} and {@link
38 dl 1.38 * Iterator} interfaces.
39 dl 1.21 *
40 dl 1.34 * <p>This class is a member of the
41 jsr166 1.48 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42 dl 1.34 * Java Collections Framework</a>.
43     *
44 dl 1.6 * @since 1.5
45     * @author Doug Lea
46 dl 1.27 * @param <E> the type of elements held in this collection
47 jsr166 1.40 */
48 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
49 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
50 dl 1.18 private static final long serialVersionUID = -6903933977591709194L;
51 tim 1.1
52 dl 1.2 /*
53     * A variant of the "two lock queue" algorithm. The putLock gates
54     * entry to put (and offer), and has an associated condition for
55     * waiting puts. Similarly for the takeLock. The "count" field
56     * that they both rely on is maintained as an atomic to avoid
57     * needing to get both locks in most cases. Also, to minimize need
58     * for puts to get takeLock and vice-versa, cascading notifies are
59     * used. When a put notices that it has enabled at least one take,
60     * it signals taker. That taker in turn signals others if more
61     * items have been entered since the signal. And symmetrically for
62 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
63 dl 1.2 * iterators acquire both locks.
64 jsr166 1.51 *
65     * Visibility between writers and readers is provided as follows:
66     *
67     * Whenever an element is enqueued, the putLock is acquired and
68     * count updated. A subsequent reader guarantees visibility to the
69     * enqueued Node by either acquiring the putLock (via fullyLock)
70     * or by acquiring the takeLock, and then reading n = count.get();
71     * this gives visibility to the first n items.
72     *
73     * To implement weakly consistent iterators, it appears we need to
74     * keep all Nodes GC-reachable from a predecessor dequeued Node.
75     * That would cause two problems:
76     * - allow a rogue Iterator to cause unbounded memory retention
77     * - cause cross-generational linking of old Nodes to new Nodes if
78     * a Node was tenured while live, which generational GCs have a
79     * hard time dealing with, causing repeated major collections.
80     * However, only non-deleted Nodes need to be reachable from
81     * dequeued Nodes, and reachability does not necessarily have to
82     * be of the kind understood by the GC. We use the trick of
83     * linking a Node that has just been dequeued to itself. Such a
84     * self-link implicitly means to advance to head.next.
85 dl 1.38 */
86 dl 1.2
87 dl 1.6 /**
88     * Linked list node class
89     */
90 dl 1.2 static class Node<E> {
91 jsr166 1.51 E item;
92    
93     /**
94     * One of:
95     * - the real successor Node
96     * - this Node, meaning the successor is head.next
97     * - null, meaning there is no successor (this is the last node)
98     */
99 dl 1.2 Node<E> next;
100 jsr166 1.51
101 dl 1.2 Node(E x) { item = x; }
102     }
103    
104 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
105 dl 1.2 private final int capacity;
106 dl 1.6
107     /** Current number of elements */
108 jsr166 1.61 private final AtomicInteger count = new AtomicInteger();
109 dl 1.2
110 jsr166 1.51 /**
111     * Head of linked list.
112     * Invariant: head.item == null
113     */
114 jsr166 1.64 transient Node<E> head;
115 dl 1.6
116 jsr166 1.51 /**
117     * Tail of linked list.
118     * Invariant: last.next == null
119     */
120 dl 1.6 private transient Node<E> last;
121 dl 1.2
122 dl 1.6 /** Lock held by take, poll, etc */
123 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
124 dl 1.6
125     /** Wait queue for waiting takes */
126 dl 1.32 private final Condition notEmpty = takeLock.newCondition();
127 dl 1.2
128 dl 1.6 /** Lock held by put, offer, etc */
129 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
130 dl 1.6
131     /** Wait queue for waiting puts */
132 dl 1.32 private final Condition notFull = putLock.newCondition();
133 dl 1.2
134     /**
135 jsr166 1.40 * Signals a waiting take. Called only from put/offer (which do not
136 dl 1.4 * otherwise ordinarily lock takeLock.)
137 dl 1.2 */
138     private void signalNotEmpty() {
139 dl 1.31 final ReentrantLock takeLock = this.takeLock;
140 dl 1.2 takeLock.lock();
141     try {
142     notEmpty.signal();
143 tim 1.17 } finally {
144 dl 1.2 takeLock.unlock();
145     }
146     }
147    
148     /**
149 jsr166 1.40 * Signals a waiting put. Called only from take/poll.
150 dl 1.2 */
151     private void signalNotFull() {
152 dl 1.31 final ReentrantLock putLock = this.putLock;
153 dl 1.2 putLock.lock();
154     try {
155     notFull.signal();
156 tim 1.17 } finally {
157 dl 1.2 putLock.unlock();
158     }
159     }
160    
161     /**
162 dl 1.54 * Links node at end of queue.
163 jsr166 1.51 *
164 dl 1.54 * @param node the node
165 dl 1.2 */
166 dl 1.54 private void enqueue(Node<E> node) {
167 jsr166 1.51 // assert putLock.isHeldByCurrentThread();
168     // assert last.next == null;
169 dl 1.54 last = last.next = node;
170 dl 1.2 }
171    
172     /**
173 jsr166 1.51 * Removes a node from head of queue.
174     *
175 dl 1.6 * @return the node
176 dl 1.2 */
177 jsr166 1.51 private E dequeue() {
178     // assert takeLock.isHeldByCurrentThread();
179     // assert head.item == null;
180 dl 1.50 Node<E> h = head;
181     Node<E> first = h.next;
182 jsr166 1.51 h.next = h; // help GC
183 dl 1.2 head = first;
184 dl 1.28 E x = first.item;
185 dl 1.2 first.item = null;
186     return x;
187     }
188    
189     /**
190 jsr166 1.71 * Locks to prevent both puts and takes.
191 dl 1.2 */
192 jsr166 1.51 void fullyLock() {
193 dl 1.2 putLock.lock();
194     takeLock.lock();
195 tim 1.1 }
196 dl 1.2
197     /**
198 jsr166 1.71 * Unlocks to allow both puts and takes.
199 dl 1.2 */
200 jsr166 1.51 void fullyUnlock() {
201 dl 1.2 takeLock.unlock();
202     putLock.unlock();
203     }
204    
205 jsr166 1.51 // /**
206     // * Tells whether both locks are held by current thread.
207     // */
208     // boolean isFullyLocked() {
209     // return (putLock.isHeldByCurrentThread() &&
210     // takeLock.isHeldByCurrentThread());
211     // }
212 dl 1.2
213     /**
214 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with a capacity of
215 dholmes 1.8 * {@link Integer#MAX_VALUE}.
216 dl 1.2 */
217     public LinkedBlockingQueue() {
218     this(Integer.MAX_VALUE);
219     }
220    
221     /**
222 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
223 tim 1.16 *
224 jsr166 1.43 * @param capacity the capacity of this queue
225 jsr166 1.51 * @throws IllegalArgumentException if {@code capacity} is not greater
226 jsr166 1.43 * than zero
227 dl 1.2 */
228     public LinkedBlockingQueue(int capacity) {
229 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
230 dl 1.2 this.capacity = capacity;
231 dl 1.6 last = head = new Node<E>(null);
232 dl 1.2 }
233    
234     /**
235 jsr166 1.51 * Creates a {@code LinkedBlockingQueue} with a capacity of
236 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
237 tim 1.12 * given collection,
238 dholmes 1.8 * added in traversal order of the collection's iterator.
239 jsr166 1.43 *
240 dholmes 1.9 * @param c the collection of elements to initially contain
241 jsr166 1.43 * @throws NullPointerException if the specified collection or any
242     * of its elements are null
243 dl 1.2 */
244 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
245 dl 1.2 this(Integer.MAX_VALUE);
246 jsr166 1.51 final ReentrantLock putLock = this.putLock;
247     putLock.lock(); // Never contended, but necessary for visibility
248     try {
249     int n = 0;
250     for (E e : c) {
251     if (e == null)
252     throw new NullPointerException();
253     if (n == capacity)
254     throw new IllegalStateException("Queue full");
255 dl 1.54 enqueue(new Node<E>(e));
256 jsr166 1.51 ++n;
257     }
258     count.set(n);
259     } finally {
260     putLock.unlock();
261     }
262 dl 1.2 }
263    
264 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
265     // greater in size than Integer.MAX_VALUE
266 tim 1.12 /**
267 dl 1.20 * Returns the number of elements in this queue.
268     *
269 jsr166 1.43 * @return the number of elements in this queue
270 dholmes 1.8 */
271 dl 1.2 public int size() {
272     return count.get();
273 tim 1.1 }
274 dl 1.2
275 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
276     // without the reference to unlimited queues.
277 tim 1.12 /**
278 jsr166 1.41 * Returns the number of additional elements that this queue can ideally
279     * (in the absence of memory or resource constraints) accept without
280 dholmes 1.8 * blocking. This is always equal to the initial capacity of this queue
281 jsr166 1.51 * less the current {@code size} of this queue.
282 jsr166 1.41 *
283     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
284 jsr166 1.51 * an element will succeed by inspecting {@code remainingCapacity}
285 jsr166 1.41 * because it may be the case that another thread is about to
286 jsr166 1.43 * insert or remove an element.
287 dholmes 1.8 */
288 dl 1.2 public int remainingCapacity() {
289     return capacity - count.get();
290     }
291    
292 dholmes 1.22 /**
293 jsr166 1.44 * Inserts the specified element at the tail of this queue, waiting if
294 dholmes 1.22 * necessary for space to become available.
295 jsr166 1.43 *
296     * @throws InterruptedException {@inheritDoc}
297     * @throws NullPointerException {@inheritDoc}
298 dholmes 1.22 */
299 jsr166 1.42 public void put(E e) throws InterruptedException {
300     if (e == null) throw new NullPointerException();
301 jsr166 1.51 // Note: convention in all put/take/etc is to preset local var
302     // holding count negative to indicate failure unless set.
303 tim 1.12 int c = -1;
304 jsr166 1.60 Node<E> node = new Node<E>(e);
305 dl 1.31 final ReentrantLock putLock = this.putLock;
306     final AtomicInteger count = this.count;
307 dl 1.2 putLock.lockInterruptibly();
308     try {
309     /*
310     * Note that count is used in wait guard even though it is
311     * not protected by lock. This works because count can
312     * only decrease at this point (all other puts are shut
313     * out by lock), and we (or some other waiting put) are
314 jsr166 1.51 * signalled if it ever changes from capacity. Similarly
315     * for all other uses of count in other wait guards.
316 dl 1.2 */
317 jsr166 1.51 while (count.get() == capacity) {
318     notFull.await();
319 dl 1.2 }
320 dl 1.54 enqueue(node);
321 dl 1.2 c = count.getAndIncrement();
322 dl 1.6 if (c + 1 < capacity)
323 dl 1.2 notFull.signal();
324 tim 1.17 } finally {
325 dl 1.2 putLock.unlock();
326     }
327 tim 1.12 if (c == 0)
328 dl 1.2 signalNotEmpty();
329 tim 1.1 }
330 dl 1.2
331 dholmes 1.22 /**
332     * Inserts the specified element at the tail of this queue, waiting if
333     * necessary up to the specified wait time for space to become available.
334 jsr166 1.43 *
335 jsr166 1.51 * @return {@code true} if successful, or {@code false} if
336 jsr166 1.43 * the specified waiting time elapses before space is available.
337     * @throws InterruptedException {@inheritDoc}
338     * @throws NullPointerException {@inheritDoc}
339 dholmes 1.22 */
340 jsr166 1.42 public boolean offer(E e, long timeout, TimeUnit unit)
341 dholmes 1.8 throws InterruptedException {
342 tim 1.12
343 jsr166 1.42 if (e == null) throw new NullPointerException();
344 dl 1.2 long nanos = unit.toNanos(timeout);
345     int c = -1;
346 dl 1.31 final ReentrantLock putLock = this.putLock;
347     final AtomicInteger count = this.count;
348 dholmes 1.8 putLock.lockInterruptibly();
349 dl 1.2 try {
350 jsr166 1.51 while (count.get() == capacity) {
351 dl 1.2 if (nanos <= 0)
352     return false;
353 jsr166 1.51 nanos = notFull.awaitNanos(nanos);
354 dl 1.2 }
355 dl 1.54 enqueue(new Node<E>(e));
356 jsr166 1.51 c = count.getAndIncrement();
357     if (c + 1 < capacity)
358     notFull.signal();
359 tim 1.17 } finally {
360 dl 1.2 putLock.unlock();
361     }
362 tim 1.12 if (c == 0)
363 dl 1.2 signalNotEmpty();
364     return true;
365 tim 1.1 }
366 dl 1.2
367 dl 1.23 /**
368 jsr166 1.44 * Inserts the specified element at the tail of this queue if it is
369     * possible to do so immediately without exceeding the queue's capacity,
370 jsr166 1.51 * returning {@code true} upon success and {@code false} if this queue
371 jsr166 1.44 * is full.
372     * When using a capacity-restricted queue, this method is generally
373     * preferable to method {@link BlockingQueue#add add}, which can fail to
374     * insert an element only by throwing an exception.
375 dl 1.23 *
376 jsr166 1.43 * @throws NullPointerException if the specified element is null
377 dl 1.23 */
378 jsr166 1.42 public boolean offer(E e) {
379     if (e == null) throw new NullPointerException();
380 dl 1.31 final AtomicInteger count = this.count;
381 dl 1.2 if (count.get() == capacity)
382     return false;
383 tim 1.12 int c = -1;
384 jsr166 1.60 Node<E> node = new Node<E>(e);
385 dl 1.31 final ReentrantLock putLock = this.putLock;
386 dholmes 1.8 putLock.lock();
387 dl 1.2 try {
388     if (count.get() < capacity) {
389 dl 1.54 enqueue(node);
390 dl 1.2 c = count.getAndIncrement();
391 dl 1.6 if (c + 1 < capacity)
392 dl 1.2 notFull.signal();
393     }
394 tim 1.17 } finally {
395 dl 1.2 putLock.unlock();
396     }
397 tim 1.12 if (c == 0)
398 dl 1.2 signalNotEmpty();
399     return c >= 0;
400 tim 1.1 }
401 dl 1.2
402     public E take() throws InterruptedException {
403     E x;
404     int c = -1;
405 dl 1.31 final AtomicInteger count = this.count;
406     final ReentrantLock takeLock = this.takeLock;
407 dl 1.2 takeLock.lockInterruptibly();
408     try {
409 jsr166 1.51 while (count.get() == 0) {
410     notEmpty.await();
411 dl 1.2 }
412 jsr166 1.51 x = dequeue();
413 dl 1.2 c = count.getAndDecrement();
414     if (c > 1)
415     notEmpty.signal();
416 tim 1.17 } finally {
417 dl 1.2 takeLock.unlock();
418     }
419 tim 1.12 if (c == capacity)
420 dl 1.2 signalNotFull();
421     return x;
422     }
423    
424     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
425     E x = null;
426     int c = -1;
427 dholmes 1.8 long nanos = unit.toNanos(timeout);
428 dl 1.31 final AtomicInteger count = this.count;
429     final ReentrantLock takeLock = this.takeLock;
430 dl 1.2 takeLock.lockInterruptibly();
431     try {
432 jsr166 1.51 while (count.get() == 0) {
433 dl 1.2 if (nanos <= 0)
434     return null;
435 jsr166 1.51 nanos = notEmpty.awaitNanos(nanos);
436 dl 1.2 }
437 jsr166 1.51 x = dequeue();
438     c = count.getAndDecrement();
439     if (c > 1)
440     notEmpty.signal();
441 tim 1.17 } finally {
442 dl 1.2 takeLock.unlock();
443     }
444 tim 1.12 if (c == capacity)
445 dl 1.2 signalNotFull();
446     return x;
447     }
448    
449     public E poll() {
450 dl 1.31 final AtomicInteger count = this.count;
451 dl 1.2 if (count.get() == 0)
452     return null;
453     E x = null;
454 tim 1.12 int c = -1;
455 dl 1.31 final ReentrantLock takeLock = this.takeLock;
456 dl 1.30 takeLock.lock();
457 dl 1.2 try {
458     if (count.get() > 0) {
459 jsr166 1.51 x = dequeue();
460 dl 1.2 c = count.getAndDecrement();
461     if (c > 1)
462     notEmpty.signal();
463     }
464 tim 1.17 } finally {
465 dl 1.2 takeLock.unlock();
466     }
467 tim 1.12 if (c == capacity)
468 dl 1.2 signalNotFull();
469     return x;
470 tim 1.1 }
471 dl 1.2
472     public E peek() {
473     if (count.get() == 0)
474     return null;
475 dl 1.31 final ReentrantLock takeLock = this.takeLock;
476 dholmes 1.8 takeLock.lock();
477 dl 1.2 try {
478     Node<E> first = head.next;
479     if (first == null)
480     return null;
481     else
482     return first.item;
483 tim 1.17 } finally {
484 dl 1.2 takeLock.unlock();
485     }
486 tim 1.1 }
487    
488 dl 1.35 /**
489 jsr166 1.51 * Unlinks interior Node p with predecessor trail.
490     */
491     void unlink(Node<E> p, Node<E> trail) {
492     // assert isFullyLocked();
493     // p.next is not changed, to allow iterators that are
494     // traversing p to maintain their weak-consistency guarantee.
495     p.item = null;
496     trail.next = p.next;
497     if (last == p)
498     last = trail;
499     if (count.getAndDecrement() == capacity)
500     notFull.signal();
501     }
502    
503     /**
504 jsr166 1.44 * Removes a single instance of the specified element from this queue,
505 jsr166 1.51 * if it is present. More formally, removes an element {@code e} such
506     * that {@code o.equals(e)}, if this queue contains one or more such
507 jsr166 1.44 * elements.
508 jsr166 1.51 * Returns {@code true} if this queue contained the specified element
509 jsr166 1.44 * (or equivalently, if this queue changed as a result of the call).
510     *
511     * @param o element to be removed from this queue, if present
512 jsr166 1.51 * @return {@code true} if this queue changed as a result of the call
513 dl 1.35 */
514 dholmes 1.9 public boolean remove(Object o) {
515     if (o == null) return false;
516 dl 1.2 fullyLock();
517     try {
518 jsr166 1.51 for (Node<E> trail = head, p = trail.next;
519     p != null;
520     trail = p, p = p.next) {
521 dholmes 1.9 if (o.equals(p.item)) {
522 jsr166 1.51 unlink(p, trail);
523     return true;
524 dl 1.2 }
525     }
526 jsr166 1.51 return false;
527 tim 1.17 } finally {
528 dl 1.2 fullyUnlock();
529     }
530 tim 1.1 }
531 dl 1.2
532 jsr166 1.43 /**
533 jsr166 1.56 * Returns {@code true} if this queue contains the specified element.
534     * More formally, returns {@code true} if and only if this queue contains
535     * at least one element {@code e} such that {@code o.equals(e)}.
536     *
537     * @param o object to be checked for containment in this queue
538     * @return {@code true} if this queue contains the specified element
539     */
540     public boolean contains(Object o) {
541     if (o == null) return false;
542     fullyLock();
543     try {
544     for (Node<E> p = head.next; p != null; p = p.next)
545     if (o.equals(p.item))
546     return true;
547     return false;
548     } finally {
549     fullyUnlock();
550     }
551     }
552    
553     /**
554 jsr166 1.43 * Returns an array containing all of the elements in this queue, in
555     * proper sequence.
556     *
557     * <p>The returned array will be "safe" in that no references to it are
558     * maintained by this queue. (In other words, this method must allocate
559     * a new array). The caller is thus free to modify the returned array.
560 jsr166 1.45 *
561 jsr166 1.43 * <p>This method acts as bridge between array-based and collection-based
562     * APIs.
563     *
564     * @return an array containing all of the elements in this queue
565     */
566 dl 1.2 public Object[] toArray() {
567     fullyLock();
568     try {
569     int size = count.get();
570 tim 1.12 Object[] a = new Object[size];
571 dl 1.2 int k = 0;
572 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
573 dl 1.2 a[k++] = p.item;
574     return a;
575 tim 1.17 } finally {
576 dl 1.2 fullyUnlock();
577     }
578 tim 1.1 }
579 dl 1.2
580 jsr166 1.43 /**
581     * Returns an array containing all of the elements in this queue, in
582     * proper sequence; the runtime type of the returned array is that of
583     * the specified array. If the queue fits in the specified array, it
584     * is returned therein. Otherwise, a new array is allocated with the
585     * runtime type of the specified array and the size of this queue.
586     *
587     * <p>If this queue fits in the specified array with room to spare
588     * (i.e., the array has more elements than this queue), the element in
589     * the array immediately following the end of the queue is set to
590 jsr166 1.51 * {@code null}.
591 jsr166 1.43 *
592     * <p>Like the {@link #toArray()} method, this method acts as bridge between
593     * array-based and collection-based APIs. Further, this method allows
594     * precise control over the runtime type of the output array, and may,
595     * under certain circumstances, be used to save allocation costs.
596     *
597 jsr166 1.51 * <p>Suppose {@code x} is a queue known to contain only strings.
598 jsr166 1.43 * The following code can be used to dump the queue into a newly
599 jsr166 1.51 * allocated array of {@code String}:
600 jsr166 1.43 *
601 jsr166 1.62 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
602 jsr166 1.43 *
603 jsr166 1.51 * Note that {@code toArray(new Object[0])} is identical in function to
604     * {@code toArray()}.
605 jsr166 1.43 *
606     * @param a the array into which the elements of the queue are to
607     * be stored, if it is big enough; otherwise, a new array of the
608     * same runtime type is allocated for this purpose
609     * @return an array containing all of the elements in this queue
610     * @throws ArrayStoreException if the runtime type of the specified array
611     * is not a supertype of the runtime type of every element in
612     * this queue
613     * @throws NullPointerException if the specified array is null
614     */
615 jsr166 1.51 @SuppressWarnings("unchecked")
616 dl 1.2 public <T> T[] toArray(T[] a) {
617     fullyLock();
618     try {
619     int size = count.get();
620     if (a.length < size)
621 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
622     (a.getClass().getComponentType(), size);
623 tim 1.12
624 dl 1.2 int k = 0;
625 jsr166 1.51 for (Node<E> p = head.next; p != null; p = p.next)
626 dl 1.2 a[k++] = (T)p.item;
627 jsr166 1.47 if (a.length > k)
628     a[k] = null;
629 dl 1.2 return a;
630 tim 1.17 } finally {
631 dl 1.2 fullyUnlock();
632     }
633 tim 1.1 }
634 dl 1.2
635     public String toString() {
636     fullyLock();
637     try {
638 jsr166 1.55 Node<E> p = head.next;
639     if (p == null)
640     return "[]";
641    
642     StringBuilder sb = new StringBuilder();
643     sb.append('[');
644     for (;;) {
645     E e = p.item;
646     sb.append(e == this ? "(this Collection)" : e);
647     p = p.next;
648     if (p == null)
649     return sb.append(']').toString();
650     sb.append(',').append(' ');
651     }
652 tim 1.17 } finally {
653 dl 1.2 fullyUnlock();
654     }
655 tim 1.1 }
656 dl 1.2
657 dl 1.35 /**
658     * Atomically removes all of the elements from this queue.
659     * The queue will be empty after this call returns.
660     */
661 dl 1.24 public void clear() {
662     fullyLock();
663     try {
664 jsr166 1.51 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
665     h.next = h;
666     p.item = null;
667     }
668     head = last;
669     // assert head.item == null && head.next == null;
670 dl 1.24 if (count.getAndSet(0) == capacity)
671 jsr166 1.51 notFull.signal();
672 dl 1.24 } finally {
673     fullyUnlock();
674     }
675     }
676    
677 jsr166 1.43 /**
678     * @throws UnsupportedOperationException {@inheritDoc}
679     * @throws ClassCastException {@inheritDoc}
680     * @throws NullPointerException {@inheritDoc}
681     * @throws IllegalArgumentException {@inheritDoc}
682     */
683 dl 1.24 public int drainTo(Collection<? super E> c) {
684 jsr166 1.51 return drainTo(c, Integer.MAX_VALUE);
685 dl 1.24 }
686 jsr166 1.40
687 jsr166 1.43 /**
688     * @throws UnsupportedOperationException {@inheritDoc}
689     * @throws ClassCastException {@inheritDoc}
690     * @throws NullPointerException {@inheritDoc}
691     * @throws IllegalArgumentException {@inheritDoc}
692     */
693 dl 1.24 public int drainTo(Collection<? super E> c, int maxElements) {
694     if (c == null)
695     throw new NullPointerException();
696     if (c == this)
697     throw new IllegalArgumentException();
698 jsr166 1.63 if (maxElements <= 0)
699     return 0;
700 jsr166 1.51 boolean signalNotFull = false;
701     final ReentrantLock takeLock = this.takeLock;
702     takeLock.lock();
703 dl 1.24 try {
704 jsr166 1.51 int n = Math.min(maxElements, count.get());
705     // count.get provides visibility to first n Nodes
706     Node<E> h = head;
707     int i = 0;
708     try {
709     while (i < n) {
710     Node<E> p = h.next;
711     c.add(p.item);
712     p.item = null;
713     h.next = h;
714     h = p;
715     ++i;
716     }
717     return n;
718     } finally {
719     // Restore invariants even if c.add() threw
720     if (i > 0) {
721     // assert h.item == null;
722     head = h;
723     signalNotFull = (count.getAndAdd(-i) == capacity);
724     }
725 dl 1.24 }
726     } finally {
727 jsr166 1.51 takeLock.unlock();
728     if (signalNotFull)
729     signalNotFull();
730 dl 1.24 }
731     }
732    
733 dholmes 1.14 /**
734     * Returns an iterator over the elements in this queue in proper sequence.
735 jsr166 1.57 * The elements will be returned in order from first (head) to last (tail).
736     *
737     * <p>The returned iterator is a "weakly consistent" iterator that
738 jsr166 1.52 * will never throw {@link java.util.ConcurrentModificationException
739 jsr166 1.57 * ConcurrentModificationException}, and guarantees to traverse
740     * elements as they existed upon construction of the iterator, and
741     * may (but is not guaranteed to) reflect any modifications
742     * subsequent to construction.
743 dholmes 1.14 *
744 jsr166 1.43 * @return an iterator over the elements in this queue in proper sequence
745 dholmes 1.14 */
746 dl 1.2 public Iterator<E> iterator() {
747 jsr166 1.59 return new Itr();
748 tim 1.1 }
749 dl 1.2
750     private class Itr implements Iterator<E> {
751 tim 1.12 /*
752 jsr166 1.51 * Basic weakly-consistent iterator. At all times hold the next
753 dl 1.4 * item to hand out so that if hasNext() reports true, we will
754     * still have it to return even if lost race with a take etc.
755     */
756 dl 1.31 private Node<E> current;
757     private Node<E> lastRet;
758     private E currentElement;
759 tim 1.12
760 dl 1.2 Itr() {
761 jsr166 1.51 fullyLock();
762 dl 1.2 try {
763     current = head.next;
764 dl 1.4 if (current != null)
765     currentElement = current.item;
766 tim 1.17 } finally {
767 jsr166 1.51 fullyUnlock();
768 dl 1.2 }
769     }
770 tim 1.12
771     public boolean hasNext() {
772 dl 1.2 return current != null;
773     }
774    
775 jsr166 1.51 /**
776 jsr166 1.53 * Returns the next live successor of p, or null if no such.
777     *
778     * Unlike other traversal methods, iterators need to handle both:
779 jsr166 1.51 * - dequeued nodes (p.next == p)
780 jsr166 1.53 * - (possibly multiple) interior removed nodes (p.item == null)
781 jsr166 1.51 */
782     private Node<E> nextNode(Node<E> p) {
783 jsr166 1.53 for (;;) {
784     Node<E> s = p.next;
785     if (s == p)
786     return head.next;
787     if (s == null || s.item != null)
788     return s;
789     p = s;
790     }
791 jsr166 1.51 }
792    
793 tim 1.12 public E next() {
794 jsr166 1.51 fullyLock();
795 dl 1.2 try {
796     if (current == null)
797     throw new NoSuchElementException();
798 dl 1.4 E x = currentElement;
799 dl 1.2 lastRet = current;
800 jsr166 1.51 current = nextNode(current);
801     currentElement = (current == null) ? null : current.item;
802 dl 1.2 return x;
803 tim 1.17 } finally {
804 jsr166 1.51 fullyUnlock();
805 dl 1.2 }
806     }
807    
808 tim 1.12 public void remove() {
809 dl 1.2 if (lastRet == null)
810 tim 1.12 throw new IllegalStateException();
811 jsr166 1.51 fullyLock();
812 dl 1.2 try {
813     Node<E> node = lastRet;
814     lastRet = null;
815 jsr166 1.51 for (Node<E> trail = head, p = trail.next;
816     p != null;
817     trail = p, p = p.next) {
818     if (p == node) {
819     unlink(p, trail);
820     break;
821     }
822 dl 1.2 }
823 tim 1.17 } finally {
824 jsr166 1.51 fullyUnlock();
825 dl 1.2 }
826     }
827 tim 1.1 }
828 dl 1.2
829     /**
830 jsr166 1.68 * Saves this queue to a stream (that is, serializes it).
831 dl 1.2 *
832     * @serialData The capacity is emitted (int), followed by all of
833 jsr166 1.51 * its elements (each an {@code Object}) in the proper order,
834 dl 1.2 * followed by a null
835     */
836     private void writeObject(java.io.ObjectOutputStream s)
837     throws java.io.IOException {
838    
839 tim 1.12 fullyLock();
840 dl 1.2 try {
841     // Write out any hidden stuff, plus capacity
842     s.defaultWriteObject();
843    
844     // Write out all elements in the proper order.
845 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
846 dl 1.2 s.writeObject(p.item);
847    
848     // Use trailing null as sentinel
849     s.writeObject(null);
850 tim 1.17 } finally {
851 dl 1.2 fullyUnlock();
852     }
853 tim 1.1 }
854    
855 dl 1.2 /**
856 jsr166 1.65 * Reconstitutes this queue from a stream (that is, deserializes it).
857 dl 1.2 */
858     private void readObject(java.io.ObjectInputStream s)
859     throws java.io.IOException, ClassNotFoundException {
860 tim 1.12 // Read in capacity, and any hidden stuff
861     s.defaultReadObject();
862 dl 1.2
863 dl 1.19 count.set(0);
864     last = head = new Node<E>(null);
865    
866 dl 1.6 // Read in all elements and place in queue
867 dl 1.2 for (;;) {
868 jsr166 1.51 @SuppressWarnings("unchecked")
869 dl 1.2 E item = (E)s.readObject();
870     if (item == null)
871     break;
872     add(item);
873     }
874 tim 1.1 }
875     }