[cvs] / jsr166 / src / main / java / util / concurrent / SynchronousQueue.java Repository:
ViewVC logotype

Annotation of /jsr166/src/main/java/util/concurrent/SynchronousQueue.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.54 - (view) (download)

1 : dl 1.2 /*
2 :     * Written by Doug Lea with assistance from members of JCP JSR-166
3 : dl 1.29 * Expert Group and released to the public domain, as explained at
4 :     * http://creativecommons.org/licenses/publicdomain
5 : dl 1.2 */
6 :    
7 : tim 1.1 package java.util.concurrent;
8 : dl 1.8 import java.util.concurrent.locks.*;
9 : tim 1.1 import java.util.*;
10 :    
11 :     /**
12 : jsr166 1.52 * A {@linkplain BlockingQueue blocking queue} in which each insert
13 :     * operation must wait for a corresponding remove operation by another
14 :     * thread, and vice versa. A synchronous queue does not have any
15 :     * internal capacity, not even a capacity of one. You cannot
16 :     * <tt>peek</tt> at a synchronous queue because an element is only
17 :     * present when you try to remove it; you cannot insert an element
18 :     * (using any method) unless another thread is trying to remove it;
19 :     * you cannot iterate as there is nothing to iterate. The
20 :     * <em>head</em> of the queue is the element that the first queued
21 :     * inserting thread is trying to add to the queue; if there is no such
22 :     * queued thread then no element is available for removal and
23 :     * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
24 :     * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
25 :     * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
26 :     * does not permit <tt>null</tt> elements.
27 : dl 1.18 *
28 :     * <p>Synchronous queues are similar to rendezvous channels used in
29 :     * CSP and Ada. They are well suited for handoff designs, in which an
30 : dl 1.30 * object running in one thread must sync up with an object running
31 : dl 1.18 * in another thread in order to hand it some information, event, or
32 :     * task.
33 : dl 1.43 *
34 :     * <p> This class supports an optional fairness policy for ordering
35 :     * waiting producer and consumer threads. By default, this ordering
36 :     * is not guaranteed. However, a queue constructed with fairness set
37 :     * to <tt>true</tt> grants threads access in FIFO order. Fairness
38 :     * generally decreases throughput but reduces variability and avoids
39 :     * starvation.
40 :     *
41 : dl 1.46 * <p>This class and its iterator implement all of the
42 :     * <em>optional</em> methods of the {@link Collection} and {@link
43 : jsr166 1.48 * Iterator} interfaces.
44 : dl 1.42 *
45 :     * <p>This class is a member of the
46 :     * <a href="{@docRoot}/../guide/collections/index.html">
47 :     * Java Collections Framework</a>.
48 :     *
49 : dl 1.6 * @since 1.5
50 :     * @author Doug Lea
51 : dl 1.24 * @param <E> the type of elements held in this collection
52 : dl 1.23 */
53 : dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
54 : tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
55 : dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
56 : tim 1.1
57 : dl 1.2 /*
58 :     This implementation divides actions into two cases for puts:
59 :    
60 : dl 1.43 * An arriving producer that does not already have a waiting consumer
61 : jsr166 1.50 creates a node holding item, and then waits for a consumer to take it.
62 : dl 1.43 * An arriving producer that does already have a waiting consumer fills
63 : jsr166 1.50 the slot node created by the consumer, and notifies it to continue.
64 : dl 1.2
65 :     And symmetrically, two for takes:
66 :    
67 : dl 1.43 * An arriving consumer that does not already have a waiting producer
68 : jsr166 1.50 creates an empty slot node, and then waits for a producer to fill it.
69 : dl 1.43 * An arriving consumer that does already have a waiting producer takes
70 : jsr166 1.50 item from the node created by the producer, and notifies it to continue.
71 : tim 1.10
72 : dl 1.2 When a put or take waiting for the actions of its counterpart
73 :     aborts due to interruption or timeout, it marks the node
74 :     it created as "CANCELLED", which causes its counterpart to retry
75 :     the entire put or take sequence.
76 : dl 1.43
77 :     This requires keeping two simple queues, waitingProducers and
78 :     waitingConsumers. Each of these can be FIFO (preserves fairness)
79 :     or LIFO (improves throughput).
80 : dl 1.2 */
81 :    
82 : dl 1.43 /** Lock protecting both wait queues */
83 :     private final ReentrantLock qlock;
84 :     /** Queue holding waiting puts */
85 :     private final WaitQueue waitingProducers;
86 :     /** Queue holding waiting takes */
87 :     private final WaitQueue waitingConsumers;
88 : dl 1.2
89 : dl 1.43 /**
90 :     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
91 :     */
92 :     public SynchronousQueue() {
93 : dl 1.44 this(false);
94 : dl 1.43 }
95 :    
96 :     /**
97 :     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
98 : dl 1.44 * @param fair if true, threads contend in FIFO order for access;
99 :     * otherwise the order is unspecified.
100 : dl 1.43 */
101 :     public SynchronousQueue(boolean fair) {
102 :     if (fair) {
103 :     qlock = new ReentrantLock(true);
104 :     waitingProducers = new FifoWaitQueue();
105 :     waitingConsumers = new FifoWaitQueue();
106 :     }
107 :     else {
108 :     qlock = new ReentrantLock();
109 :     waitingProducers = new LifoWaitQueue();
110 :     waitingConsumers = new LifoWaitQueue();
111 :     }
112 :     }
113 :    
114 :     /**
115 : dl 1.45 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
116 : dl 1.43 * These queues have all transient fields, but are serializable
117 : dl 1.44 * in order to recover fairness settings when deserialized.
118 : dl 1.43 */
119 :     static abstract class WaitQueue implements java.io.Serializable {
120 : jsr166 1.48 /** Creates, adds, and returns node for x. */
121 : dl 1.43 abstract Node enq(Object x);
122 : jsr166 1.48 /** Removes and returns node, or null if empty. */
123 : dl 1.43 abstract Node deq();
124 : jsr166 1.48 /** Removes a cancelled node to avoid garbage retention. */
125 : dl 1.47 abstract void unlink(Node node);
126 : jsr166 1.48 /** Returns true if a cancelled node might be on queue. */
127 : dl 1.47 abstract boolean shouldUnlink(Node node);
128 : dl 1.43 }
129 :    
130 :     /**
131 :     * FIFO queue to hold waiting puts/takes.
132 :     */
133 :     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
134 :     private static final long serialVersionUID = -3623113410248163686L;
135 :     private transient Node head;
136 :     private transient Node last;
137 :    
138 :     Node enq(Object x) {
139 :     Node p = new Node(x);
140 :     if (last == null)
141 :     last = head = p;
142 :     else
143 :     last = last.next = p;
144 :     return p;
145 :     }
146 :    
147 :     Node deq() {
148 :     Node p = head;
149 :     if (p != null) {
150 :     if ((head = p.next) == null)
151 :     last = null;
152 :     p.next = null;
153 :     }
154 :     return p;
155 :     }
156 : dl 1.47
157 :     boolean shouldUnlink(Node node) {
158 :     return (node == last || node.next != null);
159 :     }
160 :    
161 :     void unlink(Node node) {
162 :     Node p = head;
163 :     Node trail = null;
164 :     while (p != null) {
165 :     if (p == node) {
166 :     Node next = p.next;
167 : jsr166 1.48 if (trail == null)
168 : dl 1.47 head = next;
169 :     else
170 :     trail.next = next;
171 :     if (last == node)
172 :     last = trail;
173 :     break;
174 :     }
175 :     trail = p;
176 :     p = p.next;
177 :     }
178 :     }
179 : dl 1.43 }
180 :    
181 :     /**
182 :     * LIFO queue to hold waiting puts/takes.
183 : dl 1.2 */
184 : dl 1.43 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
185 :     private static final long serialVersionUID = -3633113410248163686L;
186 :     private transient Node head;
187 : dl 1.2
188 : dl 1.43 Node enq(Object x) {
189 :     return head = new Node(x, head);
190 :     }
191 :    
192 :     Node deq() {
193 :     Node p = head;
194 : dl 1.44 if (p != null) {
195 : dl 1.43 head = p.next;
196 : dl 1.44 p.next = null;
197 :     }
198 : dl 1.43 return p;
199 :     }
200 : dl 1.47
201 :     boolean shouldUnlink(Node node) {
202 :     // Return false if already dequeued or is bottom node (in which
203 :     // case we might retain at most one garbage node)
204 :     return (node == head || node.next != null);
205 :     }
206 :    
207 :     void unlink(Node node) {
208 :     Node p = head;
209 :     Node trail = null;
210 :     while (p != null) {
211 :     if (p == node) {
212 :     Node next = p.next;
213 : jsr166 1.48 if (trail == null)
214 : dl 1.47 head = next;
215 :     else
216 :     trail.next = next;
217 :     break;
218 :     }
219 :     trail = p;
220 :     p = p.next;
221 :     }
222 :     }
223 :     }
224 :    
225 : jsr166 1.50 /**
226 :     * Unlinks the given node from consumer queue. Called by cancelled
227 : dl 1.47 * (timeout, interrupt) waiters to avoid garbage retention in the
228 : jsr166 1.48 * absence of producers.
229 : dl 1.47 */
230 :     private void unlinkCancelledConsumer(Node node) {
231 :     // Use a form of double-check to avoid unnecessary locking and
232 :     // traversal. The first check outside lock might
233 :     // conservatively report true.
234 :     if (waitingConsumers.shouldUnlink(node)) {
235 :     qlock.lock();
236 :     try {
237 : jsr166 1.48 if (waitingConsumers.shouldUnlink(node))
238 : dl 1.47 waitingConsumers.unlink(node);
239 :     } finally {
240 :     qlock.unlock();
241 :     }
242 :     }
243 : dl 1.43 }
244 : dl 1.2
245 : jsr166 1.50 /**
246 :     * Unlinks the given node from producer queue. Symmetric
247 : dl 1.47 * to unlinkCancelledConsumer.
248 :     */
249 :     private void unlinkCancelledProducer(Node node) {
250 :     if (waitingProducers.shouldUnlink(node)) {
251 :     qlock.lock();
252 :     try {
253 : jsr166 1.48 if (waitingProducers.shouldUnlink(node))
254 : dl 1.47 waitingProducers.unlink(node);
255 :     } finally {
256 :     qlock.unlock();
257 :     }
258 :     }
259 :     }
260 : jsr166 1.48
261 : dl 1.2 /**
262 :     * Nodes each maintain an item and handle waits and signals for
263 : dl 1.31 * getting and setting it. The class extends
264 :     * AbstractQueuedSynchronizer to manage blocking, using AQS state
265 :     * 0 for waiting, 1 for ack, -1 for cancelled.
266 : dl 1.2 */
267 : dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
268 : dl 1.53 private static final long serialVersionUID = -2631493897867746127L;
269 :    
270 : dl 1.35 /** Synchronization state value representing that node acked */
271 :     private static final int ACK = 1;
272 :     /** Synchronization state value representing that node cancelled */
273 :     private static final int CANCEL = -1;
274 :    
275 : dl 1.6 /** The item being transferred */
276 : dl 1.2 Object item;
277 : dl 1.6 /** Next node in wait queue */
278 : dl 1.2 Node next;
279 : dl 1.35
280 : dl 1.44 /** Creates a node with initial item */
281 : dl 1.31 Node(Object x) { item = x; }
282 :    
283 : dl 1.44 /** Creates a node with initial item and next */
284 : dl 1.43 Node(Object x, Node n) { item = x; next = n; }
285 :    
286 : dl 1.31 /**
287 :     * Implements AQS base acquire to succeed if not in WAITING state
288 :     */
289 : dl 1.39 protected boolean tryAcquire(int ignore) {
290 : dl 1.35 return getState() != 0;
291 : dl 1.31 }
292 :    
293 :     /**
294 : dl 1.34 * Implements AQS base release to signal if state changed
295 : dl 1.31 */
296 : dl 1.39 protected boolean tryRelease(int newState) {
297 : dl 1.35 return compareAndSetState(0, newState);
298 : dl 1.31 }
299 :    
300 :     /**
301 : dl 1.44 * Takes item and nulls out field (for sake of GC)
302 : dl 1.31 */
303 : dl 1.35 private Object extract() {
304 :     Object x = item;
305 :     item = null;
306 :     return x;
307 : dl 1.31 }
308 :    
309 :     /**
310 : dl 1.44 * Tries to cancel on interrupt; if so rethrowing,
311 : dl 1.35 * else setting interrupt state
312 : dl 1.31 */
313 : jsr166 1.48 private void checkCancellationOnInterrupt(InterruptedException ie)
314 : dl 1.35 throws InterruptedException {
315 : jsr166 1.48 if (release(CANCEL))
316 : dl 1.35 throw ie;
317 :     Thread.currentThread().interrupt();
318 : dl 1.31 }
319 : dl 1.2
320 :     /**
321 : dl 1.44 * Fills in the slot created by the consumer and signal consumer to
322 : dl 1.2 * continue.
323 :     */
324 : dl 1.31 boolean setItem(Object x) {
325 : dl 1.35 item = x; // can place in slot even if cancelled
326 : dl 1.39 return release(ACK);
327 : dl 1.2 }
328 :    
329 :     /**
330 : dl 1.44 * Removes item from slot created by producer and signal producer
331 : dl 1.2 * to continue.
332 :     */
333 : dl 1.31 Object getItem() {
334 : dl 1.39 return (release(ACK))? extract() : null;
335 : dl 1.35 }
336 :    
337 :     /**
338 : dl 1.44 * Waits for a consumer to take item placed by producer.
339 : dl 1.35 */
340 :     void waitForTake() throws InterruptedException {
341 :     try {
342 : dl 1.39 acquireInterruptibly(0);
343 : dl 1.35 } catch (InterruptedException ie) {
344 :     checkCancellationOnInterrupt(ie);
345 :     }
346 :     }
347 :    
348 :     /**
349 : dl 1.44 * Waits for a producer to put item placed by consumer.
350 : dl 1.35 */
351 :     Object waitForPut() throws InterruptedException {
352 :     try {
353 : dl 1.39 acquireInterruptibly(0);
354 : dl 1.35 } catch (InterruptedException ie) {
355 :     checkCancellationOnInterrupt(ie);
356 :     }
357 :     return extract();
358 : dl 1.31 }
359 :    
360 :     /**
361 : dl 1.44 * Waits for a consumer to take item placed by producer or time out.
362 : dl 1.31 */
363 : dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
364 : dl 1.2 try {
365 : dl 1.39 if (!tryAcquireNanos(0, nanos) &&
366 :     release(CANCEL))
367 : dl 1.33 return false;
368 : dl 1.31 } catch (InterruptedException ie) {
369 : dl 1.35 checkCancellationOnInterrupt(ie);
370 : dl 1.2 }
371 : dl 1.35 return true;
372 : dl 1.2 }
373 :    
374 :     /**
375 : dl 1.44 * Waits for a producer to put item placed by consumer, or time out.
376 : dl 1.31 */
377 : dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
378 : dl 1.31 try {
379 : dl 1.39 if (!tryAcquireNanos(0, nanos) &&
380 :     release(CANCEL))
381 : dl 1.33 return null;
382 : dl 1.31 } catch (InterruptedException ie) {
383 : dl 1.35 checkCancellationOnInterrupt(ie);
384 : dl 1.2 }
385 : dl 1.35 return extract();
386 : dl 1.2 }
387 :     }
388 :    
389 :     /**
390 : dl 1.35 * Adds the specified element to this queue, waiting if necessary for
391 :     * another thread to receive it.
392 : jsr166 1.50 *
393 :     * @throws InterruptedException {@inheritDoc}
394 :     * @throws NullPointerException {@inheritDoc}
395 : tim 1.10 */
396 : jsr166 1.49 public void put(E e) throws InterruptedException {
397 :     if (e == null) throw new NullPointerException();
398 : dl 1.35 final ReentrantLock qlock = this.qlock;
399 :    
400 : dl 1.2 for (;;) {
401 :     Node node;
402 :     boolean mustWait;
403 : dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
404 :     qlock.lock();
405 : dl 1.2 try {
406 : dl 1.43 node = waitingConsumers.deq();
407 : dl 1.2 if ( (mustWait = (node == null)) )
408 : jsr166 1.49 node = waitingProducers.enq(e);
409 : tim 1.14 } finally {
410 : dl 1.2 qlock.unlock();
411 :     }
412 :    
413 : dl 1.31 if (mustWait) {
414 : dl 1.47 try {
415 :     node.waitForTake();
416 :     return;
417 :     } catch (InterruptedException ex) {
418 :     unlinkCancelledProducer(node);
419 :     throw ex;
420 :     }
421 : dl 1.2 }
422 :    
423 : jsr166 1.49 else if (node.setItem(e))
424 : dl 1.35 return;
425 : dl 1.2
426 : dl 1.43 // else consumer cancelled, so retry
427 : dl 1.35 }
428 : tim 1.1 }
429 :    
430 : dholmes 1.11 /**
431 : dl 1.20 * Inserts the specified element into this queue, waiting if necessary
432 : dl 1.18 * up to the specified wait time for another thread to receive it.
433 : jsr166 1.50 *
434 :     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
435 :     * specified waiting time elapses before a consumer appears.
436 :     * @throws InterruptedException {@inheritDoc}
437 :     * @throws NullPointerException {@inheritDoc}
438 : dholmes 1.11 */
439 : jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
440 :     if (e == null) throw new NullPointerException();
441 : dl 1.35 long nanos = unit.toNanos(timeout);
442 :     final ReentrantLock qlock = this.qlock;
443 :     for (;;) {
444 :     Node node;
445 :     boolean mustWait;
446 : dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
447 :     qlock.lock();
448 : dl 1.35 try {
449 : dl 1.43 node = waitingConsumers.deq();
450 : dl 1.35 if ( (mustWait = (node == null)) )
451 : jsr166 1.49 node = waitingProducers.enq(e);
452 : dl 1.35 } finally {
453 :     qlock.unlock();
454 :     }
455 :    
456 : dl 1.47 if (mustWait) {
457 :     try {
458 :     boolean x = node.waitForTake(nanos);
459 : jsr166 1.48 if (!x)
460 : dl 1.47 unlinkCancelledProducer(node);
461 :     return x;
462 :     } catch (InterruptedException ex) {
463 :     unlinkCancelledProducer(node);
464 :     throw ex;
465 :     }
466 :     }
467 : dl 1.35
468 : jsr166 1.49 else if (node.setItem(e))
469 : dl 1.35 return true;
470 :    
471 : dl 1.43 // else consumer cancelled, so retry
472 : dl 1.35 }
473 : tim 1.1 }
474 :    
475 : dholmes 1.11 /**
476 :     * Retrieves and removes the head of this queue, waiting if necessary
477 :     * for another thread to insert it.
478 : jsr166 1.50 *
479 : dholmes 1.11 * @return the head of this queue
480 : jsr166 1.50 * @throws InterruptedException {@inheritDoc}
481 : dholmes 1.11 */
482 : dl 1.2 public E take() throws InterruptedException {
483 : dl 1.35 final ReentrantLock qlock = this.qlock;
484 :     for (;;) {
485 :     Node node;
486 :     boolean mustWait;
487 :    
488 : dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
489 :     qlock.lock();
490 : dl 1.35 try {
491 : dl 1.43 node = waitingProducers.deq();
492 : dl 1.35 if ( (mustWait = (node == null)) )
493 : dl 1.43 node = waitingConsumers.enq(null);
494 : dl 1.35 } finally {
495 :     qlock.unlock();
496 :     }
497 :    
498 : dl 1.36 if (mustWait) {
499 : dl 1.47 try {
500 :     Object x = node.waitForPut();
501 :     return (E)x;
502 :     } catch (InterruptedException ex) {
503 :     unlinkCancelledConsumer(node);
504 :     throw ex;
505 :     }
506 : dl 1.36 }
507 : dl 1.35 else {
508 :     Object x = node.getItem();
509 :     if (x != null)
510 :     return (E)x;
511 :     // else cancelled, so retry
512 :     }
513 :     }
514 : tim 1.1 }
515 : dl 1.2
516 : dholmes 1.11 /**
517 :     * Retrieves and removes the head of this queue, waiting
518 :     * if necessary up to the specified wait time, for another thread
519 :     * to insert it.
520 : jsr166 1.50 *
521 : dl 1.18 * @return the head of this queue, or <tt>null</tt> if the
522 : jsr166 1.50 * specified waiting time elapses before an element is present.
523 :     * @throws InterruptedException {@inheritDoc}
524 : dholmes 1.11 */
525 : dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
526 : dl 1.35 long nanos = unit.toNanos(timeout);
527 :     final ReentrantLock qlock = this.qlock;
528 :    
529 :     for (;;) {
530 :     Node node;
531 :     boolean mustWait;
532 :    
533 : dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
534 :     qlock.lock();
535 : dl 1.35 try {
536 : dl 1.43 node = waitingProducers.deq();
537 : dl 1.35 if ( (mustWait = (node == null)) )
538 : dl 1.43 node = waitingConsumers.enq(null);
539 : dl 1.35 } finally {
540 :     qlock.unlock();
541 :     }
542 :    
543 : dl 1.36 if (mustWait) {
544 : dl 1.47 try {
545 :     Object x = node.waitForPut(nanos);
546 : jsr166 1.48 if (x == null)
547 : dl 1.47 unlinkCancelledConsumer(node);
548 :     return (E)x;
549 :     } catch (InterruptedException ex) {
550 :     unlinkCancelledConsumer(node);
551 :     throw ex;
552 :     }
553 : dl 1.36 }
554 : dl 1.35 else {
555 :     Object x = node.getItem();
556 :     if (x != null)
557 :     return (E)x;
558 :     // else cancelled, so retry
559 :     }
560 :     }
561 : tim 1.1 }
562 : dl 1.2
563 :     // Untimed nonblocking versions
564 :    
565 : jsr166 1.50 /**
566 :     * Inserts the specified element into this queue, if another thread is
567 :     * waiting to receive it.
568 :     *
569 :     * @param e the element to add
570 : jsr166 1.51 * @return <tt>true</tt> if the element was added to this queue, else
571 :     * <tt>false</tt>
572 : jsr166 1.50 * @throws NullPointerException if the specified element is null
573 :     */
574 : jsr166 1.49 public boolean offer(E e) {
575 :     if (e == null) throw new NullPointerException();
576 : dl 1.27 final ReentrantLock qlock = this.qlock;
577 : tim 1.10
578 :     for (;;) {
579 : tim 1.26 Node node;
580 : dl 1.2 qlock.lock();
581 :     try {
582 : dl 1.43 node = waitingConsumers.deq();
583 : tim 1.14 } finally {
584 : dl 1.2 qlock.unlock();
585 :     }
586 :     if (node == null)
587 :     return false;
588 : tim 1.10
589 : jsr166 1.49 else if (node.setItem(e))
590 : dl 1.2 return true;
591 :     // else retry
592 :     }
593 : tim 1.1 }
594 : dl 1.2
595 : dl 1.18 /**
596 :     * Retrieves and removes the head of this queue, if another thread
597 :     * is currently making an element available.
598 :     *
599 :     * @return the head of this queue, or <tt>null</tt> if no
600 :     * element is available.
601 :     */
602 : dl 1.2 public E poll() {
603 : dl 1.27 final ReentrantLock qlock = this.qlock;
604 : dl 1.2 for (;;) {
605 :     Node node;
606 :     qlock.lock();
607 :     try {
608 : dl 1.43 node = waitingProducers.deq();
609 : tim 1.14 } finally {
610 : dl 1.2 qlock.unlock();
611 :     }
612 :     if (node == null)
613 :     return null;
614 :    
615 :     else {
616 : dl 1.31 Object x = node.getItem();
617 : dl 1.2 if (x != null)
618 :     return (E)x;
619 :     // else retry
620 :     }
621 :     }
622 : tim 1.1 }
623 : dl 1.2
624 : dl 1.5 /**
625 : jsr166 1.48 * Always returns <tt>true</tt>.
626 : dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
627 : jsr166 1.50 *
628 : dholmes 1.11 * @return <tt>true</tt>
629 : dl 1.5 */
630 :     public boolean isEmpty() {
631 :     return true;
632 :     }
633 :    
634 :     /**
635 : dholmes 1.11 * Always returns zero.
636 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
637 : jsr166 1.50 *
638 :     * @return zero
639 : dl 1.5 */
640 :     public int size() {
641 :     return 0;
642 : tim 1.1 }
643 : dl 1.2
644 : dl 1.5 /**
645 : dholmes 1.11 * Always returns zero.
646 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
647 : jsr166 1.50 *
648 :     * @return zero
649 : dl 1.5 */
650 :     public int remainingCapacity() {
651 :     return 0;
652 :     }
653 :    
654 :     /**
655 : dholmes 1.11 * Does nothing.
656 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
657 :     */
658 :     public void clear() {}
659 :    
660 :     /**
661 :     * Always returns <tt>false</tt>.
662 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
663 : jsr166 1.50 *
664 : jsr166 1.54 * @param o object to be checked for containment in this queue
665 : dholmes 1.11 * @return <tt>false</tt>
666 :     */
667 :     public boolean contains(Object o) {
668 :     return false;
669 :     }
670 :    
671 :     /**
672 : dl 1.18 * Always returns <tt>false</tt>.
673 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
674 :     *
675 :     * @param o the element to remove
676 :     * @return <tt>false</tt>
677 :     */
678 :     public boolean remove(Object o) {
679 :     return false;
680 :     }
681 :    
682 :     /**
683 : jsr166 1.50 * Returns <tt>false</tt> unless the given collection is empty.
684 : dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
685 : jsr166 1.50 *
686 : dl 1.18 * @param c the collection
687 : jsr166 1.50 * @return <tt>false</tt> unless the given collection is empty
688 :     * @throws NullPointerException if the specified collection is null
689 : dholmes 1.11 */
690 : dl 1.12 public boolean containsAll(Collection<?> c) {
691 : dl 1.16 return c.isEmpty();
692 : dholmes 1.11 }
693 :    
694 :     /**
695 :     * Always returns <tt>false</tt>.
696 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
697 : jsr166 1.50 *
698 : dl 1.18 * @param c the collection
699 : dholmes 1.11 * @return <tt>false</tt>
700 :     */
701 : dl 1.12 public boolean removeAll(Collection<?> c) {
702 : dholmes 1.11 return false;
703 :     }
704 :    
705 :     /**
706 :     * Always returns <tt>false</tt>.
707 :     * A <tt>SynchronousQueue</tt> has no internal capacity.
708 : jsr166 1.50 *
709 : dl 1.18 * @param c the collection
710 : dholmes 1.11 * @return <tt>false</tt>
711 :     */
712 : dl 1.12 public boolean retainAll(Collection<?> c) {
713 : dholmes 1.11 return false;
714 :     }
715 :    
716 :     /**
717 : jsr166 1.48 * Always returns <tt>null</tt>.
718 : dholmes 1.11 * A <tt>SynchronousQueue</tt> does not return elements
719 : dl 1.5 * unless actively waited on.
720 : jsr166 1.50 *
721 : dholmes 1.11 * @return <tt>null</tt>
722 : dl 1.5 */
723 :     public E peek() {
724 :     return null;
725 :     }
726 :    
727 :    
728 :     static class EmptyIterator<E> implements Iterator<E> {
729 : dl 1.2 public boolean hasNext() {
730 :     return false;
731 :     }
732 :     public E next() {
733 :     throw new NoSuchElementException();
734 :     }
735 :     public void remove() {
736 : dl 1.17 throw new IllegalStateException();
737 : dl 1.2 }
738 : tim 1.1 }
739 : dl 1.2
740 : dl 1.5 /**
741 : dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
742 : tim 1.13 * <tt>false</tt>.
743 :     *
744 : dholmes 1.11 * @return an empty iterator
745 : dl 1.5 */
746 : dl 1.2 public Iterator<E> iterator() {
747 : dl 1.5 return new EmptyIterator<E>();
748 : tim 1.1 }
749 :    
750 : dl 1.2
751 : dl 1.5 /**
752 : dholmes 1.11 * Returns a zero-length array.
753 :     * @return a zero-length array
754 : dl 1.5 */
755 : dl 1.3 public Object[] toArray() {
756 : dl 1.25 return new Object[0];
757 : tim 1.1 }
758 :    
759 : dholmes 1.11 /**
760 :     * Sets the zeroeth element of the specified array to <tt>null</tt>
761 :     * (if the array has non-zero length) and returns it.
762 : jsr166 1.50 *
763 : dl 1.40 * @param a the array
764 : dholmes 1.11 * @return the specified array
765 : jsr166 1.50 * @throws NullPointerException if the specified array is null
766 : dholmes 1.11 */
767 : dl 1.2 public <T> T[] toArray(T[] a) {
768 :     if (a.length > 0)
769 :     a[0] = null;
770 :     return a;
771 :     }
772 : dl 1.21
773 : jsr166 1.50 /**
774 :     * @throws UnsupportedOperationException {@inheritDoc}
775 :     * @throws ClassCastException {@inheritDoc}
776 :     * @throws NullPointerException {@inheritDoc}
777 :     * @throws IllegalArgumentException {@inheritDoc}
778 :     */
779 : dl 1.21 public int drainTo(Collection<? super E> c) {
780 :     if (c == null)
781 :     throw new NullPointerException();
782 :     if (c == this)
783 :     throw new IllegalArgumentException();
784 :     int n = 0;
785 :     E e;
786 :     while ( (e = poll()) != null) {
787 :     c.add(e);
788 :     ++n;
789 :     }
790 :     return n;
791 :     }
792 :    
793 : jsr166 1.50 /**
794 :     * @throws UnsupportedOperationException {@inheritDoc}
795 :     * @throws ClassCastException {@inheritDoc}
796 :     * @throws NullPointerException {@inheritDoc}
797 :     * @throws IllegalArgumentException {@inheritDoc}
798 :     */
799 : dl 1.21 public int drainTo(Collection<? super E> c, int maxElements) {
800 :     if (c == null)
801 :     throw new NullPointerException();
802 :     if (c == this)
803 :     throw new IllegalArgumentException();
804 :     int n = 0;
805 :     E e;
806 :     while (n < maxElements && (e = poll()) != null) {
807 :     c.add(e);
808 :     ++n;
809 :     }
810 :     return n;
811 :     }
812 : tim 1.1 }

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8