ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.40
Committed: Tue Apr 26 01:17:18 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.39: +9 -9 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19 dl 1.20 * queue the shortest time. New elements
20     * are inserted at the tail of the queue, and the queue retrieval
21     * operations obtain elements at the head of the queue.
22 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
23     * less predictable performance in most concurrent applications.
24 tim 1.12 *
25 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
26 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
27     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 dl 1.3 * dynamically created upon each insertion unless this would bring the
29     * queue above capacity.
30 dholmes 1.8 *
31 dl 1.36 * <p>This class and its iterator implement all of the
32     * <em>optional</em> methods of the {@link Collection} and {@link
33 dl 1.38 * Iterator} interfaces.
34 dl 1.21 *
35 dl 1.34 * <p>This class is a member of the
36     * <a href="{@docRoot}/../guide/collections/index.html">
37     * Java Collections Framework</a>.
38     *
39 dl 1.6 * @since 1.5
40     * @author Doug Lea
41 dl 1.27 * @param <E> the type of elements held in this collection
42 tim 1.12 *
43 jsr166 1.40 */
44 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
45 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
46 dl 1.18 private static final long serialVersionUID = -6903933977591709194L;
47 tim 1.1
48 dl 1.2 /*
49     * A variant of the "two lock queue" algorithm. The putLock gates
50     * entry to put (and offer), and has an associated condition for
51     * waiting puts. Similarly for the takeLock. The "count" field
52     * that they both rely on is maintained as an atomic to avoid
53     * needing to get both locks in most cases. Also, to minimize need
54     * for puts to get takeLock and vice-versa, cascading notifies are
55     * used. When a put notices that it has enabled at least one take,
56     * it signals taker. That taker in turn signals others if more
57     * items have been entered since the signal. And symmetrically for
58 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
59 dl 1.2 * iterators acquire both locks.
60 dl 1.38 */
61 dl 1.2
62 dl 1.6 /**
63     * Linked list node class
64     */
65 dl 1.2 static class Node<E> {
66 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
67 dl 1.2 volatile E item;
68     Node<E> next;
69     Node(E x) { item = x; }
70     }
71    
72 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
73 dl 1.2 private final int capacity;
74 dl 1.6
75     /** Current number of elements */
76 dl 1.19 private final AtomicInteger count = new AtomicInteger(0);
77 dl 1.2
78 dl 1.6 /** Head of linked list */
79     private transient Node<E> head;
80    
81 dholmes 1.8 /** Tail of linked list */
82 dl 1.6 private transient Node<E> last;
83 dl 1.2
84 dl 1.6 /** Lock held by take, poll, etc */
85 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
86 dl 1.6
87     /** Wait queue for waiting takes */
88 dl 1.32 private final Condition notEmpty = takeLock.newCondition();
89 dl 1.2
90 dl 1.6 /** Lock held by put, offer, etc */
91 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
92 dl 1.6
93     /** Wait queue for waiting puts */
94 dl 1.32 private final Condition notFull = putLock.newCondition();
95 dl 1.2
96     /**
97 jsr166 1.40 * Signals a waiting take. Called only from put/offer (which do not
98 dl 1.4 * otherwise ordinarily lock takeLock.)
99 dl 1.2 */
100     private void signalNotEmpty() {
101 dl 1.31 final ReentrantLock takeLock = this.takeLock;
102 dl 1.2 takeLock.lock();
103     try {
104     notEmpty.signal();
105 tim 1.17 } finally {
106 dl 1.2 takeLock.unlock();
107     }
108     }
109    
110     /**
111 jsr166 1.40 * Signals a waiting put. Called only from take/poll.
112 dl 1.2 */
113     private void signalNotFull() {
114 dl 1.31 final ReentrantLock putLock = this.putLock;
115 dl 1.2 putLock.lock();
116     try {
117     notFull.signal();
118 tim 1.17 } finally {
119 dl 1.2 putLock.unlock();
120     }
121     }
122    
123     /**
124 jsr166 1.40 * Creates a node and links it at end of queue.
125 dl 1.6 * @param x the item
126 dl 1.2 */
127     private void insert(E x) {
128     last = last.next = new Node<E>(x);
129     }
130    
131     /**
132 jsr166 1.40 * Removes a node from head of queue,
133 dl 1.6 * @return the node
134 dl 1.2 */
135     private E extract() {
136     Node<E> first = head.next;
137     head = first;
138 dl 1.28 E x = first.item;
139 dl 1.2 first.item = null;
140     return x;
141     }
142    
143     /**
144 tim 1.12 * Lock to prevent both puts and takes.
145 dl 1.2 */
146     private void fullyLock() {
147     putLock.lock();
148     takeLock.lock();
149 tim 1.1 }
150 dl 1.2
151     /**
152 tim 1.12 * Unlock to allow both puts and takes.
153 dl 1.2 */
154     private void fullyUnlock() {
155     takeLock.unlock();
156     putLock.unlock();
157     }
158    
159    
160     /**
161 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
162 dholmes 1.8 * {@link Integer#MAX_VALUE}.
163 dl 1.2 */
164     public LinkedBlockingQueue() {
165     this(Integer.MAX_VALUE);
166     }
167    
168     /**
169 tim 1.16 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
170     *
171 dholmes 1.8 * @param capacity the capacity of this queue.
172     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
173 tim 1.16 * than zero.
174 dl 1.2 */
175     public LinkedBlockingQueue(int capacity) {
176 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
177 dl 1.2 this.capacity = capacity;
178 dl 1.6 last = head = new Node<E>(null);
179 dl 1.2 }
180    
181     /**
182 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
183 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
184 tim 1.12 * given collection,
185 dholmes 1.8 * added in traversal order of the collection's iterator.
186 dholmes 1.9 * @param c the collection of elements to initially contain
187     * @throws NullPointerException if <tt>c</tt> or any element within it
188 jsr166 1.40 * is <tt>null</tt>.
189 dl 1.2 */
190 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
191 dl 1.2 this(Integer.MAX_VALUE);
192 dl 1.38 for (E e : c)
193     add(e);
194 dl 1.2 }
195    
196 dholmes 1.9
197 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
198     // greater in size than Integer.MAX_VALUE
199 tim 1.12 /**
200 dl 1.20 * Returns the number of elements in this queue.
201     *
202     * @return the number of elements in this queue.
203 dholmes 1.8 */
204 dl 1.2 public int size() {
205     return count.get();
206 tim 1.1 }
207 dl 1.2
208 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
209     // without the reference to unlimited queues.
210 tim 1.12 /**
211 dholmes 1.13 * Returns the number of elements that this queue can ideally (in
212 dholmes 1.8 * the absence of memory or resource constraints) accept without
213     * blocking. This is always equal to the initial capacity of this queue
214     * less the current <tt>size</tt> of this queue.
215     * <p>Note that you <em>cannot</em> always tell if
216     * an attempt to <tt>add</tt> an element will succeed by
217     * inspecting <tt>remainingCapacity</tt> because it may be the
218     * case that a waiting consumer is ready to <tt>take</tt> an
219     * element out of an otherwise full queue.
220     */
221 dl 1.2 public int remainingCapacity() {
222     return capacity - count.get();
223     }
224    
225 dholmes 1.22 /**
226     * Adds the specified element to the tail of this queue, waiting if
227     * necessary for space to become available.
228 dl 1.23 * @param o the element to add
229     * @throws InterruptedException if interrupted while waiting.
230 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
231     */
232 dholmes 1.14 public void put(E o) throws InterruptedException {
233     if (o == null) throw new NullPointerException();
234 dl 1.2 // Note: convention in all put/take/etc is to preset
235     // local var holding count negative to indicate failure unless set.
236 tim 1.12 int c = -1;
237 dl 1.31 final ReentrantLock putLock = this.putLock;
238     final AtomicInteger count = this.count;
239 dl 1.2 putLock.lockInterruptibly();
240     try {
241     /*
242     * Note that count is used in wait guard even though it is
243     * not protected by lock. This works because count can
244     * only decrease at this point (all other puts are shut
245     * out by lock), and we (or some other waiting put) are
246     * signalled if it ever changes from
247     * capacity. Similarly for all other uses of count in
248     * other wait guards.
249     */
250     try {
251 tim 1.12 while (count.get() == capacity)
252 dl 1.2 notFull.await();
253 tim 1.17 } catch (InterruptedException ie) {
254 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
255     throw ie;
256     }
257 dholmes 1.14 insert(o);
258 dl 1.2 c = count.getAndIncrement();
259 dl 1.6 if (c + 1 < capacity)
260 dl 1.2 notFull.signal();
261 tim 1.17 } finally {
262 dl 1.2 putLock.unlock();
263     }
264 tim 1.12 if (c == 0)
265 dl 1.2 signalNotEmpty();
266 tim 1.1 }
267 dl 1.2
268 dholmes 1.22 /**
269     * Inserts the specified element at the tail of this queue, waiting if
270     * necessary up to the specified wait time for space to become available.
271 dl 1.23 * @param o the element to add
272     * @param timeout how long to wait before giving up, in units of
273     * <tt>unit</tt>
274     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
275     * <tt>timeout</tt> parameter
276     * @return <tt>true</tt> if successful, or <tt>false</tt> if
277     * the specified waiting time elapses before space is available.
278     * @throws InterruptedException if interrupted while waiting.
279 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
280     */
281 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
282 dholmes 1.8 throws InterruptedException {
283 tim 1.12
284 dholmes 1.14 if (o == null) throw new NullPointerException();
285 dl 1.2 long nanos = unit.toNanos(timeout);
286     int c = -1;
287 dl 1.31 final ReentrantLock putLock = this.putLock;
288     final AtomicInteger count = this.count;
289 dholmes 1.8 putLock.lockInterruptibly();
290 dl 1.2 try {
291     for (;;) {
292     if (count.get() < capacity) {
293 dholmes 1.14 insert(o);
294 dl 1.2 c = count.getAndIncrement();
295 dl 1.6 if (c + 1 < capacity)
296 dl 1.2 notFull.signal();
297     break;
298     }
299     if (nanos <= 0)
300     return false;
301     try {
302     nanos = notFull.awaitNanos(nanos);
303 tim 1.17 } catch (InterruptedException ie) {
304 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
305     throw ie;
306     }
307     }
308 tim 1.17 } finally {
309 dl 1.2 putLock.unlock();
310     }
311 tim 1.12 if (c == 0)
312 dl 1.2 signalNotEmpty();
313     return true;
314 tim 1.1 }
315 dl 1.2
316 dl 1.23 /**
317     * Inserts the specified element at the tail of this queue if possible,
318     * returning immediately if this queue is full.
319     *
320     * @param o the element to add.
321     * @return <tt>true</tt> if it was possible to add the element to
322     * this queue, else <tt>false</tt>
323 jsr166 1.40 * @throws NullPointerException if the specified element is <tt>null</tt>.
324 dl 1.23 */
325 dholmes 1.14 public boolean offer(E o) {
326     if (o == null) throw new NullPointerException();
327 dl 1.31 final AtomicInteger count = this.count;
328 dl 1.2 if (count.get() == capacity)
329     return false;
330 tim 1.12 int c = -1;
331 dl 1.31 final ReentrantLock putLock = this.putLock;
332 dholmes 1.8 putLock.lock();
333 dl 1.2 try {
334     if (count.get() < capacity) {
335 dholmes 1.14 insert(o);
336 dl 1.2 c = count.getAndIncrement();
337 dl 1.6 if (c + 1 < capacity)
338 dl 1.2 notFull.signal();
339     }
340 tim 1.17 } finally {
341 dl 1.2 putLock.unlock();
342     }
343 tim 1.12 if (c == 0)
344 dl 1.2 signalNotEmpty();
345     return c >= 0;
346 tim 1.1 }
347 dl 1.2
348    
349     public E take() throws InterruptedException {
350     E x;
351     int c = -1;
352 dl 1.31 final AtomicInteger count = this.count;
353     final ReentrantLock takeLock = this.takeLock;
354 dl 1.2 takeLock.lockInterruptibly();
355     try {
356     try {
357 tim 1.12 while (count.get() == 0)
358 dl 1.2 notEmpty.await();
359 tim 1.17 } catch (InterruptedException ie) {
360 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
361     throw ie;
362     }
363    
364     x = extract();
365     c = count.getAndDecrement();
366     if (c > 1)
367     notEmpty.signal();
368 tim 1.17 } finally {
369 dl 1.2 takeLock.unlock();
370     }
371 tim 1.12 if (c == capacity)
372 dl 1.2 signalNotFull();
373     return x;
374     }
375    
376     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
377     E x = null;
378     int c = -1;
379 dholmes 1.8 long nanos = unit.toNanos(timeout);
380 dl 1.31 final AtomicInteger count = this.count;
381     final ReentrantLock takeLock = this.takeLock;
382 dl 1.2 takeLock.lockInterruptibly();
383     try {
384     for (;;) {
385     if (count.get() > 0) {
386     x = extract();
387     c = count.getAndDecrement();
388     if (c > 1)
389     notEmpty.signal();
390     break;
391     }
392     if (nanos <= 0)
393     return null;
394     try {
395     nanos = notEmpty.awaitNanos(nanos);
396 tim 1.17 } catch (InterruptedException ie) {
397 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
398     throw ie;
399     }
400     }
401 tim 1.17 } finally {
402 dl 1.2 takeLock.unlock();
403     }
404 tim 1.12 if (c == capacity)
405 dl 1.2 signalNotFull();
406     return x;
407     }
408    
409     public E poll() {
410 dl 1.31 final AtomicInteger count = this.count;
411 dl 1.2 if (count.get() == 0)
412     return null;
413     E x = null;
414 tim 1.12 int c = -1;
415 dl 1.31 final ReentrantLock takeLock = this.takeLock;
416 dl 1.30 takeLock.lock();
417 dl 1.2 try {
418     if (count.get() > 0) {
419     x = extract();
420     c = count.getAndDecrement();
421     if (c > 1)
422     notEmpty.signal();
423     }
424 tim 1.17 } finally {
425 dl 1.2 takeLock.unlock();
426     }
427 tim 1.12 if (c == capacity)
428 dl 1.2 signalNotFull();
429     return x;
430 tim 1.1 }
431 dl 1.2
432    
433     public E peek() {
434     if (count.get() == 0)
435     return null;
436 dl 1.31 final ReentrantLock takeLock = this.takeLock;
437 dholmes 1.8 takeLock.lock();
438 dl 1.2 try {
439     Node<E> first = head.next;
440     if (first == null)
441     return null;
442     else
443     return first.item;
444 tim 1.17 } finally {
445 dl 1.2 takeLock.unlock();
446     }
447 tim 1.1 }
448    
449 dl 1.35 /**
450     * Removes a single instance of the specified element from this
451 dl 1.36 * queue, if it is present.
452 dl 1.35 */
453 dholmes 1.9 public boolean remove(Object o) {
454     if (o == null) return false;
455 dl 1.2 boolean removed = false;
456     fullyLock();
457     try {
458     Node<E> trail = head;
459     Node<E> p = head.next;
460     while (p != null) {
461 dholmes 1.9 if (o.equals(p.item)) {
462 dl 1.2 removed = true;
463     break;
464     }
465     trail = p;
466     p = p.next;
467     }
468     if (removed) {
469     p.item = null;
470     trail.next = p.next;
471 dl 1.39 if (last == p)
472     last = trail;
473 dl 1.2 if (count.getAndDecrement() == capacity)
474     notFull.signalAll();
475     }
476 tim 1.17 } finally {
477 dl 1.2 fullyUnlock();
478     }
479     return removed;
480 tim 1.1 }
481 dl 1.2
482     public Object[] toArray() {
483     fullyLock();
484     try {
485     int size = count.get();
486 tim 1.12 Object[] a = new Object[size];
487 dl 1.2 int k = 0;
488 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
489 dl 1.2 a[k++] = p.item;
490     return a;
491 tim 1.17 } finally {
492 dl 1.2 fullyUnlock();
493     }
494 tim 1.1 }
495 dl 1.2
496     public <T> T[] toArray(T[] a) {
497     fullyLock();
498     try {
499     int size = count.get();
500     if (a.length < size)
501 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
502     (a.getClass().getComponentType(), size);
503 tim 1.12
504 dl 1.2 int k = 0;
505 tim 1.12 for (Node p = head.next; p != null; p = p.next)
506 dl 1.2 a[k++] = (T)p.item;
507     return a;
508 tim 1.17 } finally {
509 dl 1.2 fullyUnlock();
510     }
511 tim 1.1 }
512 dl 1.2
513     public String toString() {
514     fullyLock();
515     try {
516     return super.toString();
517 tim 1.17 } finally {
518 dl 1.2 fullyUnlock();
519     }
520 tim 1.1 }
521 dl 1.2
522 dl 1.35 /**
523     * Atomically removes all of the elements from this queue.
524     * The queue will be empty after this call returns.
525     */
526 dl 1.24 public void clear() {
527     fullyLock();
528     try {
529     head.next = null;
530 dl 1.38 assert head.item == null;
531     last = head;
532 dl 1.24 if (count.getAndSet(0) == capacity)
533     notFull.signalAll();
534     } finally {
535     fullyUnlock();
536     }
537     }
538    
539     public int drainTo(Collection<? super E> c) {
540     if (c == null)
541     throw new NullPointerException();
542     if (c == this)
543     throw new IllegalArgumentException();
544     Node first;
545     fullyLock();
546     try {
547     first = head.next;
548     head.next = null;
549 dl 1.38 assert head.item == null;
550     last = head;
551 dl 1.24 if (count.getAndSet(0) == capacity)
552     notFull.signalAll();
553     } finally {
554     fullyUnlock();
555     }
556     // Transfer the elements outside of locks
557     int n = 0;
558 dl 1.29 for (Node<E> p = first; p != null; p = p.next) {
559     c.add(p.item);
560 dl 1.24 p.item = null;
561     ++n;
562     }
563     return n;
564     }
565 jsr166 1.40
566 dl 1.24 public int drainTo(Collection<? super E> c, int maxElements) {
567     if (c == null)
568     throw new NullPointerException();
569     if (c == this)
570     throw new IllegalArgumentException();
571     fullyLock();
572     try {
573     int n = 0;
574 dl 1.29 Node<E> p = head.next;
575 dl 1.24 while (p != null && n < maxElements) {
576 dl 1.29 c.add(p.item);
577 dl 1.24 p.item = null;
578     p = p.next;
579     ++n;
580     }
581     if (n != 0) {
582     head.next = p;
583 dl 1.38 assert head.item == null;
584     if (p == null)
585     last = head;
586 dl 1.24 if (count.getAndAdd(-n) == capacity)
587     notFull.signalAll();
588     }
589     return n;
590     } finally {
591     fullyUnlock();
592     }
593     }
594    
595 dholmes 1.14 /**
596     * Returns an iterator over the elements in this queue in proper sequence.
597 dl 1.15 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
598 jsr166 1.40 * will never throw {@link ConcurrentModificationException},
599 dl 1.15 * and guarantees to traverse elements as they existed upon
600     * construction of the iterator, and may (but is not guaranteed to)
601     * reflect any modifications subsequent to construction.
602 dholmes 1.14 *
603     * @return an iterator over the elements in this queue in proper sequence.
604     */
605 dl 1.2 public Iterator<E> iterator() {
606     return new Itr();
607 tim 1.1 }
608 dl 1.2
609     private class Itr implements Iterator<E> {
610 tim 1.12 /*
611 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
612     * item to hand out so that if hasNext() reports true, we will
613     * still have it to return even if lost race with a take etc.
614     */
615 dl 1.31 private Node<E> current;
616     private Node<E> lastRet;
617     private E currentElement;
618 tim 1.12
619 dl 1.2 Itr() {
620 dl 1.31 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
621     final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
622     putLock.lock();
623     takeLock.lock();
624 dl 1.2 try {
625     current = head.next;
626 dl 1.4 if (current != null)
627     currentElement = current.item;
628 tim 1.17 } finally {
629 dl 1.31 takeLock.unlock();
630     putLock.unlock();
631 dl 1.2 }
632     }
633 tim 1.12
634     public boolean hasNext() {
635 dl 1.2 return current != null;
636     }
637    
638 tim 1.12 public E next() {
639 dl 1.31 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
640     final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
641     putLock.lock();
642     takeLock.lock();
643 dl 1.2 try {
644     if (current == null)
645     throw new NoSuchElementException();
646 dl 1.4 E x = currentElement;
647 dl 1.2 lastRet = current;
648     current = current.next;
649 dl 1.4 if (current != null)
650     currentElement = current.item;
651 dl 1.2 return x;
652 tim 1.17 } finally {
653 dl 1.31 takeLock.unlock();
654     putLock.unlock();
655 dl 1.2 }
656     }
657    
658 tim 1.12 public void remove() {
659 dl 1.2 if (lastRet == null)
660 tim 1.12 throw new IllegalStateException();
661 dl 1.31 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
662     final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
663     putLock.lock();
664     takeLock.lock();
665 dl 1.2 try {
666     Node<E> node = lastRet;
667     lastRet = null;
668     Node<E> trail = head;
669     Node<E> p = head.next;
670     while (p != null && p != node) {
671     trail = p;
672     p = p.next;
673     }
674     if (p == node) {
675     p.item = null;
676     trail.next = p.next;
677 dl 1.39 if (last == p)
678     last = trail;
679 dl 1.2 int c = count.getAndDecrement();
680     if (c == capacity)
681     notFull.signalAll();
682     }
683 tim 1.17 } finally {
684 dl 1.31 takeLock.unlock();
685     putLock.unlock();
686 dl 1.2 }
687     }
688 tim 1.1 }
689 dl 1.2
690     /**
691     * Save the state to a stream (that is, serialize it).
692     *
693     * @serialData The capacity is emitted (int), followed by all of
694     * its elements (each an <tt>Object</tt>) in the proper order,
695     * followed by a null
696 dl 1.6 * @param s the stream
697 dl 1.2 */
698     private void writeObject(java.io.ObjectOutputStream s)
699     throws java.io.IOException {
700    
701 tim 1.12 fullyLock();
702 dl 1.2 try {
703     // Write out any hidden stuff, plus capacity
704     s.defaultWriteObject();
705    
706     // Write out all elements in the proper order.
707 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
708 dl 1.2 s.writeObject(p.item);
709    
710     // Use trailing null as sentinel
711     s.writeObject(null);
712 tim 1.17 } finally {
713 dl 1.2 fullyUnlock();
714     }
715 tim 1.1 }
716    
717 dl 1.2 /**
718 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
719 dl 1.2 * deserialize it).
720 dl 1.6 * @param s the stream
721 dl 1.2 */
722     private void readObject(java.io.ObjectInputStream s)
723     throws java.io.IOException, ClassNotFoundException {
724 tim 1.12 // Read in capacity, and any hidden stuff
725     s.defaultReadObject();
726 dl 1.2
727 dl 1.19 count.set(0);
728     last = head = new Node<E>(null);
729    
730 dl 1.6 // Read in all elements and place in queue
731 dl 1.2 for (;;) {
732     E item = (E)s.readObject();
733     if (item == null)
734     break;
735     add(item);
736     }
737 tim 1.1 }
738     }