ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.67
Committed: Thu Nov 24 02:35:13 2011 UTC (12 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.66: +0 -2 lines
Log Message:
whitespace

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16 tim 1.1
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * <p>The old dummy head is self-linked and the first real node
     * becomes the new dummy head, with its item cleared so that the
     * {@code head.item == null} invariant continues to hold.
     *
     * @return the item that was held by the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     * Acquires putLock first, then takeLock.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     * Releases in the reverse order of {@link #fullyLock}.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            // Cascade: there is still room, so wake another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion; wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: report "full" without taking the lock.
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // Cascade: elements remain, so wake another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal; wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty. Never waits for an element.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: report "empty" without taking the lock.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have emptied the queue.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string listing the elements from head to tail, enclosed
     * in square brackets and separated by {@code ", "}. An element that
     * is this queue itself is rendered as {@code "(this Collection)"}.
     * An empty queue yields {@code "[]"}.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            // Self-link every node up to (not including) last and clear
            // items, so that last can serve as the new dummy head.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            // Signal only after releasing takeLock; signalNotFull()
            // acquires putLock.
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from head for the node; if it is no longer
                // linked, the loop ends without unlinking anything.
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Reset the count and rebuild the transient list; the elements
        // are re-added one by one below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}