ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.66
Committed: Tue Oct 25 18:46:37 2011 UTC (12 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.65: +1 -1 lines
Log Message:
tidy javadoc of readObject/writeObject methods

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16 tim 1.1
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * <p>The old dummy head is self-linked and the former first node
     * becomes the new dummy head (its item is cleared to keep the
     * {@code head.item == null} invariant).
     *
     * @return the item that was held by the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    void fullyUnlock() {
        // Released in the reverse of the acquisition order used by fullyLock.
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    @Override
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            // Still room after this insertion: cascade the wakeup to
            // another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The count was zero before this insertion, so a take may be waiting.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                // awaitNanos returns the remaining wait budget.
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: give up without locking if the queue looks full.
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        // c stays -1 when nothing was enqueued.
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting on the
     * {@code notEmpty} condition while the queue is empty.
     *
     * @return the element that was at the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // More elements remain: cascade the wakeup to another take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal, so a put may be waiting.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if the queue is empty.
     *
     * @return the element that was at the head of this queue, or
     *         {@code null} if the wait time elapses while the queue is
     *         still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the element that was at the head of this queue, or
     *         {@code null} if this queue is empty
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: avoid locking when the queue looks empty.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have emptied the queue.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the element at the head of this queue, or {@code null}
     *         if this queue is empty
     */
    @Override
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        // putLock is held here (fully locked), so notFull may be
        // signalled directly.
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string of the form {@code [e1, e2, ...]} listing the
     * elements from head to tail, or {@code "[]"} if the queue is empty.
     * An element that is this queue itself is rendered as
     * {@code (this Collection)}.
     *
     * @return a string representation of this queue
     */
    @Override
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            // Self-link every node that is left behind and clear the
            // items, as dequeue() does for a single node.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            // Signalled only after takeLock is released, because
            // signalNotFull() acquires putLock.
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        /** Returns {@code true} if an element is held ready to hand out. */
        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        /**
         * Returns the element captured in advance and moves on to the
         * next live node.
         *
         * @throws NoSuchElementException if the iteration has no more
         *         elements
         */
        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Unlinks the node last returned by {@link #next}, if it is
         * still reachable from head.
         *
         * @throws IllegalStateException if there is no last-returned
         *         node to remove
         */
        @Override
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from head by identity to find the predecessor.
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // count is a serialized field; reset it because the elements are
        // re-added one by one below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}