ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.50
Committed: Tue May 17 06:07:40 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.49: +58 -43 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 dl 1.44 * synchronous queue does not have any internal capacity, not even a
15 jsr166 1.50 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
16 dl 1.44 * because an element is only present when you try to take it; you
17     * cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to
19     * iterate. The <em>head</em> of the queue is the element that the
20     * first queued thread is trying to add to the queue; if there are no
21     * queued threads then no element is being added and the head is
22     * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
23     * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
24     * as an empty collection. This queue does not permit <tt>null</tt>
25     * elements.
26 dl 1.18 *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.43 *
33     * <p> This class supports an optional fairness policy for ordering
34     * waiting producer and consumer threads. By default, this ordering
35     * is not guaranteed. However, a queue constructed with fairness set
36     * to <tt>true</tt> grants threads access in FIFO order. Fairness
37     * generally decreases throughput but reduces variability and avoids
38     * starvation.
39     *
40 dl 1.46 * <p>This class and its iterator implement all of the
41     * <em>optional</em> methods of the {@link Collection} and {@link
42 jsr166 1.48 * Iterator} interfaces.
43 dl 1.42 *
44     * <p>This class is a member of the
45     * <a href="{@docRoot}/../guide/collections/index.html">
46     * Java Collections Framework</a>.
47     *
48 dl 1.6 * @since 1.5
49     * @author Doug Lea
50 dl 1.24 * @param <E> the type of elements held in this collection
51 dl 1.23 */
52 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
53 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
54 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
55 tim 1.1
56 dl 1.2 /*
57     This implementation divides actions into two cases for puts:
58    
59 dl 1.43 * An arriving producer that does not already have a waiting consumer
60 jsr166 1.50 creates a node holding item, and then waits for a consumer to take it.
61 dl 1.43 * An arriving producer that does already have a waiting consumer fills
62 jsr166 1.50 the slot node created by the consumer, and notifies it to continue.
63 dl 1.2
64     And symmetrically, two for takes:
65    
66 dl 1.43 * An arriving consumer that does not already have a waiting producer
67 jsr166 1.50 creates an empty slot node, and then waits for a producer to fill it.
68 dl 1.43 * An arriving consumer that does already have a waiting producer takes
69 jsr166 1.50 item from the node created by the producer, and notifies it to continue.
70 tim 1.10
71 dl 1.2 When a put or take waiting for the actions of its counterpart
72     aborts due to interruption or timeout, it marks the node
73     it created as "CANCELLED", which causes its counterpart to retry
74     the entire put or take sequence.
75 dl 1.43
76     This requires keeping two simple queues, waitingProducers and
77     waitingConsumers. Each of these can be FIFO (preserves fairness)
78     or LIFO (improves throughput).
79 dl 1.2 */
80    
81 dl 1.43 /** Lock protecting both wait queues */
82     private final ReentrantLock qlock;
83     /** Queue holding waiting puts */
84     private final WaitQueue waitingProducers;
85     /** Queue holding waiting takes */
86     private final WaitQueue waitingConsumers;
87 dl 1.2
88 dl 1.43 /**
89     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
90     */
91     public SynchronousQueue() {
92 dl 1.44 this(false);
93 dl 1.43 }
94    
95     /**
96     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
97 dl 1.44 * @param fair if true, threads contend in FIFO order for access;
98     * otherwise the order is unspecified.
99 dl 1.43 */
100     public SynchronousQueue(boolean fair) {
101     if (fair) {
102     qlock = new ReentrantLock(true);
103     waitingProducers = new FifoWaitQueue();
104     waitingConsumers = new FifoWaitQueue();
105     }
106     else {
107     qlock = new ReentrantLock();
108     waitingProducers = new LifoWaitQueue();
109     waitingConsumers = new LifoWaitQueue();
110     }
111     }
112    
113     /**
114 dl 1.45 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
115 dl 1.43 * These queues have all transient fields, but are serializable
116 dl 1.44 * in order to recover fairness settings when deserialized.
117 dl 1.43 */
118     static abstract class WaitQueue implements java.io.Serializable {
119 jsr166 1.48 /** Creates, adds, and returns node for x. */
120 dl 1.43 abstract Node enq(Object x);
121 jsr166 1.48 /** Removes and returns node, or null if empty. */
122 dl 1.43 abstract Node deq();
123 jsr166 1.48 /** Removes a cancelled node to avoid garbage retention. */
124 dl 1.47 abstract void unlink(Node node);
125 jsr166 1.48 /** Returns true if a cancelled node might be on queue. */
126 dl 1.47 abstract boolean shouldUnlink(Node node);
127 dl 1.43 }
128    
129     /**
130     * FIFO queue to hold waiting puts/takes.
131     */
132     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
133     private static final long serialVersionUID = -3623113410248163686L;
134     private transient Node head;
135     private transient Node last;
136    
137     Node enq(Object x) {
138     Node p = new Node(x);
139     if (last == null)
140     last = head = p;
141     else
142     last = last.next = p;
143     return p;
144     }
145    
146     Node deq() {
147     Node p = head;
148     if (p != null) {
149     if ((head = p.next) == null)
150     last = null;
151     p.next = null;
152     }
153     return p;
154     }
155 dl 1.47
156     boolean shouldUnlink(Node node) {
157     return (node == last || node.next != null);
158     }
159    
160     void unlink(Node node) {
161     Node p = head;
162     Node trail = null;
163     while (p != null) {
164     if (p == node) {
165     Node next = p.next;
166 jsr166 1.48 if (trail == null)
167 dl 1.47 head = next;
168     else
169     trail.next = next;
170     if (last == node)
171     last = trail;
172     break;
173     }
174     trail = p;
175     p = p.next;
176     }
177     }
178 dl 1.43 }
179    
180     /**
181     * LIFO queue to hold waiting puts/takes.
182 dl 1.2 */
183 dl 1.43 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
184     private static final long serialVersionUID = -3633113410248163686L;
185     private transient Node head;
186 dl 1.2
187 dl 1.43 Node enq(Object x) {
188     return head = new Node(x, head);
189     }
190    
191     Node deq() {
192     Node p = head;
193 dl 1.44 if (p != null) {
194 dl 1.43 head = p.next;
195 dl 1.44 p.next = null;
196     }
197 dl 1.43 return p;
198     }
199 dl 1.47
200     boolean shouldUnlink(Node node) {
201     // Return false if already dequeued or is bottom node (in which
202     // case we might retain at most one garbage node)
203     return (node == head || node.next != null);
204     }
205    
206     void unlink(Node node) {
207     Node p = head;
208     Node trail = null;
209     while (p != null) {
210     if (p == node) {
211     Node next = p.next;
212 jsr166 1.48 if (trail == null)
213 dl 1.47 head = next;
214     else
215     trail.next = next;
216     break;
217     }
218     trail = p;
219     p = p.next;
220     }
221     }
222     }
223    
224 jsr166 1.50 /**
225     * Unlinks the given node from consumer queue. Called by cancelled
226 dl 1.47 * (timeout, interrupt) waiters to avoid garbage retention in the
227 jsr166 1.48 * absence of producers.
228 dl 1.47 */
229     private void unlinkCancelledConsumer(Node node) {
230     // Use a form of double-check to avoid unnecessary locking and
231     // traversal. The first check outside lock might
232     // conservatively report true.
233     if (waitingConsumers.shouldUnlink(node)) {
234     qlock.lock();
235     try {
236 jsr166 1.48 if (waitingConsumers.shouldUnlink(node))
237 dl 1.47 waitingConsumers.unlink(node);
238     } finally {
239     qlock.unlock();
240     }
241     }
242 dl 1.43 }
243 dl 1.2
244 jsr166 1.50 /**
245     * Unlinks the given node from producer queue. Symmetric
246 dl 1.47 * to unlinkCancelledConsumer.
247     */
248     private void unlinkCancelledProducer(Node node) {
249     if (waitingProducers.shouldUnlink(node)) {
250     qlock.lock();
251     try {
252 jsr166 1.48 if (waitingProducers.shouldUnlink(node))
253 dl 1.47 waitingProducers.unlink(node);
254     } finally {
255     qlock.unlock();
256     }
257     }
258     }
259 jsr166 1.48
260 dl 1.2 /**
261     * Nodes each maintain an item and handle waits and signals for
262 dl 1.31 * getting and setting it. The class extends
263     * AbstractQueuedSynchronizer to manage blocking, using AQS state
264     * 0 for waiting, 1 for ack, -1 for cancelled.
265 dl 1.2 */
266 dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
267 dl 1.35 /** Synchronization state value representing that node acked */
268     private static final int ACK = 1;
269     /** Synchronization state value representing that node cancelled */
270     private static final int CANCEL = -1;
271    
272 dl 1.6 /** The item being transferred */
273 dl 1.2 Object item;
274 dl 1.6 /** Next node in wait queue */
275 dl 1.2 Node next;
276 dl 1.35
277 dl 1.44 /** Creates a node with initial item */
278 dl 1.31 Node(Object x) { item = x; }
279    
280 dl 1.44 /** Creates a node with initial item and next */
281 dl 1.43 Node(Object x, Node n) { item = x; next = n; }
282    
283 dl 1.31 /**
284     * Implements AQS base acquire to succeed if not in WAITING state
285     */
286 dl 1.39 protected boolean tryAcquire(int ignore) {
287 dl 1.35 return getState() != 0;
288 dl 1.31 }
289    
290     /**
291 dl 1.34 * Implements AQS base release to signal if state changed
292 dl 1.31 */
293 dl 1.39 protected boolean tryRelease(int newState) {
294 dl 1.35 return compareAndSetState(0, newState);
295 dl 1.31 }
296    
297     /**
298 dl 1.44 * Takes item and nulls out field (for sake of GC)
299 dl 1.31 */
300 dl 1.35 private Object extract() {
301     Object x = item;
302     item = null;
303     return x;
304 dl 1.31 }
305    
306     /**
307 dl 1.44 * Tries to cancel on interrupt; if so rethrowing,
308 dl 1.35 * else setting interrupt state
309 dl 1.31 */
310 jsr166 1.48 private void checkCancellationOnInterrupt(InterruptedException ie)
311 dl 1.35 throws InterruptedException {
312 jsr166 1.48 if (release(CANCEL))
313 dl 1.35 throw ie;
314     Thread.currentThread().interrupt();
315 dl 1.31 }
316 dl 1.2
317     /**
318 dl 1.44 * Fills in the slot created by the consumer and signal consumer to
319 dl 1.2 * continue.
320     */
321 dl 1.31 boolean setItem(Object x) {
322 dl 1.35 item = x; // can place in slot even if cancelled
323 dl 1.39 return release(ACK);
324 dl 1.2 }
325    
326     /**
327 dl 1.44 * Removes item from slot created by producer and signal producer
328 dl 1.2 * to continue.
329     */
330 dl 1.31 Object getItem() {
331 dl 1.39 return (release(ACK))? extract() : null;
332 dl 1.35 }
333    
334     /**
335 dl 1.44 * Waits for a consumer to take item placed by producer.
336 dl 1.35 */
337     void waitForTake() throws InterruptedException {
338     try {
339 dl 1.39 acquireInterruptibly(0);
340 dl 1.35 } catch (InterruptedException ie) {
341     checkCancellationOnInterrupt(ie);
342     }
343     }
344    
345     /**
346 dl 1.44 * Waits for a producer to put item placed by consumer.
347 dl 1.35 */
348     Object waitForPut() throws InterruptedException {
349     try {
350 dl 1.39 acquireInterruptibly(0);
351 dl 1.35 } catch (InterruptedException ie) {
352     checkCancellationOnInterrupt(ie);
353     }
354     return extract();
355 dl 1.31 }
356    
357     /**
358 dl 1.44 * Waits for a consumer to take item placed by producer or time out.
359 dl 1.31 */
360 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
361 dl 1.2 try {
362 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
363     release(CANCEL))
364 dl 1.33 return false;
365 dl 1.31 } catch (InterruptedException ie) {
366 dl 1.35 checkCancellationOnInterrupt(ie);
367 dl 1.2 }
368 dl 1.35 return true;
369 dl 1.2 }
370    
371     /**
372 dl 1.44 * Waits for a producer to put item placed by consumer, or time out.
373 dl 1.31 */
374 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
375 dl 1.31 try {
376 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
377     release(CANCEL))
378 dl 1.33 return null;
379 dl 1.31 } catch (InterruptedException ie) {
380 dl 1.35 checkCancellationOnInterrupt(ie);
381 dl 1.2 }
382 dl 1.35 return extract();
383 dl 1.2 }
384     }
385    
386     /**
387 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
388     * another thread to receive it.
389 jsr166 1.50 *
390     * @throws InterruptedException {@inheritDoc}
391     * @throws NullPointerException {@inheritDoc}
392 tim 1.10 */
393 jsr166 1.49 public void put(E e) throws InterruptedException {
394     if (e == null) throw new NullPointerException();
395 dl 1.35 final ReentrantLock qlock = this.qlock;
396    
397 dl 1.2 for (;;) {
398     Node node;
399     boolean mustWait;
400 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
401     qlock.lock();
402 dl 1.2 try {
403 dl 1.43 node = waitingConsumers.deq();
404 dl 1.2 if ( (mustWait = (node == null)) )
405 jsr166 1.49 node = waitingProducers.enq(e);
406 tim 1.14 } finally {
407 dl 1.2 qlock.unlock();
408     }
409    
410 dl 1.31 if (mustWait) {
411 dl 1.47 try {
412     node.waitForTake();
413     return;
414     } catch (InterruptedException ex) {
415     unlinkCancelledProducer(node);
416     throw ex;
417     }
418 dl 1.2 }
419    
420 jsr166 1.49 else if (node.setItem(e))
421 dl 1.35 return;
422 dl 1.2
423 dl 1.43 // else consumer cancelled, so retry
424 dl 1.35 }
425 tim 1.1 }
426    
427 dholmes 1.11 /**
428 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
429 dl 1.18 * up to the specified wait time for another thread to receive it.
430 jsr166 1.50 *
431     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
432     * specified waiting time elapses before a consumer appears.
433     * @throws InterruptedException {@inheritDoc}
434     * @throws NullPointerException {@inheritDoc}
435 dholmes 1.11 */
436 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
437     if (e == null) throw new NullPointerException();
438 dl 1.35 long nanos = unit.toNanos(timeout);
439     final ReentrantLock qlock = this.qlock;
440     for (;;) {
441     Node node;
442     boolean mustWait;
443 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
444     qlock.lock();
445 dl 1.35 try {
446 dl 1.43 node = waitingConsumers.deq();
447 dl 1.35 if ( (mustWait = (node == null)) )
448 jsr166 1.49 node = waitingProducers.enq(e);
449 dl 1.35 } finally {
450     qlock.unlock();
451     }
452    
453 dl 1.47 if (mustWait) {
454     try {
455     boolean x = node.waitForTake(nanos);
456 jsr166 1.48 if (!x)
457 dl 1.47 unlinkCancelledProducer(node);
458     return x;
459     } catch (InterruptedException ex) {
460     unlinkCancelledProducer(node);
461     throw ex;
462     }
463     }
464 dl 1.35
465 jsr166 1.49 else if (node.setItem(e))
466 dl 1.35 return true;
467    
468 dl 1.43 // else consumer cancelled, so retry
469 dl 1.35 }
470 tim 1.1 }
471    
472 dholmes 1.11 /**
473     * Retrieves and removes the head of this queue, waiting if necessary
474     * for another thread to insert it.
475 jsr166 1.50 *
476 dholmes 1.11 * @return the head of this queue
477 jsr166 1.50 * @throws InterruptedException {@inheritDoc}
478 dholmes 1.11 */
479 dl 1.2 public E take() throws InterruptedException {
480 dl 1.35 final ReentrantLock qlock = this.qlock;
481     for (;;) {
482     Node node;
483     boolean mustWait;
484    
485 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
486     qlock.lock();
487 dl 1.35 try {
488 dl 1.43 node = waitingProducers.deq();
489 dl 1.35 if ( (mustWait = (node == null)) )
490 dl 1.43 node = waitingConsumers.enq(null);
491 dl 1.35 } finally {
492     qlock.unlock();
493     }
494    
495 dl 1.36 if (mustWait) {
496 dl 1.47 try {
497     Object x = node.waitForPut();
498     return (E)x;
499     } catch (InterruptedException ex) {
500     unlinkCancelledConsumer(node);
501     throw ex;
502     }
503 dl 1.36 }
504 dl 1.35 else {
505     Object x = node.getItem();
506     if (x != null)
507     return (E)x;
508     // else cancelled, so retry
509     }
510     }
511 tim 1.1 }
512 dl 1.2
513 dholmes 1.11 /**
514     * Retrieves and removes the head of this queue, waiting
515     * if necessary up to the specified wait time, for another thread
516     * to insert it.
517 jsr166 1.50 *
518 dl 1.18 * @return the head of this queue, or <tt>null</tt> if the
519 jsr166 1.50 * specified waiting time elapses before an element is present.
520     * @throws InterruptedException {@inheritDoc}
521 dholmes 1.11 */
522 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
523 dl 1.35 long nanos = unit.toNanos(timeout);
524     final ReentrantLock qlock = this.qlock;
525    
526     for (;;) {
527     Node node;
528     boolean mustWait;
529    
530 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
531     qlock.lock();
532 dl 1.35 try {
533 dl 1.43 node = waitingProducers.deq();
534 dl 1.35 if ( (mustWait = (node == null)) )
535 dl 1.43 node = waitingConsumers.enq(null);
536 dl 1.35 } finally {
537     qlock.unlock();
538     }
539    
540 dl 1.36 if (mustWait) {
541 dl 1.47 try {
542     Object x = node.waitForPut(nanos);
543 jsr166 1.48 if (x == null)
544 dl 1.47 unlinkCancelledConsumer(node);
545     return (E)x;
546     } catch (InterruptedException ex) {
547     unlinkCancelledConsumer(node);
548     throw ex;
549     }
550 dl 1.36 }
551 dl 1.35 else {
552     Object x = node.getItem();
553     if (x != null)
554     return (E)x;
555     // else cancelled, so retry
556     }
557     }
558 tim 1.1 }
559 dl 1.2
560     // Untimed nonblocking versions
561    
562 jsr166 1.50 /**
563     * Inserts the specified element into this queue, if another thread is
564     * waiting to receive it.
565     *
566     * @param e the element to add
567     * @return <tt>true</tt> if it was possible to add the element to
568     * this queue, else <tt>false</tt>
569     * @throws NullPointerException if the specified element is null
570     */
571 jsr166 1.49 public boolean offer(E e) {
572     if (e == null) throw new NullPointerException();
573 dl 1.27 final ReentrantLock qlock = this.qlock;
574 tim 1.10
575     for (;;) {
576 tim 1.26 Node node;
577 dl 1.2 qlock.lock();
578     try {
579 dl 1.43 node = waitingConsumers.deq();
580 tim 1.14 } finally {
581 dl 1.2 qlock.unlock();
582     }
583     if (node == null)
584     return false;
585 tim 1.10
586 jsr166 1.49 else if (node.setItem(e))
587 dl 1.2 return true;
588     // else retry
589     }
590 tim 1.1 }
591 dl 1.2
592 dl 1.18 /**
593     * Retrieves and removes the head of this queue, if another thread
594     * is currently making an element available.
595     *
596     * @return the head of this queue, or <tt>null</tt> if no
597     * element is available.
598     */
599 dl 1.2 public E poll() {
600 dl 1.27 final ReentrantLock qlock = this.qlock;
601 dl 1.2 for (;;) {
602     Node node;
603     qlock.lock();
604     try {
605 dl 1.43 node = waitingProducers.deq();
606 tim 1.14 } finally {
607 dl 1.2 qlock.unlock();
608     }
609     if (node == null)
610     return null;
611    
612     else {
613 dl 1.31 Object x = node.getItem();
614 dl 1.2 if (x != null)
615     return (E)x;
616     // else retry
617     }
618     }
619 tim 1.1 }
620 dl 1.2
621 dl 1.5 /**
622 jsr166 1.48 * Always returns <tt>true</tt>.
623 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
624 jsr166 1.50 *
625 dholmes 1.11 * @return <tt>true</tt>
626 dl 1.5 */
627     public boolean isEmpty() {
628     return true;
629     }
630    
631     /**
632 dholmes 1.11 * Always returns zero.
633     * A <tt>SynchronousQueue</tt> has no internal capacity.
634 jsr166 1.50 *
635     * @return zero
636 dl 1.5 */
637     public int size() {
638     return 0;
639 tim 1.1 }
640 dl 1.2
641 dl 1.5 /**
642 dholmes 1.11 * Always returns zero.
643     * A <tt>SynchronousQueue</tt> has no internal capacity.
644 jsr166 1.50 *
645     * @return zero
646 dl 1.5 */
647     public int remainingCapacity() {
648     return 0;
649     }
650    
651     /**
652 dholmes 1.11 * Does nothing.
653     * A <tt>SynchronousQueue</tt> has no internal capacity.
654     */
655     public void clear() {}
656    
657     /**
658     * Always returns <tt>false</tt>.
659     * A <tt>SynchronousQueue</tt> has no internal capacity.
660 jsr166 1.50 *
661 dl 1.18 * @param o the element
662 dholmes 1.11 * @return <tt>false</tt>
663     */
664     public boolean contains(Object o) {
665     return false;
666     }
667    
668     /**
669 dl 1.18 * Always returns <tt>false</tt>.
670     * A <tt>SynchronousQueue</tt> has no internal capacity.
671     *
672     * @param o the element to remove
673     * @return <tt>false</tt>
674     */
675     public boolean remove(Object o) {
676     return false;
677     }
678    
679     /**
680 jsr166 1.50 * Returns <tt>false</tt> unless the given collection is empty.
681 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
682 jsr166 1.50 *
683 dl 1.18 * @param c the collection
684 jsr166 1.50 * @return <tt>false</tt> unless the given collection is empty
685     * @throws NullPointerException if the specified collection is null
686 dholmes 1.11 */
687 dl 1.12 public boolean containsAll(Collection<?> c) {
688 dl 1.16 return c.isEmpty();
689 dholmes 1.11 }
690    
691     /**
692     * Always returns <tt>false</tt>.
693     * A <tt>SynchronousQueue</tt> has no internal capacity.
694 jsr166 1.50 *
695 dl 1.18 * @param c the collection
696 dholmes 1.11 * @return <tt>false</tt>
697     */
698 dl 1.12 public boolean removeAll(Collection<?> c) {
699 dholmes 1.11 return false;
700     }
701    
702     /**
703     * Always returns <tt>false</tt>.
704     * A <tt>SynchronousQueue</tt> has no internal capacity.
705 jsr166 1.50 *
706 dl 1.18 * @param c the collection
707 dholmes 1.11 * @return <tt>false</tt>
708     */
709 dl 1.12 public boolean retainAll(Collection<?> c) {
710 dholmes 1.11 return false;
711     }
712    
713     /**
714 jsr166 1.48 * Always returns <tt>null</tt>.
715 dholmes 1.11 * A <tt>SynchronousQueue</tt> does not return elements
716 dl 1.5 * unless actively waited on.
717 jsr166 1.50 *
718 dholmes 1.11 * @return <tt>null</tt>
719 dl 1.5 */
720     public E peek() {
721     return null;
722     }
723    
724    
725     static class EmptyIterator<E> implements Iterator<E> {
726 dl 1.2 public boolean hasNext() {
727     return false;
728     }
729     public E next() {
730     throw new NoSuchElementException();
731     }
732     public void remove() {
733 dl 1.17 throw new IllegalStateException();
734 dl 1.2 }
735 tim 1.1 }
736 dl 1.2
737 dl 1.5 /**
738 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
739 tim 1.13 * <tt>false</tt>.
740     *
741 dholmes 1.11 * @return an empty iterator
742 dl 1.5 */
743 dl 1.2 public Iterator<E> iterator() {
744 dl 1.5 return new EmptyIterator<E>();
745 tim 1.1 }
746    
747 dl 1.2
748 dl 1.5 /**
749 dholmes 1.11 * Returns a zero-length array.
750     * @return a zero-length array
751 dl 1.5 */
752 dl 1.3 public Object[] toArray() {
753 dl 1.25 return new Object[0];
754 tim 1.1 }
755    
756 dholmes 1.11 /**
757     * Sets the zeroeth element of the specified array to <tt>null</tt>
758     * (if the array has non-zero length) and returns it.
759 jsr166 1.50 *
760 dl 1.40 * @param a the array
761 dholmes 1.11 * @return the specified array
762 jsr166 1.50 * @throws NullPointerException if the specified array is null
763 dholmes 1.11 */
764 dl 1.2 public <T> T[] toArray(T[] a) {
765     if (a.length > 0)
766     a[0] = null;
767     return a;
768     }
769 dl 1.21
770 jsr166 1.50 /**
771     * @throws UnsupportedOperationException {@inheritDoc}
772     * @throws ClassCastException {@inheritDoc}
773     * @throws NullPointerException {@inheritDoc}
774     * @throws IllegalArgumentException {@inheritDoc}
775     */
776 dl 1.21 public int drainTo(Collection<? super E> c) {
777     if (c == null)
778     throw new NullPointerException();
779     if (c == this)
780     throw new IllegalArgumentException();
781     int n = 0;
782     E e;
783     while ( (e = poll()) != null) {
784     c.add(e);
785     ++n;
786     }
787     return n;
788     }
789    
790 jsr166 1.50 /**
791     * @throws UnsupportedOperationException {@inheritDoc}
792     * @throws ClassCastException {@inheritDoc}
793     * @throws NullPointerException {@inheritDoc}
794     * @throws IllegalArgumentException {@inheritDoc}
795     */
796 dl 1.21 public int drainTo(Collection<? super E> c, int maxElements) {
797     if (c == null)
798     throw new NullPointerException();
799     if (c == this)
800     throw new IllegalArgumentException();
801     int n = 0;
802     E e;
803     while (n < maxElements && (e = poll()) != null) {
804     c.add(e);
805     ++n;
806     }
807     return n;
808     }
809 tim 1.1 }