ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.46
Committed: Wed May 25 14:05:27 2005 UTC (19 years ago) by dl
Branch: MAIN
Changes since 1.45: +1 -1 lines
Log Message:
Avoid generics warnings; clarify javadocs

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is the element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is the element that has been on the
 * queue the shortest time. New elements are inserted at the tail of the
 * queue, and the queue retrieval operations obtain elements at the head
 * of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize the
     * need for puts to get takeLock and vice-versa, cascading notifies
     * are used.  When a put notices that it has enabled at least one
     * take, it signals a taker.  That taker in turn signals others if
     * more items have been entered since the signal.  And symmetrically
     * for takes signalling puts.  Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * null; the first real element is head.next.  A node whose item is
     * null is therefore either the current dummy head or a node that
     * has already been taken/removed.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        /** Successor node, or null if this is the last node */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at end of queue.
     * Callers hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        Node<E> node = new Node<E>(x);
        last.next = node;
        last = node;
    }

    /**
     * Removes the first real node from the queue and turns it into the
     * new dummy head. Callers hold takeLock and have checked that the
     * queue is not empty.
     *
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        // The node is now the dummy head, so it must not retain the item.
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes. The put lock is always
     * acquired before the take lock.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes (reverse order of
     * {@link #fullyLock}).
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection, added in traversal order of the collection's
     * iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c) {
            add(e);
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the remaining capacity
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal(); // still room: cascade to the next waiting put
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty(); // queue went from empty to non-empty
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit how to interpret the {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(e);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added, else {@code false}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final AtomicInteger count = this.count;
        if (count.get() == capacity) {
            return false; // fast path: full, no need to take the lock
        }
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal(); // more elements remain: cascade to next taker
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull(); // queue went from full to not full
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit how to interpret the {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0) {
            return null; // fast path: empty, no need to take the lock
        }
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have emptied the queue.
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p) {
                    last = trail;
                }
                // putLock is held here, so notFull may be signalled directly.
                if (count.getAndDecrement() == capacity) {
                    notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] result = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                result[k++] = p.item;
            }
            return result;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // array creation and element stores are checked at runtime
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            // Room to spare: mark the end of the queue's elements as documented.
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while both
     * locks are held so that the contents cannot change during traversal.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection. The elements are detached from the queue while
     * both locks are held and then handed to {@code c} outside the locks;
     * if {@code c.add} throws, elements not yet transferred are in neither
     * collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection. If {@code c.add}
     * throws, the elements already transferred are removed from this
     * queue and the element that failed (and everything after it) stays
     * in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            } finally {
                // Runs on normal completion and when c.add throws, so the
                // list and count always reflect exactly the n elements that
                // were handed over; otherwise nulled nodes would stay linked.
                if (n != 0) {
                    head.next = p;
                    assert head.item == null;
                    if (p == null) {
                        last = head;
                    }
                    if (count.getAndAdd(-n) == capacity) {
                        notFull.signalAll();
                    }
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        /** Node holding the element that the next call to next() returns */
        private Node<E> current;
        /** Node returned by the most recent next(), or null if none/removed */
        private Node<E> lastRet;
        /** Element captured from {@code current} while both locks were held */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                advance();
                return x;
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Moves {@code current} to the next node that still holds an
         * element. Nodes whose item is null have already been taken or
         * removed (or are the dummy head) and are skipped; handing them
         * out would make next() return null. Caller holds both locks.
         */
        private void advance() {
            Node<E> p = current.next;
            while (p != null && p.item == null) {
                p = p.next;
            }
            current = p;
            currentElement = (p == null) ? null : p.item;
        }

        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node in the live list; it may already be gone.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    if (last == p) {
                        last = trail;
                    }
                    int c = count.getAndDecrement();
                    if (c == capacity) {
                        notFull.signalAll();
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @throws ClassNotFoundException if the class of a serialized object
     *         cannot be found
     */
    @SuppressWarnings("unchecked") // elements were written from a queue of E
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The element count is rebuilt by the add() calls below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null) {
                break; // trailing null sentinel
            }
            add(item);
        }
    }
}