Parent Directory
|
Revision Log
Revision 1.54 - (view) (download)
| 1 : | dl | 1.2 | /* |
| 2 : | * Written by Doug Lea with assistance from members of JCP JSR-166 | ||
| 3 : | dl | 1.29 | * Expert Group and released to the public domain, as explained at |
| 4 : | * http://creativecommons.org/licenses/publicdomain | ||
| 5 : | dl | 1.2 | */ |
| 6 : | |||
| 7 : | tim | 1.1 | package java.util.concurrent; |
| 8 : | dl | 1.8 | import java.util.concurrent.locks.*; |
| 9 : | tim | 1.1 | import java.util.*; |
| 10 : | |||
| 11 : | /** | ||
| 12 : | jsr166 | 1.52 | * A {@linkplain BlockingQueue blocking queue} in which each insert |
| 13 : | * operation must wait for a corresponding remove operation by another | ||
| 14 : | * thread, and vice versa. A synchronous queue does not have any | ||
| 15 : | * internal capacity, not even a capacity of one. You cannot | ||
| 16 : | * <tt>peek</tt> at a synchronous queue because an element is only | ||
| 17 : | * present when you try to remove it; you cannot insert an element | ||
| 18 : | * (using any method) unless another thread is trying to remove it; | ||
| 19 : | * you cannot iterate as there is nothing to iterate. The | ||
| 20 : | * <em>head</em> of the queue is the element that the first queued | ||
| 21 : | * inserting thread is trying to add to the queue; if there is no such | ||
| 22 : | * queued thread then no element is available for removal and | ||
| 23 : | * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other | ||
| 24 : | * <tt>Collection</tt> methods (for example <tt>contains</tt>), a | ||
| 25 : | * <tt>SynchronousQueue</tt> acts as an empty collection. This queue | ||
| 26 : | * does not permit <tt>null</tt> elements. | ||
| 27 : | dl | 1.18 | * |
| 28 : | * <p>Synchronous queues are similar to rendezvous channels used in | ||
| 29 : | * CSP and Ada. They are well suited for handoff designs, in which an | ||
| 30 : | dl | 1.30 | * object running in one thread must sync up with an object running |
| 31 : | dl | 1.18 | * in another thread in order to hand it some information, event, or |
| 32 : | * task. | ||
| 33 : | dl | 1.43 | * |
| 34 : | * <p> This class supports an optional fairness policy for ordering | ||
| 35 : | * waiting producer and consumer threads. By default, this ordering | ||
| 36 : | * is not guaranteed. However, a queue constructed with fairness set | ||
| 37 : | * to <tt>true</tt> grants threads access in FIFO order. Fairness | ||
| 38 : | * generally decreases throughput but reduces variability and avoids | ||
| 39 : | * starvation. | ||
| 40 : | * | ||
| 41 : | dl | 1.46 | * <p>This class and its iterator implement all of the |
| 42 : | * <em>optional</em> methods of the {@link Collection} and {@link | ||
| 43 : | jsr166 | 1.48 | * Iterator} interfaces. |
| 44 : | dl | 1.42 | * |
| 45 : | * <p>This class is a member of the | ||
| 46 : | * <a href="{@docRoot}/../guide/collections/index.html"> | ||
| 47 : | * Java Collections Framework</a>. | ||
| 48 : | * | ||
| 49 : | dl | 1.6 | * @since 1.5 |
| 50 : | * @author Doug Lea | ||
| 51 : | dl | 1.24 | * @param <E> the type of elements held in this collection |
| 52 : | dl | 1.23 | */ |
| 53 : | dl | 1.2 | public class SynchronousQueue<E> extends AbstractQueue<E> |
| 54 : | tim | 1.1 | implements BlockingQueue<E>, java.io.Serializable { |
| 55 : | dl | 1.15 | private static final long serialVersionUID = -3223113410248163686L; |
| 56 : | tim | 1.1 | |
| 57 : | dl | 1.2 | /* |
| 58 : | This implementation divides actions into two cases for puts: | ||
| 59 : | |||
| 60 : | dl | 1.43 | * An arriving producer that does not already have a waiting consumer |
| 61 : | jsr166 | 1.50 | creates a node holding item, and then waits for a consumer to take it. |
| 62 : | dl | 1.43 | * An arriving producer that does already have a waiting consumer fills |
| 63 : | jsr166 | 1.50 | the slot node created by the consumer, and notifies it to continue. |
| 64 : | dl | 1.2 | |
| 65 : | And symmetrically, two for takes: | ||
| 66 : | |||
| 67 : | dl | 1.43 | * An arriving consumer that does not already have a waiting producer |
| 68 : | jsr166 | 1.50 | creates an empty slot node, and then waits for a producer to fill it. |
| 69 : | dl | 1.43 | * An arriving consumer that does already have a waiting producer takes |
| 70 : | jsr166 | 1.50 | item from the node created by the producer, and notifies it to continue. |
| 71 : | tim | 1.10 | |
| 72 : | dl | 1.2 | When a put or take waiting for the actions of its counterpart |
| 73 : | aborts due to interruption or timeout, it marks the node | ||
| 74 : | it created as "CANCELLED", which causes its counterpart to retry | ||
| 75 : | the entire put or take sequence. | ||
| 76 : | dl | 1.43 | |
| 77 : | This requires keeping two simple queues, waitingProducers and | ||
| 78 : | waitingConsumers. Each of these can be FIFO (preserves fairness) | ||
| 79 : | or LIFO (improves throughput). | ||
| 80 : | dl | 1.2 | */ |
| 81 : | |||
| 82 : | dl | 1.43 | /** Lock protecting both wait queues */ |
| 83 : | private final ReentrantLock qlock; | ||
| 84 : | /** Queue holding waiting puts */ | ||
| 85 : | private final WaitQueue waitingProducers; | ||
| 86 : | /** Queue holding waiting takes */ | ||
| 87 : | private final WaitQueue waitingConsumers; | ||
| 88 : | dl | 1.2 | |
| 89 : | dl | 1.43 | /** |
| 90 : | * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. | ||
| 91 : | */ | ||
| 92 : | public SynchronousQueue() { | ||
| 93 : | dl | 1.44 | this(false); |
| 94 : | dl | 1.43 | } |
| 95 : | |||
| 96 : | /** | ||
| 97 : | * Creates a <tt>SynchronousQueue</tt> with specified fairness policy. | ||
| 98 : | dl | 1.44 | * @param fair if true, threads contend in FIFO order for access; |
| 99 : | * otherwise the order is unspecified. | ||
| 100 : | dl | 1.43 | */ |
| 101 : | public SynchronousQueue(boolean fair) { | ||
| 102 : | if (fair) { | ||
| 103 : | qlock = new ReentrantLock(true); | ||
| 104 : | waitingProducers = new FifoWaitQueue(); | ||
| 105 : | waitingConsumers = new FifoWaitQueue(); | ||
| 106 : | } | ||
| 107 : | else { | ||
| 108 : | qlock = new ReentrantLock(); | ||
| 109 : | waitingProducers = new LifoWaitQueue(); | ||
| 110 : | waitingConsumers = new LifoWaitQueue(); | ||
| 111 : | } | ||
| 112 : | } | ||
| 113 : | |||
| 114 : | /** | ||
| 115 : | dl | 1.45 | * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. |
| 116 : | dl | 1.43 | * These queues have all transient fields, but are serializable |
| 117 : | dl | 1.44 | * in order to recover fairness settings when deserialized. |
| 118 : | dl | 1.43 | */ |
| 119 : | static abstract class WaitQueue implements java.io.Serializable { | ||
| 120 : | jsr166 | 1.48 | /** Creates, adds, and returns node for x. */ |
| 121 : | dl | 1.43 | abstract Node enq(Object x); |
| 122 : | jsr166 | 1.48 | /** Removes and returns node, or null if empty. */ |
| 123 : | dl | 1.43 | abstract Node deq(); |
| 124 : | jsr166 | 1.48 | /** Removes a cancelled node to avoid garbage retention. */ |
| 125 : | dl | 1.47 | abstract void unlink(Node node); |
| 126 : | jsr166 | 1.48 | /** Returns true if a cancelled node might be on queue. */ |
| 127 : | dl | 1.47 | abstract boolean shouldUnlink(Node node); |
| 128 : | dl | 1.43 | } |
| 129 : | |||
| 130 : | /** | ||
| 131 : | * FIFO queue to hold waiting puts/takes. | ||
| 132 : | */ | ||
| 133 : | static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable { | ||
| 134 : | private static final long serialVersionUID = -3623113410248163686L; | ||
| 135 : | private transient Node head; | ||
| 136 : | private transient Node last; | ||
| 137 : | |||
| 138 : | Node enq(Object x) { | ||
| 139 : | Node p = new Node(x); | ||
| 140 : | if (last == null) | ||
| 141 : | last = head = p; | ||
| 142 : | else | ||
| 143 : | last = last.next = p; | ||
| 144 : | return p; | ||
| 145 : | } | ||
| 146 : | |||
| 147 : | Node deq() { | ||
| 148 : | Node p = head; | ||
| 149 : | if (p != null) { | ||
| 150 : | if ((head = p.next) == null) | ||
| 151 : | last = null; | ||
| 152 : | p.next = null; | ||
| 153 : | } | ||
| 154 : | return p; | ||
| 155 : | } | ||
| 156 : | dl | 1.47 | |
| 157 : | boolean shouldUnlink(Node node) { | ||
| 158 : | return (node == last || node.next != null); | ||
| 159 : | } | ||
| 160 : | |||
| 161 : | void unlink(Node node) { | ||
| 162 : | Node p = head; | ||
| 163 : | Node trail = null; | ||
| 164 : | while (p != null) { | ||
| 165 : | if (p == node) { | ||
| 166 : | Node next = p.next; | ||
| 167 : | jsr166 | 1.48 | if (trail == null) |
| 168 : | dl | 1.47 | head = next; |
| 169 : | else | ||
| 170 : | trail.next = next; | ||
| 171 : | if (last == node) | ||
| 172 : | last = trail; | ||
| 173 : | break; | ||
| 174 : | } | ||
| 175 : | trail = p; | ||
| 176 : | p = p.next; | ||
| 177 : | } | ||
| 178 : | } | ||
| 179 : | dl | 1.43 | } |
| 180 : | |||
| 181 : | /** | ||
| 182 : | * LIFO queue to hold waiting puts/takes. | ||
| 183 : | dl | 1.2 | */ |
| 184 : | dl | 1.43 | static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable { |
| 185 : | private static final long serialVersionUID = -3633113410248163686L; | ||
| 186 : | private transient Node head; | ||
| 187 : | dl | 1.2 | |
| 188 : | dl | 1.43 | Node enq(Object x) { |
| 189 : | return head = new Node(x, head); | ||
| 190 : | } | ||
| 191 : | |||
| 192 : | Node deq() { | ||
| 193 : | Node p = head; | ||
| 194 : | dl | 1.44 | if (p != null) { |
| 195 : | dl | 1.43 | head = p.next; |
| 196 : | dl | 1.44 | p.next = null; |
| 197 : | } | ||
| 198 : | dl | 1.43 | return p; |
| 199 : | } | ||
| 200 : | dl | 1.47 | |
| 201 : | boolean shouldUnlink(Node node) { | ||
| 202 : | // Return false if already dequeued or is bottom node (in which | ||
| 203 : | // case we might retain at most one garbage node) | ||
| 204 : | return (node == head || node.next != null); | ||
| 205 : | } | ||
| 206 : | |||
| 207 : | void unlink(Node node) { | ||
| 208 : | Node p = head; | ||
| 209 : | Node trail = null; | ||
| 210 : | while (p != null) { | ||
| 211 : | if (p == node) { | ||
| 212 : | Node next = p.next; | ||
| 213 : | jsr166 | 1.48 | if (trail == null) |
| 214 : | dl | 1.47 | head = next; |
| 215 : | else | ||
| 216 : | trail.next = next; | ||
| 217 : | break; | ||
| 218 : | } | ||
| 219 : | trail = p; | ||
| 220 : | p = p.next; | ||
| 221 : | } | ||
| 222 : | } | ||
| 223 : | } | ||
| 224 : | |||
| 225 : | jsr166 | 1.50 | /** |
| 226 : | * Unlinks the given node from consumer queue. Called by cancelled | ||
| 227 : | dl | 1.47 | * (timeout, interrupt) waiters to avoid garbage retention in the |
| 228 : | jsr166 | 1.48 | * absence of producers. |
| 229 : | dl | 1.47 | */ |
| 230 : | private void unlinkCancelledConsumer(Node node) { | ||
| 231 : | // Use a form of double-check to avoid unnecessary locking and | ||
| 232 : | // traversal. The first check outside lock might | ||
| 233 : | // conservatively report true. | ||
| 234 : | if (waitingConsumers.shouldUnlink(node)) { | ||
| 235 : | qlock.lock(); | ||
| 236 : | try { | ||
| 237 : | jsr166 | 1.48 | if (waitingConsumers.shouldUnlink(node)) |
| 238 : | dl | 1.47 | waitingConsumers.unlink(node); |
| 239 : | } finally { | ||
| 240 : | qlock.unlock(); | ||
| 241 : | } | ||
| 242 : | } | ||
| 243 : | dl | 1.43 | } |
| 244 : | dl | 1.2 | |
| 245 : | jsr166 | 1.50 | /** |
| 246 : | * Unlinks the given node from producer queue. Symmetric | ||
| 247 : | dl | 1.47 | * to unlinkCancelledConsumer. |
| 248 : | */ | ||
| 249 : | private void unlinkCancelledProducer(Node node) { | ||
| 250 : | if (waitingProducers.shouldUnlink(node)) { | ||
| 251 : | qlock.lock(); | ||
| 252 : | try { | ||
| 253 : | jsr166 | 1.48 | if (waitingProducers.shouldUnlink(node)) |
| 254 : | dl | 1.47 | waitingProducers.unlink(node); |
| 255 : | } finally { | ||
| 256 : | qlock.unlock(); | ||
| 257 : | } | ||
| 258 : | } | ||
| 259 : | } | ||
| 260 : | jsr166 | 1.48 | |
| 261 : | dl | 1.2 | /** |
| 262 : | * Nodes each maintain an item and handle waits and signals for | ||
| 263 : | dl | 1.31 | * getting and setting it. The class extends |
| 264 : | * AbstractQueuedSynchronizer to manage blocking, using AQS state | ||
| 265 : | * 0 for waiting, 1 for ack, -1 for cancelled. | ||
| 266 : | dl | 1.2 | */ |
| 267 : | dl | 1.41 | static final class Node extends AbstractQueuedSynchronizer { |
| 268 : | dl | 1.53 | private static final long serialVersionUID = -2631493897867746127L; |
| 269 : | |||
| 270 : | dl | 1.35 | /** Synchronization state value representing that node acked */ |
| 271 : | private static final int ACK = 1; | ||
| 272 : | /** Synchronization state value representing that node cancelled */ | ||
| 273 : | private static final int CANCEL = -1; | ||
| 274 : | |||
| 275 : | dl | 1.6 | /** The item being transferred */ |
| 276 : | dl | 1.2 | Object item; |
| 277 : | dl | 1.6 | /** Next node in wait queue */ |
| 278 : | dl | 1.2 | Node next; |
| 279 : | dl | 1.35 | |
| 280 : | dl | 1.44 | /** Creates a node with initial item */ |
| 281 : | dl | 1.31 | Node(Object x) { item = x; } |
| 282 : | |||
| 283 : | dl | 1.44 | /** Creates a node with initial item and next */ |
| 284 : | dl | 1.43 | Node(Object x, Node n) { item = x; next = n; } |
| 285 : | |||
| 286 : | dl | 1.31 | /** |
| 287 : | * Implements AQS base acquire to succeed if not in WAITING state | ||
| 288 : | */ | ||
| 289 : | dl | 1.39 | protected boolean tryAcquire(int ignore) { |
| 290 : | dl | 1.35 | return getState() != 0; |
| 291 : | dl | 1.31 | } |
| 292 : | |||
| 293 : | /** | ||
| 294 : | dl | 1.34 | * Implements AQS base release to signal if state changed |
| 295 : | dl | 1.31 | */ |
| 296 : | dl | 1.39 | protected boolean tryRelease(int newState) { |
| 297 : | dl | 1.35 | return compareAndSetState(0, newState); |
| 298 : | dl | 1.31 | } |
| 299 : | |||
| 300 : | /** | ||
| 301 : | dl | 1.44 | * Takes item and nulls out field (for sake of GC) |
| 302 : | dl | 1.31 | */ |
| 303 : | dl | 1.35 | private Object extract() { |
| 304 : | Object x = item; | ||
| 305 : | item = null; | ||
| 306 : | return x; | ||
| 307 : | dl | 1.31 | } |
| 308 : | |||
| 309 : | /** | ||
| 310 : | dl | 1.44 | * Tries to cancel on interrupt; if so rethrowing, |
| 311 : | dl | 1.35 | * else setting interrupt state |
| 312 : | dl | 1.31 | */ |
| 313 : | jsr166 | 1.48 | private void checkCancellationOnInterrupt(InterruptedException ie) |
| 314 : | dl | 1.35 | throws InterruptedException { |
| 315 : | jsr166 | 1.48 | if (release(CANCEL)) |
| 316 : | dl | 1.35 | throw ie; |
| 317 : | Thread.currentThread().interrupt(); | ||
| 318 : | dl | 1.31 | } |
| 319 : | dl | 1.2 | |
| 320 : | /** | ||
| 321 : | dl | 1.44 | * Fills in the slot created by the consumer and signal consumer to |
| 322 : | dl | 1.2 | * continue. |
| 323 : | */ | ||
| 324 : | dl | 1.31 | boolean setItem(Object x) { |
| 325 : | dl | 1.35 | item = x; // can place in slot even if cancelled |
| 326 : | dl | 1.39 | return release(ACK); |
| 327 : | dl | 1.2 | } |
| 328 : | |||
| 329 : | /** | ||
| 330 : | dl | 1.44 | * Removes item from slot created by producer and signal producer |
| 331 : | dl | 1.2 | * to continue. |
| 332 : | */ | ||
| 333 : | dl | 1.31 | Object getItem() { |
| 334 : | dl | 1.39 | return (release(ACK))? extract() : null; |
| 335 : | dl | 1.35 | } |
| 336 : | |||
| 337 : | /** | ||
| 338 : | dl | 1.44 | * Waits for a consumer to take item placed by producer. |
| 339 : | dl | 1.35 | */ |
| 340 : | void waitForTake() throws InterruptedException { | ||
| 341 : | try { | ||
| 342 : | dl | 1.39 | acquireInterruptibly(0); |
| 343 : | dl | 1.35 | } catch (InterruptedException ie) { |
| 344 : | checkCancellationOnInterrupt(ie); | ||
| 345 : | } | ||
| 346 : | } | ||
| 347 : | |||
| 348 : | /** | ||
| 349 : | dl | 1.44 | * Waits for a producer to put item placed by consumer. |
| 350 : | dl | 1.35 | */ |
| 351 : | Object waitForPut() throws InterruptedException { | ||
| 352 : | try { | ||
| 353 : | dl | 1.39 | acquireInterruptibly(0); |
| 354 : | dl | 1.35 | } catch (InterruptedException ie) { |
| 355 : | checkCancellationOnInterrupt(ie); | ||
| 356 : | } | ||
| 357 : | return extract(); | ||
| 358 : | dl | 1.31 | } |
| 359 : | |||
| 360 : | /** | ||
| 361 : | dl | 1.44 | * Waits for a consumer to take item placed by producer or time out. |
| 362 : | dl | 1.31 | */ |
| 363 : | dl | 1.35 | boolean waitForTake(long nanos) throws InterruptedException { |
| 364 : | dl | 1.2 | try { |
| 365 : | dl | 1.39 | if (!tryAcquireNanos(0, nanos) && |
| 366 : | release(CANCEL)) | ||
| 367 : | dl | 1.33 | return false; |
| 368 : | dl | 1.31 | } catch (InterruptedException ie) { |
| 369 : | dl | 1.35 | checkCancellationOnInterrupt(ie); |
| 370 : | dl | 1.2 | } |
| 371 : | dl | 1.35 | return true; |
| 372 : | dl | 1.2 | } |
| 373 : | |||
| 374 : | /** | ||
| 375 : | dl | 1.44 | * Waits for a producer to put item placed by consumer, or time out. |
| 376 : | dl | 1.31 | */ |
| 377 : | dl | 1.35 | Object waitForPut(long nanos) throws InterruptedException { |
| 378 : | dl | 1.31 | try { |
| 379 : | dl | 1.39 | if (!tryAcquireNanos(0, nanos) && |
| 380 : | release(CANCEL)) | ||
| 381 : | dl | 1.33 | return null; |
| 382 : | dl | 1.31 | } catch (InterruptedException ie) { |
| 383 : | dl | 1.35 | checkCancellationOnInterrupt(ie); |
| 384 : | dl | 1.2 | } |
| 385 : | dl | 1.35 | return extract(); |
| 386 : | dl | 1.2 | } |
| 387 : | } | ||
| 388 : | |||
| 389 : | /** | ||
| 390 : | dl | 1.35 | * Adds the specified element to this queue, waiting if necessary for |
| 391 : | * another thread to receive it. | ||
| 392 : | jsr166 | 1.50 | * |
| 393 : | * @throws InterruptedException {@inheritDoc} | ||
| 394 : | * @throws NullPointerException {@inheritDoc} | ||
| 395 : | tim | 1.10 | */ |
| 396 : | jsr166 | 1.49 | public void put(E e) throws InterruptedException { |
| 397 : | if (e == null) throw new NullPointerException(); | ||
| 398 : | dl | 1.35 | final ReentrantLock qlock = this.qlock; |
| 399 : | |||
| 400 : | dl | 1.2 | for (;;) { |
| 401 : | Node node; | ||
| 402 : | boolean mustWait; | ||
| 403 : | dl | 1.43 | if (Thread.interrupted()) throw new InterruptedException(); |
| 404 : | qlock.lock(); | ||
| 405 : | dl | 1.2 | try { |
| 406 : | dl | 1.43 | node = waitingConsumers.deq(); |
| 407 : | dl | 1.2 | if ( (mustWait = (node == null)) ) |
| 408 : | jsr166 | 1.49 | node = waitingProducers.enq(e); |
| 409 : | tim | 1.14 | } finally { |
| 410 : | dl | 1.2 | qlock.unlock(); |
| 411 : | } | ||
| 412 : | |||
| 413 : | dl | 1.31 | if (mustWait) { |
| 414 : | dl | 1.47 | try { |
| 415 : | node.waitForTake(); | ||
| 416 : | return; | ||
| 417 : | } catch (InterruptedException ex) { | ||
| 418 : | unlinkCancelledProducer(node); | ||
| 419 : | throw ex; | ||
| 420 : | } | ||
| 421 : | dl | 1.2 | } |
| 422 : | |||
| 423 : | jsr166 | 1.49 | else if (node.setItem(e)) |
| 424 : | dl | 1.35 | return; |
| 425 : | dl | 1.2 | |
| 426 : | dl | 1.43 | // else consumer cancelled, so retry |
| 427 : | dl | 1.35 | } |
| 428 : | tim | 1.1 | } |
| 429 : | |||
| 430 : | dholmes | 1.11 | /** |
| 431 : | dl | 1.20 | * Inserts the specified element into this queue, waiting if necessary |
| 432 : | dl | 1.18 | * up to the specified wait time for another thread to receive it. |
| 433 : | jsr166 | 1.50 | * |
| 434 : | * @return <tt>true</tt> if successful, or <tt>false</tt> if the | ||
| 435 : | * specified waiting time elapses before a consumer appears. | ||
| 436 : | * @throws InterruptedException {@inheritDoc} | ||
| 437 : | * @throws NullPointerException {@inheritDoc} | ||
| 438 : | dholmes | 1.11 | */ |
| 439 : | jsr166 | 1.49 | public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { |
| 440 : | if (e == null) throw new NullPointerException(); | ||
| 441 : | dl | 1.35 | long nanos = unit.toNanos(timeout); |
| 442 : | final ReentrantLock qlock = this.qlock; | ||
| 443 : | for (;;) { | ||
| 444 : | Node node; | ||
| 445 : | boolean mustWait; | ||
| 446 : | dl | 1.43 | if (Thread.interrupted()) throw new InterruptedException(); |
| 447 : | qlock.lock(); | ||
| 448 : | dl | 1.35 | try { |
| 449 : | dl | 1.43 | node = waitingConsumers.deq(); |
| 450 : | dl | 1.35 | if ( (mustWait = (node == null)) ) |
| 451 : | jsr166 | 1.49 | node = waitingProducers.enq(e); |
| 452 : | dl | 1.35 | } finally { |
| 453 : | qlock.unlock(); | ||
| 454 : | } | ||
| 455 : | |||
| 456 : | dl | 1.47 | if (mustWait) { |
| 457 : | try { | ||
| 458 : | boolean x = node.waitForTake(nanos); | ||
| 459 : | jsr166 | 1.48 | if (!x) |
| 460 : | dl | 1.47 | unlinkCancelledProducer(node); |
| 461 : | return x; | ||
| 462 : | } catch (InterruptedException ex) { | ||
| 463 : | unlinkCancelledProducer(node); | ||
| 464 : | throw ex; | ||
| 465 : | } | ||
| 466 : | } | ||
| 467 : | dl | 1.35 | |
| 468 : | jsr166 | 1.49 | else if (node.setItem(e)) |
| 469 : | dl | 1.35 | return true; |
| 470 : | |||
| 471 : | dl | 1.43 | // else consumer cancelled, so retry |
| 472 : | dl | 1.35 | } |
| 473 : | tim | 1.1 | } |
| 474 : | |||
| 475 : | dholmes | 1.11 | /** |
| 476 : | * Retrieves and removes the head of this queue, waiting if necessary | ||
| 477 : | * for another thread to insert it. | ||
| 478 : | jsr166 | 1.50 | * |
| 479 : | dholmes | 1.11 | * @return the head of this queue |
| 480 : | jsr166 | 1.50 | * @throws InterruptedException {@inheritDoc} |
| 481 : | dholmes | 1.11 | */ |
| 482 : | dl | 1.2 | public E take() throws InterruptedException { |
| 483 : | dl | 1.35 | final ReentrantLock qlock = this.qlock; |
| 484 : | for (;;) { | ||
| 485 : | Node node; | ||
| 486 : | boolean mustWait; | ||
| 487 : | |||
| 488 : | dl | 1.43 | if (Thread.interrupted()) throw new InterruptedException(); |
| 489 : | qlock.lock(); | ||
| 490 : | dl | 1.35 | try { |
| 491 : | dl | 1.43 | node = waitingProducers.deq(); |
| 492 : | dl | 1.35 | if ( (mustWait = (node == null)) ) |
| 493 : | dl | 1.43 | node = waitingConsumers.enq(null); |
| 494 : | dl | 1.35 | } finally { |
| 495 : | qlock.unlock(); | ||
| 496 : | } | ||
| 497 : | |||
| 498 : | dl | 1.36 | if (mustWait) { |
| 499 : | dl | 1.47 | try { |
| 500 : | Object x = node.waitForPut(); | ||
| 501 : | return (E)x; | ||
| 502 : | } catch (InterruptedException ex) { | ||
| 503 : | unlinkCancelledConsumer(node); | ||
| 504 : | throw ex; | ||
| 505 : | } | ||
| 506 : | dl | 1.36 | } |
| 507 : | dl | 1.35 | else { |
| 508 : | Object x = node.getItem(); | ||
| 509 : | if (x != null) | ||
| 510 : | return (E)x; | ||
| 511 : | // else cancelled, so retry | ||
| 512 : | } | ||
| 513 : | } | ||
| 514 : | tim | 1.1 | } |
| 515 : | dl | 1.2 | |
| 516 : | dholmes | 1.11 | /** |
| 517 : | * Retrieves and removes the head of this queue, waiting | ||
| 518 : | * if necessary up to the specified wait time, for another thread | ||
| 519 : | * to insert it. | ||
| 520 : | jsr166 | 1.50 | * |
| 521 : | dl | 1.18 | * @return the head of this queue, or <tt>null</tt> if the |
| 522 : | jsr166 | 1.50 | * specified waiting time elapses before an element is present. |
| 523 : | * @throws InterruptedException {@inheritDoc} | ||
| 524 : | dholmes | 1.11 | */ |
| 525 : | dl | 1.2 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| 526 : | dl | 1.35 | long nanos = unit.toNanos(timeout); |
| 527 : | final ReentrantLock qlock = this.qlock; | ||
| 528 : | |||
| 529 : | for (;;) { | ||
| 530 : | Node node; | ||
| 531 : | boolean mustWait; | ||
| 532 : | |||
| 533 : | dl | 1.43 | if (Thread.interrupted()) throw new InterruptedException(); |
| 534 : | qlock.lock(); | ||
| 535 : | dl | 1.35 | try { |
| 536 : | dl | 1.43 | node = waitingProducers.deq(); |
| 537 : | dl | 1.35 | if ( (mustWait = (node == null)) ) |
| 538 : | dl | 1.43 | node = waitingConsumers.enq(null); |
| 539 : | dl | 1.35 | } finally { |
| 540 : | qlock.unlock(); | ||
| 541 : | } | ||
| 542 : | |||
| 543 : | dl | 1.36 | if (mustWait) { |
| 544 : | dl | 1.47 | try { |
| 545 : | Object x = node.waitForPut(nanos); | ||
| 546 : | jsr166 | 1.48 | if (x == null) |
| 547 : | dl | 1.47 | unlinkCancelledConsumer(node); |
| 548 : | return (E)x; | ||
| 549 : | } catch (InterruptedException ex) { | ||
| 550 : | unlinkCancelledConsumer(node); | ||
| 551 : | throw ex; | ||
| 552 : | } | ||
| 553 : | dl | 1.36 | } |
| 554 : | dl | 1.35 | else { |
| 555 : | Object x = node.getItem(); | ||
| 556 : | if (x != null) | ||
| 557 : | return (E)x; | ||
| 558 : | // else cancelled, so retry | ||
| 559 : | } | ||
| 560 : | } | ||
| 561 : | tim | 1.1 | } |
| 562 : | dl | 1.2 | |
| 563 : | // Untimed nonblocking versions | ||
| 564 : | |||
| 565 : | jsr166 | 1.50 | /** |
| 566 : | * Inserts the specified element into this queue, if another thread is | ||
| 567 : | * waiting to receive it. | ||
| 568 : | * | ||
| 569 : | * @param e the element to add | ||
| 570 : | jsr166 | 1.51 | * @return <tt>true</tt> if the element was added to this queue, else |
| 571 : | * <tt>false</tt> | ||
| 572 : | jsr166 | 1.50 | * @throws NullPointerException if the specified element is null |
| 573 : | */ | ||
| 574 : | jsr166 | 1.49 | public boolean offer(E e) { |
| 575 : | if (e == null) throw new NullPointerException(); | ||
| 576 : | dl | 1.27 | final ReentrantLock qlock = this.qlock; |
| 577 : | tim | 1.10 | |
| 578 : | for (;;) { | ||
| 579 : | tim | 1.26 | Node node; |
| 580 : | dl | 1.2 | qlock.lock(); |
| 581 : | try { | ||
| 582 : | dl | 1.43 | node = waitingConsumers.deq(); |
| 583 : | tim | 1.14 | } finally { |
| 584 : | dl | 1.2 | qlock.unlock(); |
| 585 : | } | ||
| 586 : | if (node == null) | ||
| 587 : | return false; | ||
| 588 : | tim | 1.10 | |
| 589 : | jsr166 | 1.49 | else if (node.setItem(e)) |
| 590 : | dl | 1.2 | return true; |
| 591 : | // else retry | ||
| 592 : | } | ||
| 593 : | tim | 1.1 | } |
| 594 : | dl | 1.2 | |
| 595 : | dl | 1.18 | /** |
| 596 : | * Retrieves and removes the head of this queue, if another thread | ||
| 597 : | * is currently making an element available. | ||
| 598 : | * | ||
| 599 : | * @return the head of this queue, or <tt>null</tt> if no | ||
| 600 : | * element is available. | ||
| 601 : | */ | ||
| 602 : | dl | 1.2 | public E poll() { |
| 603 : | dl | 1.27 | final ReentrantLock qlock = this.qlock; |
| 604 : | dl | 1.2 | for (;;) { |
| 605 : | Node node; | ||
| 606 : | qlock.lock(); | ||
| 607 : | try { | ||
| 608 : | dl | 1.43 | node = waitingProducers.deq(); |
| 609 : | tim | 1.14 | } finally { |
| 610 : | dl | 1.2 | qlock.unlock(); |
| 611 : | } | ||
| 612 : | if (node == null) | ||
| 613 : | return null; | ||
| 614 : | |||
| 615 : | else { | ||
| 616 : | dl | 1.31 | Object x = node.getItem(); |
| 617 : | dl | 1.2 | if (x != null) |
| 618 : | return (E)x; | ||
| 619 : | // else retry | ||
| 620 : | } | ||
| 621 : | } | ||
| 622 : | tim | 1.1 | } |
| 623 : | dl | 1.2 | |
| 624 : | dl | 1.5 | /** |
| 625 : | jsr166 | 1.48 | * Always returns <tt>true</tt>. |
| 626 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> has no internal capacity. |
| 627 : | jsr166 | 1.50 | * |
| 628 : | dholmes | 1.11 | * @return <tt>true</tt> |
| 629 : | dl | 1.5 | */ |
| 630 : | public boolean isEmpty() { | ||
| 631 : | return true; | ||
| 632 : | } | ||
| 633 : | |||
| 634 : | /** | ||
| 635 : | dholmes | 1.11 | * Always returns zero. |
| 636 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 637 : | jsr166 | 1.50 | * |
| 638 : | * @return zero | ||
| 639 : | dl | 1.5 | */ |
| 640 : | public int size() { | ||
| 641 : | return 0; | ||
| 642 : | tim | 1.1 | } |
| 643 : | dl | 1.2 | |
| 644 : | dl | 1.5 | /** |
| 645 : | dholmes | 1.11 | * Always returns zero. |
| 646 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 647 : | jsr166 | 1.50 | * |
| 648 : | * @return zero | ||
| 649 : | dl | 1.5 | */ |
| 650 : | public int remainingCapacity() { | ||
| 651 : | return 0; | ||
| 652 : | } | ||
| 653 : | |||
| 654 : | /** | ||
| 655 : | dholmes | 1.11 | * Does nothing. |
| 656 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 657 : | */ | ||
| 658 : | public void clear() {} | ||
| 659 : | |||
| 660 : | /** | ||
| 661 : | * Always returns <tt>false</tt>. | ||
| 662 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 663 : | jsr166 | 1.50 | * |
| 664 : | jsr166 | 1.54 | * @param o object to be checked for containment in this queue |
| 665 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 666 : | */ | ||
| 667 : | public boolean contains(Object o) { | ||
| 668 : | return false; | ||
| 669 : | } | ||
| 670 : | |||
| 671 : | /** | ||
| 672 : | dl | 1.18 | * Always returns <tt>false</tt>. |
| 673 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 674 : | * | ||
| 675 : | * @param o the element to remove | ||
| 676 : | * @return <tt>false</tt> | ||
| 677 : | */ | ||
| 678 : | public boolean remove(Object o) { | ||
| 679 : | return false; | ||
| 680 : | } | ||
| 681 : | |||
| 682 : | /** | ||
| 683 : | jsr166 | 1.50 | * Returns <tt>false</tt> unless the given collection is empty. |
| 684 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> has no internal capacity. |
| 685 : | jsr166 | 1.50 | * |
| 686 : | dl | 1.18 | * @param c the collection |
| 687 : | jsr166 | 1.50 | * @return <tt>false</tt> unless the given collection is empty |
| 688 : | * @throws NullPointerException if the specified collection is null | ||
| 689 : | dholmes | 1.11 | */ |
| 690 : | dl | 1.12 | public boolean containsAll(Collection<?> c) { |
| 691 : | dl | 1.16 | return c.isEmpty(); |
| 692 : | dholmes | 1.11 | } |
| 693 : | |||
| 694 : | /** | ||
| 695 : | * Always returns <tt>false</tt>. | ||
| 696 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 697 : | jsr166 | 1.50 | * |
| 698 : | dl | 1.18 | * @param c the collection |
| 699 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 700 : | */ | ||
| 701 : | dl | 1.12 | public boolean removeAll(Collection<?> c) { |
| 702 : | dholmes | 1.11 | return false; |
| 703 : | } | ||
| 704 : | |||
| 705 : | /** | ||
| 706 : | * Always returns <tt>false</tt>. | ||
| 707 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 708 : | jsr166 | 1.50 | * |
| 709 : | dl | 1.18 | * @param c the collection |
| 710 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 711 : | */ | ||
| 712 : | dl | 1.12 | public boolean retainAll(Collection<?> c) { |
| 713 : | dholmes | 1.11 | return false; |
| 714 : | } | ||
| 715 : | |||
| 716 : | /** | ||
| 717 : | jsr166 | 1.48 | * Always returns <tt>null</tt>. |
| 718 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> does not return elements |
| 719 : | dl | 1.5 | * unless actively waited on. |
| 720 : | jsr166 | 1.50 | * |
| 721 : | dholmes | 1.11 | * @return <tt>null</tt> |
| 722 : | dl | 1.5 | */ |
| 723 : | public E peek() { | ||
| 724 : | return null; | ||
| 725 : | } | ||
| 726 : | |||
| 727 : | |||
| 728 : | static class EmptyIterator<E> implements Iterator<E> { | ||
| 729 : | dl | 1.2 | public boolean hasNext() { |
| 730 : | return false; | ||
| 731 : | } | ||
| 732 : | public E next() { | ||
| 733 : | throw new NoSuchElementException(); | ||
| 734 : | } | ||
| 735 : | public void remove() { | ||
| 736 : | dl | 1.17 | throw new IllegalStateException(); |
| 737 : | dl | 1.2 | } |
| 738 : | tim | 1.1 | } |
| 739 : | dl | 1.2 | |
| 740 : | dl | 1.5 | /** |
| 741 : | dl | 1.18 | * Returns an empty iterator in which <tt>hasNext</tt> always returns |
| 742 : | tim | 1.13 | * <tt>false</tt>. |
| 743 : | * | ||
| 744 : | dholmes | 1.11 | * @return an empty iterator |
| 745 : | dl | 1.5 | */ |
| 746 : | dl | 1.2 | public Iterator<E> iterator() { |
| 747 : | dl | 1.5 | return new EmptyIterator<E>(); |
| 748 : | tim | 1.1 | } |
| 749 : | |||
| 750 : | dl | 1.2 | |
| 751 : | dl | 1.5 | /** |
| 752 : | dholmes | 1.11 | * Returns a zero-length array. |
| 753 : | * @return a zero-length array | ||
| 754 : | dl | 1.5 | */ |
| 755 : | dl | 1.3 | public Object[] toArray() { |
| 756 : | dl | 1.25 | return new Object[0]; |
| 757 : | tim | 1.1 | } |
| 758 : | |||
| 759 : | dholmes | 1.11 | /** |
| 760 : | * Sets the zeroeth element of the specified array to <tt>null</tt> | ||
| 761 : | * (if the array has non-zero length) and returns it. | ||
| 762 : | jsr166 | 1.50 | * |
| 763 : | dl | 1.40 | * @param a the array |
| 764 : | dholmes | 1.11 | * @return the specified array |
| 765 : | jsr166 | 1.50 | * @throws NullPointerException if the specified array is null |
| 766 : | dholmes | 1.11 | */ |
| 767 : | dl | 1.2 | public <T> T[] toArray(T[] a) { |
| 768 : | if (a.length > 0) | ||
| 769 : | a[0] = null; | ||
| 770 : | return a; | ||
| 771 : | } | ||
| 772 : | dl | 1.21 | |
| 773 : | jsr166 | 1.50 | /** |
| 774 : | * @throws UnsupportedOperationException {@inheritDoc} | ||
| 775 : | * @throws ClassCastException {@inheritDoc} | ||
| 776 : | * @throws NullPointerException {@inheritDoc} | ||
| 777 : | * @throws IllegalArgumentException {@inheritDoc} | ||
| 778 : | */ | ||
| 779 : | dl | 1.21 | public int drainTo(Collection<? super E> c) { |
| 780 : | if (c == null) | ||
| 781 : | throw new NullPointerException(); | ||
| 782 : | if (c == this) | ||
| 783 : | throw new IllegalArgumentException(); | ||
| 784 : | int n = 0; | ||
| 785 : | E e; | ||
| 786 : | while ( (e = poll()) != null) { | ||
| 787 : | c.add(e); | ||
| 788 : | ++n; | ||
| 789 : | } | ||
| 790 : | return n; | ||
| 791 : | } | ||
| 792 : | |||
| 793 : | jsr166 | 1.50 | /** |
| 794 : | * @throws UnsupportedOperationException {@inheritDoc} | ||
| 795 : | * @throws ClassCastException {@inheritDoc} | ||
| 796 : | * @throws NullPointerException {@inheritDoc} | ||
| 797 : | * @throws IllegalArgumentException {@inheritDoc} | ||
| 798 : | */ | ||
| 799 : | dl | 1.21 | public int drainTo(Collection<? super E> c, int maxElements) { |
| 800 : | if (c == null) | ||
| 801 : | throw new NullPointerException(); | ||
| 802 : | if (c == this) | ||
| 803 : | throw new IllegalArgumentException(); | ||
| 804 : | int n = 0; | ||
| 805 : | E e; | ||
| 806 : | while (n < maxElements && (e = poll()) != null) { | ||
| 807 : | c.add(e); | ||
| 808 : | ++n; | ||
| 809 : | } | ||
| 810 : | return n; | ||
| 811 : | } | ||
| 812 : | tim | 1.1 | } |
| Doug Lea | ViewVC Help |
| Powered by ViewVC 1.0.8 |