ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.49
Committed: Mon May 2 08:35:49 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.48: +14 -14 lines
Log Message:
E o -> E e

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * {@code put} must wait for a {@code take}, and vice versa.  A
 * synchronous queue does not have any internal capacity, not even a
 * capacity of one.  You cannot {@code peek} at a synchronous queue
 * because an element is only present when you try to take it; you
 * cannot add an element (using any method) unless another thread is
 * trying to remove it; you cannot iterate as there is nothing to
 * iterate.  The <em>head</em> of the queue is the element that the
 * first queued thread is trying to add to the queue; if there are no
 * queued threads then no element is being added and the head is
 * {@code null}.  For purposes of other {@code Collection} methods
 * (for example {@code contains}), a {@code SynchronousQueue} acts
 * as an empty collection.  This queue does not permit {@code null}
 * elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada.  They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to {@code true} grants threads access in FIFO order.  Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
        creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
        the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
        creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
        item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers.  Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /** Lock protecting both wait queues */
    private final ReentrantLock qlock;
    /** Queue holding waiting puts */
    private final WaitQueue waitingProducers;
    /** Queue holding waiting takes */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with specified fairness policy.
     *
     * @param fair if true, threads contend in FIFO order for access;
     *        otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    }

    /**
     * Narrows a transferred item back to the element type.  Items are
     * carried through {@link Node#item} as {@code Object}; only values
     * accepted by the {@code E}-typed insertion methods are ever stored
     * there, so the cast cannot fail for well-typed callers.
     */
    @SuppressWarnings("unchecked")
    private static <T> T cast(Object item) {
        return (T) item;
    }

    /**
     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
     * These queues have all transient fields, but are serializable
     * in order to recover fairness settings when deserialized.
     */
    static abstract class WaitQueue implements java.io.Serializable {
        /** Creates, adds, and returns node for x. */
        abstract Node enq(Object x);
        /** Removes and returns node, or null if empty. */
        abstract Node deq();
        /** Removes a cancelled node to avoid garbage retention. */
        abstract void unlink(Node node);
        /** Returns true if a cancelled node might be on queue. */
        abstract boolean shouldUnlink(Node node);
    }

    /**
     * FIFO queue to hold waiting puts/takes.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        private transient Node head;
        private transient Node last;

        @Override
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                if ((head = p.next) == null)
                    last = null;
                p.next = null;
            }
            return p;
        }

        @Override
        boolean shouldUnlink(Node node) {
            // A node still on the queue is either the tail or has a
            // successor; deq() clears next, so a dequeued node fails both.
            return (node == last || node.next != null);
        }

        @Override
        void unlink(Node node) {
            Node p = head;
            Node trail = null;
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    if (last == node)
                        last = trail;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * LIFO queue to hold waiting puts/takes.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        private transient Node head;

        @Override
        Node enq(Object x) {
            return head = new Node(x, head);
        }

        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                p.next = null;
            }
            return p;
        }

        @Override
        boolean shouldUnlink(Node node) {
            // Return false if already dequeued or is bottom node (in which
            // case we might retain at most one garbage node)
            return (node == head || node.next != null);
        }

        @Override
        void unlink(Node node) {
            Node p = head;
            Node trail = null;
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * Unlinks the given node from consumer queue.  Called by cancelled
     * (timeout, interrupt) waiters to avoid garbage retention in the
     * absence of producers.
     */
    private void unlinkCancelledConsumer(Node node) {
        // Use a form of double-check to avoid unnecessary locking and
        // traversal.  The first check outside lock might
        // conservatively report true.
        if (waitingConsumers.shouldUnlink(node)) {
            qlock.lock();
            try {
                if (waitingConsumers.shouldUnlink(node))
                    waitingConsumers.unlink(node);
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Unlinks the given node from producer queue.  Symmetric
     * to unlinkCancelledConsumer.
     */
    private void unlinkCancelledProducer(Node node) {
        if (waitingProducers.shouldUnlink(node)) {
            qlock.lock();
            try {
                if (waitingProducers.shouldUnlink(node))
                    waitingProducers.unlink(node);
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it.  The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK    =  1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) { item = x; }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) { item = x; next = n; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING state
         */
        @Override
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed.
         * Only the first transition out of the waiting state (0) wins,
         * so an ACK and a CANCEL racing on the same node cannot both
         * succeed.
         */
        @Override
        protected boolean tryRelease(int newState) {
            return compareAndSetState(0, newState);
        }

        /**
         * Takes item and nulls out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if so rethrowing,
         * else setting interrupt state.  The latter case means the
         * counterpart already acked this node, so the transfer stands
         * and the interrupt is merely re-asserted for the caller.
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException {
            if (release(CANCEL))
                throw ie;
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signals consumer to
         * continue.
         *
         * @return true if the consumer was still waiting, false if it
         *         had already cancelled
         */
        boolean setItem(Object x) {
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Removes item from slot created by producer and signals producer
         * to continue.
         *
         * @return the item, or null if the producer had already cancelled
         */
        Object getItem() {
            return (release(ACK)) ? extract() : null;
        }

        /**
         * Waits for a consumer to take item placed by producer.
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to put item placed by consumer.
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer or time out.
         *
         * @return true if the item was taken, false if the wait timed
         *         out and this node was cancelled
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // A failed timed acquire only counts as a timeout if we
                // also win the race to cancel; otherwise a consumer acked
                // concurrently and the transfer succeeded.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to put item placed by consumer, or time out.
         *
         * @return the item, or null if the wait timed out and this node
         *         was cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(e);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                    return;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(e))
                return;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before a consumer appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(e);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    boolean x = node.waitForTake(nanos);
                    if (!x)
                        unlinkCancelledProducer(node);
                    return x;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(e))
                return true;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut();
                    return cast(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return cast(x);
                // else cancelled, so retry
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut(nanos);
                    if (x == null)
                        unlinkCancelledConsumer(node);
                    return cast(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return cast(x);
                // else cancelled, so retry
            }
        }
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add.
     * @return {@code true} if it was possible to add the element to
     *         this queue, else {@code false}
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(e))
                return true;
            // else retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or {@code null} if no
     *         element is available.
     */
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return cast(x);
                // else retry
            }
        }
    }

    /**
     * Always returns {@code true}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return {@code true}
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A {@code SynchronousQueue} has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element
     * @return {@code false}
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element to remove
     * @return {@code false}
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns {@code false} unless given collection is empty.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false} unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code null}.
     * A {@code SynchronousQueue} does not return elements
     * unless actively waited on.
     *
     * @return {@code null}
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: {@code hasNext} is always false,
     * {@code next} always throws {@code NoSuchElementException}, and
     * {@code remove} always throws {@code IllegalStateException}.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which {@code hasNext} always returns
     * {@code false}.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     *
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to {@code null}
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is {@code null}
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Repeatedly polls this queue, adding each element obtained to the
     * given collection, until a poll finds no producer ready to hand
     * off an element.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Repeatedly polls this queue, adding each element obtained to the
     * given collection, until either {@code maxElements} elements have
     * been transferred or a poll finds no producer ready to hand off an
     * element.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}