ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.53
Committed: Mon Aug 31 21:48:14 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.52: +12 -10 lines
Log Message:
6871697: LinkedBlockingQueue Iterator/remove/poll race

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.51
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16 tim 1.1
17     /**
18 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
19 dholmes 1.8 * linked nodes.
20     * This queue orders elements FIFO (first-in-first-out).
21 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
22 dholmes 1.8 * queue the longest time.
23     * The <em>tail</em> of the queue is that element that has been on the
24 dl 1.20 * queue the shortest time. New elements
25     * are inserted at the tail of the queue, and the queue retrieval
26     * operations obtain elements at the head of the queue.
27 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
28     * less predictable performance in most concurrent applications.
29 tim 1.12 *
30 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
31 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
32     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
33 dl 1.3 * dynamically created upon each insertion unless this would bring the
34     * queue above capacity.
35 dholmes 1.8 *
36 dl 1.36 * <p>This class and its iterator implement all of the
37     * <em>optional</em> methods of the {@link Collection} and {@link
38 dl 1.38 * Iterator} interfaces.
39 dl 1.21 *
40 dl 1.34 * <p>This class is a member of the
41 jsr166 1.48 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42 dl 1.34 * Java Collections Framework</a>.
43     *
44 dl 1.6 * @since 1.5
45     * @author Doug Lea
46 dl 1.27 * @param <E> the type of elements held in this collection
47 tim 1.12 *
48 jsr166 1.40 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The element held by this node; null for the head sentinel and for removed nodes. */
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take.  Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put.  Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at end of queue.
     *
     * @param x the item
     */
    private void enqueue(E x) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the item that was held by the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // self-link: help GC, and tells iterators to restart at head.next
        head = first;
        E x = first.item;
        first.item = null; // first becomes the new sentinel (head.item == null)
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(e);
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(e);
            c = count.getAndIncrement();
            // Cascade: there is still room, so wake another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: reject without locking when the queue is already full.
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                enqueue(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting on the
     * {@code notEmpty} condition while the queue holds no elements.
     *
     * @return the element that was at the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // Cascade: more elements remain, so wake another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @return the head of this queue, or {@code null} if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     *         take lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: nothing to take, so avoid locking.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have emptied the queue.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        // Both locks are held here, so notFull may be signalled directly.
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while both
     * locks are held so that the queue cannot change during the traversal.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            // Self-link every node that is skipped over, as dequeue() does.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements from the head of this
     * queue and adds them to the given collection.  A non-positive
     * {@code maxElements} transfers nothing and yields {@code 0}, so the
     * result is never negative.
     *
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        // Nothing can be transferred; previously a negative maxElements
        // fell through and was returned as a negative element count.
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            // Signalled after releasing takeLock; signalNotFull() takes putLock.
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
      return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        /** Reports whether a node is already held for the next call to next(). */
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        /**
         * Returns the element captured for the current position and
         * advances to the next live node, all under both locks.
         *
         * @throws NoSuchElementException if no element remains
         */
        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Unlinks the node last returned by next(), if it is still
         * linked in the queue.
         *
         * @throws IllegalStateException if there is no last-returned node
         */
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search by identity; the node may already have been removed.
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The elements are re-added below, so start from an empty list.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}