ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.52
Committed: Wed May 18 19:05:07 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.51: +15 -14 lines
Log Message:
doc clarifications

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * {@code peek} at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * {@code poll()} will return {@code null}.  For purposes of other
 * {@code Collection} methods (for example {@code contains}), a
 * {@code SynchronousQueue} acts as an empty collection.  This queue
 * does not permit {@code null} elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to {@code true} grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
        creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
        the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
        creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
        item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers. Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /** Lock protecting both wait queues */
    private final ReentrantLock qlock;
    /** Queue holding waiting puts */
    private final WaitQueue waitingProducers;
    /** Queue holding waiting takes */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, threads contend in FIFO order for access;
     *        otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    }

    /**
     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
     * These queues have all transient fields, but are serializable
     * in order to recover fairness settings when deserialized.
     */
    abstract static class WaitQueue implements java.io.Serializable {
        /** Creates, adds, and returns node for x. */
        abstract Node enq(Object x);
        /** Removes and returns node, or null if empty. */
        abstract Node deq();
        /** Removes a cancelled node to avoid garbage retention. */
        abstract void unlink(Node node);
        /** Returns true if a cancelled node might be on queue. */
        abstract boolean shouldUnlink(Node node);
    }

    /**
     * FIFO queue to hold waiting puts/takes.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        /** Oldest node, the next one handed out by {@link #deq}; null if empty. */
        private transient Node head;
        /** Most recently enqueued node; null if empty. */
        private transient Node last;

        /** Appends a new node holding x at the tail and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Detaches and returns the head node, or null if the queue is empty. */
        Node deq() {
            Node p = head;
            if (p != null) {
                if ((head = p.next) == null)
                    last = null;
                p.next = null;          // fully detach the dequeued node
            }
            return p;
        }

        /**
         * Returns true if node is the tail or still has a successor,
         * i.e. it may still be linked into this queue.
         */
        boolean shouldUnlink(Node node) {
            return (node == last || node.next != null);
        }

        /**
         * Removes node from the list if it is found, fixing up
         * {@code head} and {@code last} as needed; otherwise does nothing.
         */
        void unlink(Node node) {
            Node p = head;
            Node trail = null;          // predecessor of p, null while p is head
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    if (last == node)
                        last = trail;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * LIFO queue to hold waiting puts/takes.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        /** Top of the stack: the most recently enqueued node; null if empty. */
        private transient Node head;

        /** Pushes a new node holding x on top of the stack and returns it. */
        Node enq(Object x) {
            return head = new Node(x, head);
        }

        /** Pops and returns the top node, or null if the stack is empty. */
        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                p.next = null;          // fully detach the dequeued node
            }
            return p;
        }

        boolean shouldUnlink(Node node) {
            // Return false if already dequeued or is bottom node (in which
            // case we might retain at most one garbage node)
            return (node == head || node.next != null);
        }

        /** Removes node from the stack if it is found; otherwise does nothing. */
        void unlink(Node node) {
            Node p = head;
            Node trail = null;          // predecessor of p, null while p is head
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * Unlinks the given node from consumer queue.  Called by cancelled
     * (timeout, interrupt) waiters to avoid garbage retention in the
     * absence of producers.
     */
    private void unlinkCancelledConsumer(Node node) {
        // Use a form of double-check to avoid unnecessary locking and
        // traversal. The first check outside lock might
        // conservatively report true.
        if (waitingConsumers.shouldUnlink(node)) {
            qlock.lock();
            try {
                if (waitingConsumers.shouldUnlink(node))
                    waitingConsumers.unlink(node);
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Unlinks the given node from producer queue.  Symmetric
     * to unlinkCancelledConsumer.
     */
    private void unlinkCancelledProducer(Node node) {
        if (waitingProducers.shouldUnlink(node)) {
            qlock.lock();
            try {
                if (waitingProducers.shouldUnlink(node))
                    waitingProducers.unlink(node);
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Converts an item taken from a node back to the element type.
     * Nodes store items as {@code Object}; the only non-null items placed
     * in nodes by this class are the {@code E} arguments of {@code put}
     * and {@code offer}, so the cast cannot fail.
     */
    @SuppressWarnings("unchecked")
    private E castItem(Object x) {
        return (E) x;
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK    =  1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) { item = x; }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) { item = x; next = n; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING state
         */
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed.
         * Only the first transition out of the waiting state (0) succeeds.
         */
        protected boolean tryRelease(int newState) {
            return compareAndSetState(0, newState);
        }

        /**
         * Takes item and nulls out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if the cancel succeeds, rethrows
         * the exception, else (the node has already left the waiting
         * state) re-asserts the thread's interrupt status.
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException {
            if (release(CANCEL))
                throw ie;
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signals the
         * consumer to continue.
         *
         * @return true if the node was still waiting; false if the
         *         consumer had already cancelled
         */
        boolean setItem(Object x) {
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Removes the item from the slot created by the producer and
         * signals the producer to continue.
         *
         * @return the item, or null if the producer had already cancelled
         */
        Object getItem() {
            return (release(ACK))? extract() : null;
        }

        /**
         * Waits for a consumer to take item placed by producer.
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to fill the slot created by the consumer.
         *
         * @return the item placed in the slot
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer, or times out.
         *
         * @return false if the wait timed out and the node was cancelled;
         *         true otherwise
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // On timeout the cancel may fail because the node was
                // acked in the meantime; that counts as success.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to fill the slot created by the consumer,
         * or times out.
         *
         * @return the item placed in the slot, or null if the wait timed
         *         out and the node was cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                // On timeout the cancel may fail because the node was
                // acked in the meantime; then the item is taken below.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @param e the element to add
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(e);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                    return;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(e))
                return;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if successful, or {@code false} if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(e);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    boolean x = node.waitForTake(nanos);
                    if (!x)
                        unlinkCancelledProducer(node);
                    return x;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(e))
                return true;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut();
                    return castItem(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return castItem(x);
                // else cancelled, so retry
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut(nanos);
                    if (x == null)
                        unlinkCancelledConsumer(node);
                    return castItem(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return castItem(x);
                // else cancelled, so retry
            }
        }
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(e))
                return true;
            // else retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or {@code null} if no
     *         element is available.
     */
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return castItem(x);
                // else retry
            }
        }
    }

    /**
     * Always returns {@code true}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return {@code true}
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A {@code SynchronousQueue} has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element
     * @return {@code false}
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element to remove
     * @return {@code false}
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns {@code false} unless the given collection is empty.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false} unless the given collection is empty
     * @throws NullPointerException if the specified collection is null
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code null}.
     * A {@code SynchronousQueue} does not return elements
     * unless actively waited on.
     *
     * @return {@code null}
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: {@code hasNext} is always false,
     * {@code next} always throws {@code NoSuchElementException}, and
     * {@code remove} always throws {@code IllegalStateException}.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which {@code hasNext} always returns
     * {@code false}.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     *
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to {@code null}
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element that waiting
     * producers are currently offering, by polling until no element
     * is available.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most {@code maxElements}
     * elements that waiting producers are currently offering, by
     * polling until the limit is reached or no element is available.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}