Parent Directory
|
Revision Log
Revision 1.66 - (view) (download)
| 1 : | dl | 1.2 | /* |
| 2 : | dl | 1.55 | * Written by Doug Lea, Bill Scherer, and Michael Scott with |
| 3 : | * assistance from members of JCP JSR-166 Expert Group and released to | ||
| 4 : | * the public domain, as explained at | ||
| 5 : | dl | 1.29 | * http://creativecommons.org/licenses/publicdomain |
| 6 : | dl | 1.2 | */ |
| 7 : | |||
| 8 : | tim | 1.1 | package java.util.concurrent; |
| 9 : | dl | 1.8 | import java.util.concurrent.locks.*; |
| 10 : | dl | 1.55 | import java.util.concurrent.atomic.*; |
| 11 : | tim | 1.1 | import java.util.*; |
| 12 : | |||
| 13 : | /** | ||
| 14 : | jsr166 | 1.52 | * A {@linkplain BlockingQueue blocking queue} in which each insert |
| 15 : | * operation must wait for a corresponding remove operation by another | ||
| 16 : | * thread, and vice versa. A synchronous queue does not have any | ||
| 17 : | * internal capacity, not even a capacity of one. You cannot | ||
| 18 : | * <tt>peek</tt> at a synchronous queue because an element is only | ||
| 19 : | * present when you try to remove it; you cannot insert an element | ||
| 20 : | * (using any method) unless another thread is trying to remove it; | ||
| 21 : | * you cannot iterate as there is nothing to iterate. The | ||
| 22 : | * <em>head</em> of the queue is the element that the first queued | ||
| 23 : | * inserting thread is trying to add to the queue; if there is no such | ||
| 24 : | * queued thread then no element is available for removal and | ||
| 25 : | * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other | ||
| 26 : | * <tt>Collection</tt> methods (for example <tt>contains</tt>), a | ||
| 27 : | * <tt>SynchronousQueue</tt> acts as an empty collection. This queue | ||
| 28 : | * does not permit <tt>null</tt> elements. | ||
| 29 : | dl | 1.18 | * |
| 30 : | * <p>Synchronous queues are similar to rendezvous channels used in | ||
| 31 : | * CSP and Ada. They are well suited for handoff designs, in which an | ||
| 32 : | dl | 1.30 | * object running in one thread must sync up with an object running |
| 33 : | dl | 1.18 | * in another thread in order to hand it some information, event, or |
| 34 : | * task. | ||
| 35 : | dl | 1.43 | * |
| 36 : | * <p> This class supports an optional fairness policy for ordering | ||
| 37 : | * waiting producer and consumer threads. By default, this ordering | ||
| 38 : | * is not guaranteed. However, a queue constructed with fairness set | ||
| 39 : | jsr166 | 1.58 | * to <tt>true</tt> grants threads access in FIFO order. |
| 40 : | dl | 1.43 | * |
| 41 : | dl | 1.46 | * <p>This class and its iterator implement all of the |
| 42 : | * <em>optional</em> methods of the {@link Collection} and {@link | ||
| 43 : | jsr166 | 1.48 | * Iterator} interfaces. |
| 44 : | dl | 1.42 | * |
| 45 : | * <p>This class is a member of the | ||
| 46 : | jsr166 | 1.66 | * <a href="{@docRoot}/../technotes/guides/collections/index.html"> |
| 47 : | dl | 1.42 | * Java Collections Framework</a>. |
| 48 : | * | ||
| 49 : | dl | 1.6 | * @since 1.5 |
| 50 : | dl | 1.56 | * @author Doug Lea and Bill Scherer and Michael Scott |
| 51 : | dl | 1.24 | * @param <E> the type of elements held in this collection |
| 52 : | dl | 1.23 | */ |
| 53 : | dl | 1.2 | public class SynchronousQueue<E> extends AbstractQueue<E> |
| 54 : | dl | 1.55 | implements BlockingQueue<E>, java.io.Serializable { |
| 55 : | dl | 1.15 | private static final long serialVersionUID = -3223113410248163686L; |
| 56 : | tim | 1.1 | |
| 57 : | dl | 1.2 | /* |
| 58 : | dl | 1.55 | * This class implements extensions of the dual stack and dual |
| 59 : | * queue algorithms described in "Nonblocking Concurrent Objects | ||
| 60 : | * with Condition Synchronization", by W. N. Scherer III and | ||
| 61 : | * M. L. Scott. 18th Annual Conf. on Distributed Computing, | ||
| 62 : | * Oct. 2004 (see also | ||
| 63 : | * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). | ||
| 64 : | * The (Lifo) stack is used for non-fair mode, and the (Fifo) | ||
| 65 : | * queue for fair mode. The performance of the two is generally | ||
| 66 : | * similar. Fifo usually supports higher throughput under | ||
| 67 : | * contention but Lifo maintains higher thread locality in common | ||
| 68 : | * applications. | ||
| 69 : | * | ||
| 70 : | * A dual queue (and similarly stack) is one that at any given | ||
| 71 : | * time either holds "data" -- items provided by put operations, | ||
| 72 : | * or "requests" -- slots representing take operations, or is | ||
| 73 : | * empty. A call to "fulfill" (i.e., a call requesting an item | ||
| 74 : | * from a queue holding data or vice versa) dequeues a | ||
| 75 : | * complementary node. The most interesting feature of these | ||
| 76 : | * queues is that any operation can figure out which mode the | ||
| 77 : | * queue is in, and act accordingly without needing locks. | ||
| 78 : | * | ||
| 79 : | * Both the queue and stack extend abstract class Transferer | ||
| 80 : | * defining the single method transfer that does a put or a | ||
| 81 : | * take. These are unified into a single method because in dual | ||
| 82 : | * data structures, the put and take operations are symmetrical, | ||
| 83 : | * so nearly all code can be combined. The resulting transfer | ||
| 84 : | * methods are on the long side, but are easier to follow than | ||
| 85 : | * they would be if broken up into nearly-duplicated parts. | ||
| 86 : | * | ||
| 87 : | * The queue and stack data structures share many conceptual | ||
| 88 : | * similarities but very few concrete details. For simplicity, | ||
| 89 : | * they are kept distinct so that they can later evolve | ||
| 90 : | * separately. | ||
| 91 : | * | ||
| 92 : | * The algorithms here differ from the versions in the above paper | ||
| 93 : | * in extending them for use in synchronous queues, as well as | ||
| 94 : | * dealing with cancellation. The main differences include: | ||
| 95 : | * | ||
| 96 : | jsr166 | 1.59 | * 1. The original algorithms used bit-marked pointers, but |
| 97 : | dl | 1.55 | * the ones here use mode bits in nodes, leading to a number |
| 98 : | * of further adaptations. | ||
| 99 : | * 2. SynchronousQueues must block threads waiting to become | ||
| 100 : | * fulfilled. | ||
| 101 : | jsr166 | 1.58 | * 3. Support for cancellation via timeout and interrupts, |
| 102 : | * including cleaning out cancelled nodes/threads | ||
| 103 : | dl | 1.56 | * from lists to avoid garbage retention and memory depletion. |
| 104 : | dl | 1.55 | * |
| 105 : | * Blocking is mainly accomplished using LockSupport park/unpark, | ||
| 106 : | * except that nodes that appear to be the next ones to become | ||
| 107 : | * fulfilled first spin a bit (on multiprocessors only). On very | ||
| 108 : | * busy synchronous queues, spinning can dramatically improve | ||
| 109 : | * throughput. And on less busy ones, the amount of spinning is | ||
| 110 : | * small enough not to be noticeable. | ||
| 111 : | * | ||
| 112 : | * Cleaning is done in different ways in queues vs stacks. For | ||
| 113 : | * queues, we can almost always remove a node immediately in O(1) | ||
| 114 : | * time (modulo retries for consistency checks) when it is | ||
| 115 : | * cancelled. But if it may be pinned as the current tail, it must | ||
| 116 : | * wait until some subsequent cancellation. For stacks, we need a | ||
| 117 : | * potentially O(n) traversal to be sure that we can remove the | ||
| 118 : | * node, but this can run concurrently with other threads | ||
| 119 : | * accessing the stack. | ||
| 120 : | * | ||
| 121 : | * While garbage collection takes care of most node reclamation | ||
| 122 : | * issues that otherwise complicate nonblocking algorithms, care | ||
| 123 : | jsr166 | 1.59 | * is taken to "forget" references to data, other nodes, and |
| 124 : | dl | 1.55 | * threads that might be held on to long-term by blocked |
| 125 : | * threads. In cases where setting to null would otherwise | ||
| 126 : | * conflict with main algorithms, this is done by changing a | ||
| 127 : | * node's link to now point to the node itself. This doesn't arise | ||
| 128 : | * much for Stack nodes (because blocked threads do not hang on to | ||
| 129 : | * old head pointers), but references in Queue nodes must be | ||
| 130 : | jsr166 | 1.59 | * aggressively forgotten to avoid reachability of everything any |
| 131 : | dl | 1.55 | * node has ever referred to since arrival. |
| 132 : | */ | ||
| 133 : | dl | 1.2 | |
| 134 : | dl | 1.43 | /** |
| 135 : | dl | 1.55 | * Shared internal API for dual stacks and queues. |
| 136 : | dl | 1.43 | */ |
| 137 : | dl | 1.55 | static abstract class Transferer { |
| 138 : | /** | ||
| 139 : | jsr166 | 1.59 | * Performs a put or take. |
| 140 : | * | ||
| 141 : | dl | 1.55 | * @param e if non-null, the item to be handed to a consumer; |
| 142 : | jsr166 | 1.59 | * if null, requests that transfer return an item |
| 143 : | * offered by producer. | ||
| 144 : | dl | 1.55 | * @param timed if this operation should timeout |
| 145 : | * @param nanos the timeout, in nanoseconds | ||
| 146 : | jsr166 | 1.59 | * @return if non-null, the item provided or received; if null, |
| 147 : | * the operation failed due to timeout or interrupt -- | ||
| 148 : | * the caller can distinguish which of these occurred | ||
| 149 : | * by checking Thread.interrupted. | ||
| 150 : | dl | 1.55 | */ |
| 151 : | abstract Object transfer(Object e, boolean timed, long nanos); | ||
| 152 : | dl | 1.43 | } |
| 153 : | |||
| 154 : | dl | 1.55 | /** The number of CPUs, for spin control */ |
| 155 : | static final int NCPUS = Runtime.getRuntime().availableProcessors(); | ||
| 156 : | |||
| 157 : | dl | 1.43 | /** |
| 158 : | dl | 1.55 | * The number of times to spin before blocking in timed waits. |
| 159 : | * The value is empirically derived -- it works well across a | ||
| 160 : | dl | 1.56 | * variety of processors and OSes. Empirically, the best value |
| 161 : | dl | 1.55 | * seems not to vary with number of CPUs (beyond 2) so is just |
| 162 : | * a constant. | ||
| 163 : | dl | 1.43 | */ |
| 164 : | dl | 1.55 | static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
| 165 : | dl | 1.43 | |
| 166 : | /** | ||
| 167 : | jsr166 | 1.60 | * The number of times to spin before blocking in untimed waits. |
| 168 : | * This is greater than timed value because untimed waits spin | ||
| 169 : | * faster since they don't need to check times on each spin. | ||
| 170 : | dl | 1.43 | */ |
| 171 : | dl | 1.55 | static final int maxUntimedSpins = maxTimedSpins * 16; |
| 172 : | dl | 1.43 | |
| 173 : | /** | ||
| 174 : | dl | 1.55 | * The number of nanoseconds for which it is faster to spin |
| 175 : | * rather than to use timed park. A rough estimate suffices. | ||
| 176 : | dl | 1.43 | */ |
| 177 : | dl | 1.55 | static final long spinForTimeoutThreshold = 1000L; |
| 178 : | |||
| 179 : | jsr166 | 1.60 | /** Dual stack */ |
| 180 : | dl | 1.55 | static final class TransferStack extends Transferer { |
| 181 : | /* | ||
| 182 : | * This extends Scherer-Scott dual stack algorithm, differing, | ||
| 183 : | * among other ways, by using "covering" nodes rather than | ||
| 184 : | * bit-marked pointers: Fulfilling operations push on marker | ||
| 185 : | * nodes (with FULFILLING bit set in mode) to reserve a spot | ||
| 186 : | * to match a waiting node. | ||
| 187 : | */ | ||
| 188 : | dl | 1.43 | |
| 189 : | dl | 1.55 | /* Modes for SNodes, ORed together in node fields */ |
| 190 : | /** Node represents an unfulfilled consumer */ | ||
| 191 : | static final int REQUEST = 0; | ||
| 192 : | /** Node represents an unfulfilled producer */ | ||
| 193 : | static final int DATA = 1; | ||
| 194 : | /** Node is fulfilling another unfulfilled DATA or REQUEST */ | ||
| 195 : | static final int FULFILLING = 2; | ||
| 196 : | |||
| 197 : | /** Return true if m has fulfilling bit set */ | ||
| 198 : | static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } | ||
| 199 : | |||
| 200 : | /** Node class for TransferStacks. */ | ||
| 201 : | static final class SNode { | ||
| 202 : | volatile SNode next; // next node in stack | ||
| 203 : | volatile SNode match; // the node matched to this | ||
| 204 : | volatile Thread waiter; // to control park/unpark | ||
| 205 : | Object item; // data; or null for REQUESTs | ||
| 206 : | int mode; | ||
| 207 : | // Note: item and mode fields don't need to be volatile | ||
| 208 : | // since they are always written before, and read after, | ||
| 209 : | // other volatile/atomic operations. | ||
| 210 : | |||
| 211 : | SNode(Object item) { | ||
| 212 : | this.item = item; | ||
| 213 : | } | ||
| 214 : | |||
| 215 : | static final AtomicReferenceFieldUpdater<SNode, SNode> | ||
| 216 : | nextUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 217 : | (SNode.class, SNode.class, "next"); | ||
| 218 : | |||
| 219 : | boolean casNext(SNode cmp, SNode val) { | ||
| 220 : | return (cmp == next && | ||
| 221 : | nextUpdater.compareAndSet(this, cmp, val)); | ||
| 222 : | } | ||
| 223 : | |||
| 224 : | static final AtomicReferenceFieldUpdater<SNode, SNode> | ||
| 225 : | matchUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 226 : | (SNode.class, SNode.class, "match"); | ||
| 227 : | |||
| 228 : | /** | ||
| 229 : | jsr166 | 1.63 | * Tries to match node s to this node, if so, waking up thread. |
| 230 : | * Fulfillers call tryMatch to identify their waiters. | ||
| 231 : | * Waiters block until they have been matched. | ||
| 232 : | * | ||
| 233 : | dl | 1.55 | * @param s the node to match |
| 234 : | * @return true if successfully matched to s | ||
| 235 : | */ | ||
| 236 : | boolean tryMatch(SNode s) { | ||
| 237 : | if (match == null && | ||
| 238 : | matchUpdater.compareAndSet(this, null, s)) { | ||
| 239 : | Thread w = waiter; | ||
| 240 : | if (w != null) { // waiters need at most one unpark | ||
| 241 : | waiter = null; | ||
| 242 : | LockSupport.unpark(w); | ||
| 243 : | } | ||
| 244 : | return true; | ||
| 245 : | dl | 1.47 | } |
| 246 : | dl | 1.55 | return match == s; |
| 247 : | } | ||
| 248 : | |||
| 249 : | /** | ||
| 250 : | jsr166 | 1.59 | * Tries to cancel a wait by matching node to itself. |
| 251 : | dl | 1.55 | */ |
| 252 : | void tryCancel() { | ||
| 253 : | matchUpdater.compareAndSet(this, null, this); | ||
| 254 : | } | ||
| 255 : | |||
| 256 : | boolean isCancelled() { | ||
| 257 : | return match == this; | ||
| 258 : | dl | 1.47 | } |
| 259 : | } | ||
| 260 : | dl | 1.43 | |
| 261 : | dl | 1.55 | /** The head (top) of the stack */ |
| 262 : | volatile SNode head; | ||
| 263 : | |||
| 264 : | static final AtomicReferenceFieldUpdater<TransferStack, SNode> | ||
| 265 : | headUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 266 : | (TransferStack.class, SNode.class, "head"); | ||
| 267 : | |||
| 268 : | boolean casHead(SNode h, SNode nh) { | ||
| 269 : | return h == head && headUpdater.compareAndSet(this, h, nh); | ||
| 270 : | } | ||
| 271 : | dl | 1.2 | |
| 272 : | dl | 1.55 | /** |
| 273 : | jsr166 | 1.57 | * Creates or resets fields of a node. Called only from transfer |
| 274 : | dl | 1.55 | * where the node to push on stack is lazily created and |
| 275 : | * reused when possible to help reduce intervals between reads | ||
| 276 : | * and CASes of head and to avoid surges of garbage when CASes | ||
| 277 : | * to push nodes fail due to contention. | ||
| 278 : | */ | ||
| 279 : | static SNode snode(SNode s, Object e, SNode next, int mode) { | ||
| 280 : | if (s == null) s = new SNode(e); | ||
| 281 : | s.mode = mode; | ||
| 282 : | s.next = next; | ||
| 283 : | return s; | ||
| 284 : | dl | 1.43 | } |
| 285 : | |||
| 286 : | dl | 1.55 | /** |
| 287 : | jsr166 | 1.57 | * Puts or takes an item. |
| 288 : | dl | 1.55 | */ |
| 289 : | Object transfer(Object e, boolean timed, long nanos) { | ||
| 290 : | /* | ||
| 291 : | * Basic algorithm is to loop trying one of three actions: | ||
| 292 : | * | ||
| 293 : | * 1. If apparently empty or already containing nodes of same | ||
| 294 : | * mode, try to push node on stack and wait for a match, | ||
| 295 : | * returning it, or null if cancelled. | ||
| 296 : | * | ||
| 297 : | * 2. If apparently containing node of complementary mode, | ||
| 298 : | * try to push a fulfilling node on to stack, match | ||
| 299 : | * with corresponding waiting node, pop both from | ||
| 300 : | * stack, and return matched item. The matching or | ||
| 301 : | * unlinking might not actually be necessary because of | ||
| 302 : | dl | 1.62 | * other threads performing action 3: |
| 303 : | dl | 1.55 | * |
| 304 : | * 3. If top of stack already holds another fulfilling node, | ||
| 305 : | * help it out by doing its match and/or pop | ||
| 306 : | * operations, and then continue. The code for helping | ||
| 307 : | * is essentially the same as for fulfilling, except | ||
| 308 : | * that it doesn't return the item. | ||
| 309 : | */ | ||
| 310 : | |||
| 311 : | SNode s = null; // constructed/reused as needed | ||
| 312 : | int mode = (e == null)? REQUEST : DATA; | ||
| 313 : | |||
| 314 : | for (;;) { | ||
| 315 : | SNode h = head; | ||
| 316 : | if (h == null || h.mode == mode) { // empty or same-mode | ||
| 317 : | if (timed && nanos <= 0) { // can't wait | ||
| 318 : | jsr166 | 1.58 | if (h != null && h.isCancelled()) |
| 319 : | dl | 1.55 | casHead(h, h.next); // pop cancelled node |
| 320 : | else | ||
| 321 : | jsr166 | 1.58 | return null; |
| 322 : | dl | 1.55 | } else if (casHead(h, s = snode(s, e, h, mode))) { |
| 323 : | SNode m = awaitFulfill(s, timed, nanos); | ||
| 324 : | if (m == s) { // wait was cancelled | ||
| 325 : | clean(s); | ||
| 326 : | return null; | ||
| 327 : | } | ||
| 328 : | if ((h = head) != null && h.next == s) | ||
| 329 : | casHead(h, s.next); // help s's fulfiller | ||
| 330 : | return mode == REQUEST? m.item : s.item; | ||
| 331 : | } | ||
| 332 : | } else if (!isFulfilling(h.mode)) { // try to fulfill | ||
| 333 : | if (h.isCancelled()) // already cancelled | ||
| 334 : | casHead(h, h.next); // pop and retry | ||
| 335 : | else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { | ||
| 336 : | for (;;) { // loop until matched or waiters disappear | ||
| 337 : | SNode m = s.next; // m is s's match | ||
| 338 : | if (m == null) { // all waiters are gone | ||
| 339 : | casHead(s, null); // pop fulfill node | ||
| 340 : | s = null; // use new node next time | ||
| 341 : | break; // restart main loop | ||
| 342 : | } | ||
| 343 : | SNode mn = m.next; | ||
| 344 : | if (m.tryMatch(s)) { | ||
| 345 : | casHead(s, mn); // pop both s and m | ||
| 346 : | return (mode == REQUEST)? m.item : s.item; | ||
| 347 : | } else // lost match | ||
| 348 : | s.casNext(m, mn); // help unlink | ||
| 349 : | } | ||
| 350 : | } | ||
| 351 : | } else { // help a fulfiller | ||
| 352 : | SNode m = h.next; // m is h's match | ||
| 353 : | if (m == null) // waiter is gone | ||
| 354 : | casHead(h, null); // pop fulfilling node | ||
| 355 : | else { | ||
| 356 : | SNode mn = m.next; | ||
| 357 : | if (m.tryMatch(h)) // help match | ||
| 358 : | casHead(h, mn); // pop both h and m | ||
| 359 : | else // lost match | ||
| 360 : | h.casNext(m, mn); // help unlink | ||
| 361 : | } | ||
| 362 : | dl | 1.47 | } |
| 363 : | } | ||
| 364 : | } | ||
| 365 : | |||
| 366 : | dl | 1.55 | /** |
| 367 : | jsr166 | 1.57 | * Spins/blocks until node s is matched by a fulfill operation. |
| 368 : | jsr166 | 1.63 | * |
| 369 : | dl | 1.55 | * @param s the waiting node |
| 370 : | * @param timed true if timed wait | ||
| 371 : | * @param nanos timeout value | ||
| 372 : | * @return matched node, or s if cancelled | ||
| 373 : | */ | ||
| 374 : | SNode awaitFulfill(SNode s, boolean timed, long nanos) { | ||
| 375 : | /* | ||
| 376 : | * When a node/thread is about to block, it sets its waiter | ||
| 377 : | * field and then rechecks state at least one more time | ||
| 378 : | * before actually parking, thus covering race vs | ||
| 379 : | jsr166 | 1.59 | * fulfiller noticing that waiter is non-null so should be |
| 380 : | dl | 1.55 | * woken. |
| 381 : | * | ||
| 382 : | * When invoked by nodes that appear at the point of call | ||
| 383 : | * to be at the head of the stack, calls to park are | ||
| 384 : | * preceded by spins to avoid blocking when producers and | ||
| 385 : | * consumers are arriving very close in time. This can | ||
| 386 : | * happen enough to bother only on multiprocessors. | ||
| 387 : | * | ||
| 388 : | * The order of checks for returning out of main loop | ||
| 389 : | * reflects fact that interrupts have precedence over | ||
| 390 : | * normal returns, which have precedence over | ||
| 391 : | * timeouts. (So, on timeout, one last check for match is | ||
| 392 : | * done before giving up.) Except that calls from untimed | ||
| 393 : | * SynchronousQueue.{poll/offer} don't check interrupts | ||
| 394 : | * and don't wait at all, so are trapped in transfer | ||
| 395 : | * method rather than calling awaitFulfill. | ||
| 396 : | */ | ||
| 397 : | long lastTime = (timed)? System.nanoTime() : 0; | ||
| 398 : | Thread w = Thread.currentThread(); | ||
| 399 : | SNode h = head; | ||
| 400 : | int spins = (shouldSpin(s)? | ||
| 401 : | (timed? maxTimedSpins : maxUntimedSpins) : 0); | ||
| 402 : | for (;;) { | ||
| 403 : | if (w.isInterrupted()) | ||
| 404 : | s.tryCancel(); | ||
| 405 : | SNode m = s.match; | ||
| 406 : | if (m != null) | ||
| 407 : | return m; | ||
| 408 : | if (timed) { | ||
| 409 : | long now = System.nanoTime(); | ||
| 410 : | jsr166 | 1.58 | nanos -= now - lastTime; |
| 411 : | dl | 1.55 | lastTime = now; |
| 412 : | if (nanos <= 0) { | ||
| 413 : | s.tryCancel(); | ||
| 414 : | continue; | ||
| 415 : | } | ||
| 416 : | } | ||
| 417 : | if (spins > 0) | ||
| 418 : | spins = shouldSpin(s)? (spins-1) : 0; | ||
| 419 : | else if (s.waiter == null) | ||
| 420 : | s.waiter = w; // establish waiter so can park next iter | ||
| 421 : | else if (!timed) | ||
| 422 : | LockSupport.park(this); | ||
| 423 : | else if (nanos > spinForTimeoutThreshold) | ||
| 424 : | LockSupport.parkNanos(this, nanos); | ||
| 425 : | dl | 1.47 | } |
| 426 : | } | ||
| 427 : | dl | 1.2 | |
| 428 : | dl | 1.55 | /** |
| 429 : | jsr166 | 1.57 | * Returns true if node s is at head or there is an active |
| 430 : | dl | 1.55 | * fulfiller. |
| 431 : | */ | ||
| 432 : | boolean shouldSpin(SNode s) { | ||
| 433 : | SNode h = head; | ||
| 434 : | dl | 1.56 | return (h == s || h == null || isFulfilling(h.mode)); |
| 435 : | dl | 1.55 | } |
| 436 : | |||
| 437 : | /** | ||
| 438 : | jsr166 | 1.57 | * Unlinks s from the stack. |
| 439 : | dl | 1.55 | */ |
| 440 : | void clean(SNode s) { | ||
| 441 : | jsr166 | 1.58 | s.item = null; // forget item |
| 442 : | dl | 1.55 | s.waiter = null; // forget thread |
| 443 : | |||
| 444 : | /* | ||
| 445 : | * At worst we may need to traverse entire stack to unlink | ||
| 446 : | * s. If there are multiple concurrent calls to clean, we | ||
| 447 : | * might not see s if another thread has already removed | ||
| 448 : | * it. But we can stop when we see any node known to | ||
| 449 : | * follow s. We use s.next unless it too is cancelled, in | ||
| 450 : | * which case we try the node one past. We don't check any | ||
| 451 : | jsr166 | 1.59 | * further because we don't want to doubly traverse just to |
| 452 : | dl | 1.55 | * find sentinel. |
| 453 : | */ | ||
| 454 : | |||
| 455 : | SNode past = s.next; | ||
| 456 : | if (past != null && past.isCancelled()) | ||
| 457 : | past = past.next; | ||
| 458 : | |||
| 459 : | // Absorb cancelled nodes at head | ||
| 460 : | SNode p; | ||
| 461 : | while ((p = head) != null && p != past && p.isCancelled()) | ||
| 462 : | casHead(p, p.next); | ||
| 463 : | |||
| 464 : | // Unsplice embedded nodes | ||
| 465 : | while (p != null && p != past) { | ||
| 466 : | SNode n = p.next; | ||
| 467 : | if (n != null && n.isCancelled()) | ||
| 468 : | p.casNext(n, n.next); | ||
| 469 : | else | ||
| 470 : | p = n; | ||
| 471 : | dl | 1.47 | } |
| 472 : | } | ||
| 473 : | } | ||
| 474 : | jsr166 | 1.48 | |
| 475 : | jsr166 | 1.61 | /** Dual Queue */ |
| 476 : | dl | 1.55 | static final class TransferQueue extends Transferer { |
| 477 : | /* | ||
| 478 : | * This extends Scherer-Scott dual queue algorithm, differing, | ||
| 479 : | * among other ways, by using modes within nodes rather than | ||
| 480 : | * marked pointers. The algorithm is a little simpler than | ||
| 481 : | * that for stacks because fulfillers do not need explicit | ||
| 482 : | * nodes, and matching is done by CAS'ing QNode.item field | ||
| 483 : | jsr166 | 1.59 | * from non-null to null (for put) or vice versa (for take). |
| 484 : | dl | 1.55 | */ |
| 485 : | dl | 1.53 | |
| 486 : | dl | 1.55 | /** Node class for TransferQueue. */ |
| 487 : | static final class QNode { | ||
| 488 : | volatile QNode next; // next node in queue | ||
| 489 : | volatile Object item; // CAS'ed to or from null | ||
| 490 : | volatile Thread waiter; // to control park/unpark | ||
| 491 : | jsr166 | 1.58 | final boolean isData; |
| 492 : | dl | 1.35 | |
| 493 : | dl | 1.55 | QNode(Object item, boolean isData) { |
| 494 : | this.item = item; | ||
| 495 : | this.isData = isData; | ||
| 496 : | } | ||
| 497 : | dl | 1.35 | |
| 498 : | dl | 1.55 | static final AtomicReferenceFieldUpdater<QNode, QNode> |
| 499 : | nextUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 500 : | (QNode.class, QNode.class, "next"); | ||
| 501 : | dl | 1.31 | |
| 502 : | dl | 1.55 | boolean casNext(QNode cmp, QNode val) { |
| 503 : | return (next == cmp && | ||
| 504 : | nextUpdater.compareAndSet(this, cmp, val)); | ||
| 505 : | } | ||
| 506 : | |||
| 507 : | static final AtomicReferenceFieldUpdater<QNode, Object> | ||
| 508 : | itemUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 509 : | (QNode.class, Object.class, "item"); | ||
| 510 : | dl | 1.43 | |
| 511 : | dl | 1.55 | boolean casItem(Object cmp, Object val) { |
| 512 : | return (item == cmp && | ||
| 513 : | itemUpdater.compareAndSet(this, cmp, val)); | ||
| 514 : | } | ||
| 515 : | |||
| 516 : | /** | ||
| 517 : | jsr166 | 1.59 | * Tries to cancel by CAS'ing ref to this as item. |
| 518 : | dl | 1.55 | */ |
| 519 : | void tryCancel(Object cmp) { | ||
| 520 : | itemUpdater.compareAndSet(this, cmp, this); | ||
| 521 : | } | ||
| 522 : | |||
| 523 : | boolean isCancelled() { | ||
| 524 : | return item == this; | ||
| 525 : | } | ||
| 526 : | dl | 1.56 | |
| 527 : | jsr166 | 1.58 | /** |
| 528 : | jsr166 | 1.57 | * Returns true if this node is known to be off the queue |
| 529 : | dl | 1.56 | * because its next pointer has been forgotten due to |
| 530 : | * an advanceHead operation. | ||
| 531 : | */ | ||
| 532 : | boolean isOffList() { | ||
| 533 : | return next == this; | ||
| 534 : | } | ||
| 535 : | dl | 1.31 | } |
| 536 : | |||
| 537 : | dl | 1.55 | /** Head of queue */ |
| 538 : | transient volatile QNode head; | ||
| 539 : | /** Tail of queue */ | ||
| 540 : | transient volatile QNode tail; | ||
| 541 : | dl | 1.31 | /** |
| 542 : | dl | 1.55 | * Reference to a cancelled node that might not yet have been |
| 543 : | * unlinked from queue because it was the last inserted node | ||
| 544 : | * when it cancelled. | ||
| 545 : | dl | 1.31 | */ |
| 546 : | dl | 1.55 | transient volatile QNode cleanMe; |
| 547 : | |||
| 548 : | TransferQueue() { | ||
| 549 : | QNode h = new QNode(null, false); // initialize to dummy node. | ||
| 550 : | head = h; | ||
| 551 : | tail = h; | ||
| 552 : | dl | 1.31 | } |
| 553 : | |||
| 554 : | dl | 1.55 | static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 555 : | headUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 556 : | (TransferQueue.class, QNode.class, "head"); | ||
| 557 : | |||
| 558 : | dl | 1.31 | /** |
| 559 : | jsr166 | 1.59 | * Tries to cas nh as new head; if successful, unlink |
| 560 : | dl | 1.55 | * old head's next node to avoid garbage retention. |
| 561 : | dl | 1.31 | */ |
| 562 : | dl | 1.55 | void advanceHead(QNode h, QNode nh) { |
| 563 : | if (h == head && headUpdater.compareAndSet(this, h, nh)) | ||
| 564 : | h.next = h; // forget old next | ||
| 565 : | dl | 1.31 | } |
| 566 : | |||
| 567 : | dl | 1.55 | static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 568 : | tailUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 569 : | (TransferQueue.class, QNode.class, "tail"); | ||
| 570 : | |||
| 571 : | dl | 1.31 | /** |
| 572 : | jsr166 | 1.57 | * Tries to cas nt as new tail. |
| 573 : | dl | 1.31 | */ |
| 574 : | dl | 1.55 | void advanceTail(QNode t, QNode nt) { |
| 575 : | if (tail == t) | ||
| 576 : | tailUpdater.compareAndSet(this, t, nt); | ||
| 577 : | dl | 1.31 | } |
| 578 : | dl | 1.2 | |
| 579 : | dl | 1.55 | static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 580 : | cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater | ||
| 581 : | (TransferQueue.class, QNode.class, "cleanMe"); | ||
| 582 : | dl | 1.2 | |
| 583 : | /** | ||
| 584 : | jsr166 | 1.57 | * Tries to CAS cleanMe slot. |
| 585 : | dl | 1.2 | */ |
| 586 : | dl | 1.55 | boolean casCleanMe(QNode cmp, QNode val) { |
| 587 : | return (cleanMe == cmp && | ||
| 588 : | cleanMeUpdater.compareAndSet(this, cmp, val)); | ||
| 589 : | dl | 1.35 | } |
| 590 : | |||
| 591 : | /** | ||
| 592 : | jsr166 | 1.57 | * Puts or takes an item. |
| 593 : | dl | 1.35 | */ |
| 594 : | dl | 1.55 | Object transfer(Object e, boolean timed, long nanos) { |
| 595 : | jsr166 | 1.58 | /* Basic algorithm is to loop trying to take either of |
| 596 : | dl | 1.55 | * two actions: |
| 597 : | * | ||
| 598 : | jsr166 | 1.58 | * 1. If queue apparently empty or holding same-mode nodes, |
| 599 : | dl | 1.55 | * try to add node to queue of waiters, wait to be |
| 600 : | * fulfilled (or cancelled) and return matching item. | ||
| 601 : | * | ||
| 602 : | * 2. If queue apparently contains waiting items, and this | ||
| 603 : | * call is of complementary mode, try to fulfill by CAS'ing | ||
| 604 : | * item field of waiting node and dequeuing it, and then | ||
| 605 : | * returning matching item. | ||
| 606 : | * | ||
| 607 : | * In each case, along the way, check for and try to help | ||
| 608 : | * advance head and tail on behalf of other stalled/slow | ||
| 609 : | * threads. | ||
| 610 : | * | ||
| 611 : | * The loop starts off with a null check guarding against | ||
| 612 : | * seeing uninitialized head or tail values. This never | ||
| 613 : | * happens in current SynchronousQueue, but could if | ||
| 614 : | * callers held non-volatile/final ref to the | ||
| 615 : | * transferer. The check is here anyway because it places | ||
| 616 : | * null checks at top of loop, which is usually faster | ||
| 617 : | * than having them implicitly interspersed. | ||
| 618 : | */ | ||
| 619 : | |||
| 620 : | QNode s = null; // constructed/reused as needed | ||
| 621 : | boolean isData = (e != null); | ||
| 622 : | |||
| 623 : | for (;;) { | ||
| 624 : | QNode t = tail; | ||
| 625 : | QNode h = head; | ||
| 626 : | dl | 1.62 | if (t == null || h == null) // saw uninitialized value |
| 627 : | dl | 1.55 | continue; // spin |
| 628 : | |||
| 629 : | if (h == t || t.isData == isData) { // empty or same-mode | ||
| 630 : | QNode tn = t.next; | ||
| 631 : | if (t != tail) // inconsistent read | ||
| 632 : | continue; | ||
| 633 : | if (tn != null) { // lagging tail | ||
| 634 : | advanceTail(t, tn); | ||
| 635 : | continue; | ||
| 636 : | } | ||
| 637 : | if (timed && nanos <= 0) // can't wait | ||
| 638 : | return null; | ||
| 639 : | if (s == null) | ||
| 640 : | s = new QNode(e, isData); | ||
| 641 : | if (!t.casNext(null, s)) // failed to link in | ||
| 642 : | continue; | ||
| 643 : | |||
| 644 : | advanceTail(t, s); // swing tail and wait | ||
| 645 : | Object x = awaitFulfill(s, e, timed, nanos); | ||
| 646 : | if (x == s) { // wait was cancelled | ||
| 647 : | clean(t, s); | ||
| 648 : | return null; | ||
| 649 : | } | ||
| 650 : | |||
| 651 : | dl | 1.56 | if (!s.isOffList()) { // not already unlinked |
| 652 : | advanceHead(t, s); // unlink if head | ||
| 653 : | dl | 1.55 | if (x != null) // and forget fields |
| 654 : | s.item = s; | ||
| 655 : | s.waiter = null; | ||
| 656 : | } | ||
| 657 : | return (x != null)? x : e; | ||
| 658 : | |||
| 659 : | } else { // complementary-mode | ||
| 660 : | QNode m = h.next; // node to fulfill | ||
| 661 : | if (t != tail || m == null || h != head) | ||
| 662 : | continue; // inconsistent read | ||
| 663 : | |||
| 664 : | Object x = m.item; | ||
| 665 : | if (isData == (x != null) || // m already fulfilled | ||
| 666 : | x == m || // m cancelled | ||
| 667 : | !m.casItem(x, e)) { // lost CAS | ||
| 668 : | advanceHead(h, m); // dequeue and retry | ||
| 669 : | continue; | ||
| 670 : | } | ||
| 671 : | |||
| 672 : | advanceHead(h, m); // successfully fulfilled | ||
| 673 : | LockSupport.unpark(m.waiter); | ||
| 674 : | return (x != null)? x : e; | ||
| 675 : | } | ||
| 676 : | dl | 1.35 | } |
| 677 : | } | ||
| 678 : | |||
| 679 : | /** | ||
| 680 : | jsr166 | 1.57 | * Spins, then parks, until node s is fulfilled or its wait is cancelled (on interrupt, or on timeout if timed). |
| 681 : | jsr166 | 1.63 | * |
| 682 : | dl | 1.55 | * @param s the waiting node |
| 683 : | * @param e the value s.item is compared against; the wait ends as soon as s.item differs from it | ||
| 684 : | * @param timed true if timed wait | ||
| 685 : | * @param nanos remaining timeout in nanoseconds; consulted only when timed is true | ||
| 686 : | * @return matched item, or s if cancelled | ||
| 687 : | dl | 1.35 | */ |
| 688 : | dl | 1.55 | Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { |
| 689 : | /* Same idea as TransferStack.awaitFulfill */ | ||
| 690 : | long lastTime = (timed)? System.nanoTime() : 0; | ||
| 691 : | Thread w = Thread.currentThread(); | ||
| 692 : | int spins = ((head.next == s) ? /* spin only when s directly follows head */ | ||
| 693 : | (timed? maxTimedSpins : maxUntimedSpins) : 0); | ||
| 694 : | for (;;) { | ||
| 695 : | if (w.isInterrupted()) /* isInterrupted() leaves the interrupt status set */ | ||
| 696 : | s.tryCancel(e); | ||
| 697 : | Object x = s.item; | ||
| 698 : | if (x != e) /* item changed: fulfilled or cancelled */ | ||
| 699 : | return x; | ||
| 700 : | if (timed) { | ||
| 701 : | long now = System.nanoTime(); | ||
| 702 : | jsr166 | 1.58 | nanos -= now - lastTime; |
| 703 : | dl | 1.55 | lastTime = now; |
| 704 : | if (nanos <= 0) { /* timed out: try to cancel, then re-read item at loop top */ | ||
| 705 : | s.tryCancel(e); | ||
| 706 : | continue; | ||
| 707 : | } | ||
| 708 : | } | ||
| 709 : | if (spins > 0) | ||
| 710 : | --spins; | ||
| 711 : | else if (s.waiter == null) /* publish this thread one iteration before parking */ | ||
| 712 : | s.waiter = w; | ||
| 713 : | else if (!timed) | ||
| 714 : | LockSupport.park(this); | ||
| 715 : | else if (nanos > spinForTimeoutThreshold) /* otherwise keep looping rather than park */ | ||
| 716 : | LockSupport.parkNanos(this, nanos); | ||
| 717 : | dl | 1.35 | } |
| 718 : | dl | 1.31 | } |
| 719 : | |||
| 720 : | /** | ||
| 721 : | jsr166 | 1.57 | * Unlinks cancelled node s, whose original predecessor was pred, or saves pred in cleanMe so that s can be unlinked later. |
| 722 : | dl | 1.31 | */ |
| 723 : | dl | 1.55 | void clean(QNode pred, QNode s) { |
| 724 : | s.waiter = null; // forget thread | ||
| 725 : | /* | ||
| 726 : | * At any given time, exactly one node on list cannot be | ||
| 727 : | * deleted -- the last inserted node. To accommodate this, | ||
| 728 : | * if we cannot delete s, we save its predecessor as | ||
| 729 : | * "cleanMe", deleting the previously saved version | ||
| 730 : | * first. At least one of node s or the node previously | ||
| 731 : | * saved can always be deleted, so this always terminates. | ||
| 732 : | */ | ||
| 733 : | while (pred.next == s) { // Return early if already unlinked | ||
| 734 : | QNode h = head; | ||
| 735 : | QNode hn = h.next; // Absorb cancelled first node as head | ||
| 736 : | if (hn != null && hn.isCancelled()) { | ||
| 737 : | advanceHead(h, hn); | ||
| 738 : | continue; | ||
| 739 : | } | ||
| 740 : | QNode t = tail; // Ensure consistent read for tail | ||
| 741 : | if (t == h) /* head and tail coincide: no queued node left to unlink */ | ||
| 742 : | return; | ||
| 743 : | QNode tn = t.next; | ||
| 744 : | if (t != tail) /* tail changed while reading; retry */ | ||
| 745 : | continue; | ||
| 746 : | if (tn != null) { /* tail is lagging; help advance it */ | ||
| 747 : | advanceTail(t, tn); | ||
| 748 : | continue; | ||
| 749 : | } | ||
| 750 : | if (s != t) { // If not tail, try to unsplice | ||
| 751 : | QNode sn = s.next; | ||
| 752 : | if (sn == s || pred.casNext(s, sn)) | ||
| 753 : | return; | ||
| 754 : | } | ||
| 755 : | QNode dp = cleanMe; | ||
| 756 : | if (dp != null) { // Try unlinking previous cancelled node | ||
| 757 : | QNode d = dp.next; | ||
| 758 : | QNode dn; | ||
| 759 : | if (d == null || // d is gone or | ||
| 760 : | d == dp || // d is off list or | ||
| 761 : | !d.isCancelled() || // d not cancelled or | ||
| 762 : | (d != t && // d not tail and | ||
| 763 : | (dn = d.next) != null && // has successor | ||
| 764 : | dn != d && // that is on list | ||
| 765 : | dp.casNext(d, dn))) // d unspliced | ||
| 766 : | jsr166 | 1.58 | casCleanMe(dp, null); /* saved predecessor is no longer needed */ |
| 767 : | if (dp == pred) | ||
| 768 : | dl | 1.55 | return; // s is already saved node |
| 769 : | jsr166 | 1.58 | } else if (casCleanMe(null, pred)) |
| 770 : | dl | 1.55 | return; // Postpone cleaning s |
| 771 : | dl | 1.2 | } |
| 772 : | } | ||
| 773 : | dl | 1.55 | } |
| 774 : | |||
| 775 : | /** | ||
| 776 : | * The transferer. Set only in the constructor and in readObject, but cannot be declared | ||
| 777 : | * as final without further complicating serialization. Since | ||
| 778 : | dl | 1.56 | * this is accessed at most once per public method, there |
| 779 : | * isn't a noticeable performance penalty for using volatile | ||
| 780 : | * instead of final here. | ||
| 781 : | dl | 1.55 | */ |
| 782 : | private transient volatile Transferer transferer; | ||
| 783 : | |||
/**
 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
 * Equivalent to <tt>SynchronousQueue(false)</tt>.
 */
public SynchronousQueue() {
    this(false);
}
| 790 : | dl | 1.2 | |
/**
 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
 * A fair queue is backed by a <tt>TransferQueue</tt>, a nonfair one by
 * a <tt>TransferStack</tt>.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    if (fair) {
        transferer = new TransferQueue();
    } else {
        transferer = new TransferStack();
    }
}
| 800 : | |||
| 801 : | /** | ||
| 802 : | dl | 1.35 | * Adds the specified element to this queue, waiting if necessary for |
| 803 : | * another thread to receive it. | ||
| 804 : | jsr166 | 1.50 | * |
| 805 : | * @throws InterruptedException {@inheritDoc} | ||
| 806 : | * @throws NullPointerException {@inheritDoc} | ||
| 807 : | tim | 1.10 | */ |
| 808 : | dl | 1.55 | public void put(E o) throws InterruptedException { |
| 809 : | if (o == null) throw new NullPointerException(); | ||
| 810 : | dl | 1.64 | if (transferer.transfer(o, false, 0) == null) { |
| 811 : | jsr166 | 1.65 | Thread.interrupted(); |
| 812 : | dl | 1.55 | throw new InterruptedException(); |
| 813 : | jsr166 | 1.65 | } |
| 814 : | tim | 1.1 | } |
| 815 : | |||
| 816 : | dholmes | 1.11 | /** |
| 817 : | dl | 1.20 | * Inserts the specified element into this queue, waiting if necessary |
| 818 : | dl | 1.18 | * up to the specified wait time for another thread to receive it. |
| 819 : | jsr166 | 1.50 | * |
| 820 : | * @return <tt>true</tt> if successful, or <tt>false</tt> if the | ||
| 821 : | * specified waiting time elapses before a consumer appears. | ||
| 822 : | * @throws InterruptedException {@inheritDoc} | ||
| 823 : | * @throws NullPointerException {@inheritDoc} | ||
| 824 : | dholmes | 1.11 | */ |
| 825 : | jsr166 | 1.58 | public boolean offer(E o, long timeout, TimeUnit unit) |
| 826 : | dl | 1.55 | throws InterruptedException { |
| 827 : | if (o == null) throw new NullPointerException(); | ||
| 828 : | if (transferer.transfer(o, true, unit.toNanos(timeout)) != null) | ||
| 829 : | return true; | ||
| 830 : | if (!Thread.interrupted()) | ||
| 831 : | return false; | ||
| 832 : | throw new InterruptedException(); | ||
| 833 : | } | ||
| 834 : | |||
| 835 : | /** | ||
| 836 : | * Inserts the specified element into this queue, if another thread is | ||
| 837 : | * waiting to receive it. | ||
| 838 : | * | ||
| 839 : | * @param e the element to add | ||
| 840 : | * @return <tt>true</tt> if the element was added to this queue, else | ||
| 841 : | * <tt>false</tt> | ||
| 842 : | * @throws NullPointerException if the specified element is null | ||
| 843 : | */ | ||
| 844 : | public boolean offer(E e) { | ||
| 845 : | jsr166 | 1.49 | if (e == null) throw new NullPointerException(); |
| 846 : | dl | 1.55 | return transferer.transfer(e, true, 0) != null; |
| 847 : | tim | 1.1 | } |
| 848 : | |||
| 849 : | dholmes | 1.11 | /** |
| 850 : | * Retrieves and removes the head of this queue, waiting if necessary | ||
| 851 : | * for another thread to insert it. | ||
| 852 : | jsr166 | 1.50 | * |
| 853 : | dholmes | 1.11 | * @return the head of this queue |
| 854 : | jsr166 | 1.50 | * @throws InterruptedException {@inheritDoc} |
| 855 : | dholmes | 1.11 | */ |
| 856 : | dl | 1.2 | public E take() throws InterruptedException { |
| 857 : | dl | 1.55 | Object e = transferer.transfer(null, false, 0); |
| 858 : | if (e != null) | ||
| 859 : | return (E)e; | ||
| 860 : | jsr166 | 1.65 | Thread.interrupted(); |
| 861 : | dl | 1.55 | throw new InterruptedException(); |
| 862 : | tim | 1.1 | } |
| 863 : | dl | 1.2 | |
| 864 : | dholmes | 1.11 | /** |
| 865 : | * Retrieves and removes the head of this queue, waiting | ||
| 866 : | * if necessary up to the specified wait time, for another thread | ||
| 867 : | * to insert it. | ||
| 868 : | jsr166 | 1.50 | * |
| 869 : | dl | 1.18 | * @return the head of this queue, or <tt>null</tt> if the |
| 870 : | jsr166 | 1.50 | * specified waiting time elapses before an element is present. |
| 871 : | * @throws InterruptedException {@inheritDoc} | ||
| 872 : | dholmes | 1.11 | */ |
| 873 : | dl | 1.2 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| 874 : | dl | 1.55 | Object e = transferer.transfer(null, true, unit.toNanos(timeout)); |
| 875 : | if (e != null || !Thread.interrupted()) | ||
| 876 : | return (E)e; | ||
| 877 : | throw new InterruptedException(); | ||
| 878 : | tim | 1.1 | } |
| 879 : | dl | 1.2 | |
| 880 : | dl | 1.18 | /** |
| 881 : | * Retrieves and removes the head of this queue, if another thread | ||
| 882 : | * is currently making an element available. | ||
| 883 : | * | ||
| 884 : | * @return the head of this queue, or <tt>null</tt> if no | ||
| 885 : | * element is available. | ||
| 886 : | */ | ||
| 887 : | dl | 1.2 | public E poll() { |
| 888 : | dl | 1.55 | return (E)transferer.transfer(null, true, 0); |
| 889 : | tim | 1.1 | } |
| 890 : | dl | 1.2 | |
| 891 : | dl | 1.5 | /** |
| 892 : | jsr166 | 1.48 | * Always returns <tt>true</tt>. |
| 893 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> has no internal capacity. |
| 894 : | jsr166 | 1.63 | * |
| 895 : | dholmes | 1.11 | * @return <tt>true</tt> |
| 896 : | dl | 1.5 | */ |
| 897 : | public boolean isEmpty() { | ||
| 898 : | return true; | ||
| 899 : | } | ||
| 900 : | |||
| 901 : | /** | ||
| 902 : | dholmes | 1.11 | * Always returns zero. |
| 903 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 904 : | jsr166 | 1.63 | * |
| 905 : | dl | 1.55 | * @return zero. |
| 906 : | dl | 1.5 | */ |
| 907 : | public int size() { | ||
| 908 : | return 0; | ||
| 909 : | tim | 1.1 | } |
| 910 : | dl | 1.2 | |
| 911 : | dl | 1.5 | /** |
| 912 : | dholmes | 1.11 | * Always returns zero. |
| 913 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 914 : | jsr166 | 1.63 | * |
| 915 : | dl | 1.55 | * @return zero. |
| 916 : | dl | 1.5 | */ |
| 917 : | public int remainingCapacity() { | ||
| 918 : | return 0; | ||
| 919 : | } | ||
| 920 : | |||
| 921 : | /** | ||
| 922 : | dholmes | 1.11 | * Does nothing. |
| 923 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 924 : | */ | ||
| 925 : | dl | 1.55 | public void clear() { |
| 926 : | } | ||
| 927 : | dholmes | 1.11 | |
| 928 : | /** | ||
| 929 : | * Always returns <tt>false</tt>. | ||
| 930 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 931 : | jsr166 | 1.63 | * |
| 932 : | dl | 1.55 | * @param o the element |
| 933 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 934 : | */ | ||
| 935 : | public boolean contains(Object o) { | ||
| 936 : | return false; | ||
| 937 : | } | ||
| 938 : | |||
| 939 : | /** | ||
| 940 : | dl | 1.18 | * Always returns <tt>false</tt>. |
| 941 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 942 : | * | ||
| 943 : | * @param o the element to remove | ||
| 944 : | * @return <tt>false</tt> | ||
| 945 : | */ | ||
| 946 : | public boolean remove(Object o) { | ||
| 947 : | return false; | ||
| 948 : | } | ||
| 949 : | |||
| 950 : | /** | ||
| 951 : | jsr166 | 1.59 | * Returns <tt>false</tt> unless the given collection is empty. |
| 952 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> has no internal capacity. |
| 953 : | jsr166 | 1.63 | * |
| 954 : | dl | 1.18 | * @param c the collection |
| 955 : | dl | 1.55 | * @return <tt>false</tt> unless given collection is empty |
| 956 : | dholmes | 1.11 | */ |
| 957 : | dl | 1.12 | public boolean containsAll(Collection<?> c) { |
| 958 : | dl | 1.16 | return c.isEmpty(); |
| 959 : | dholmes | 1.11 | } |
| 960 : | |||
| 961 : | /** | ||
| 962 : | * Always returns <tt>false</tt>. | ||
| 963 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 964 : | jsr166 | 1.63 | * |
| 965 : | dl | 1.18 | * @param c the collection |
| 966 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 967 : | */ | ||
| 968 : | dl | 1.12 | public boolean removeAll(Collection<?> c) { |
| 969 : | dholmes | 1.11 | return false; |
| 970 : | } | ||
| 971 : | |||
| 972 : | /** | ||
| 973 : | * Always returns <tt>false</tt>. | ||
| 974 : | * A <tt>SynchronousQueue</tt> has no internal capacity. | ||
| 975 : | jsr166 | 1.63 | * |
| 976 : | dl | 1.18 | * @param c the collection |
| 977 : | dholmes | 1.11 | * @return <tt>false</tt> |
| 978 : | */ | ||
| 979 : | dl | 1.12 | public boolean retainAll(Collection<?> c) { |
| 980 : | dholmes | 1.11 | return false; |
| 981 : | } | ||
| 982 : | |||
| 983 : | /** | ||
| 984 : | jsr166 | 1.48 | * Always returns <tt>null</tt>. |
| 985 : | dholmes | 1.11 | * A <tt>SynchronousQueue</tt> does not return elements |
| 986 : | dl | 1.5 | * unless actively waited on. |
| 987 : | jsr166 | 1.63 | * |
| 988 : | dholmes | 1.11 | * @return <tt>null</tt> |
| 989 : | dl | 1.5 | */ |
| 990 : | public E peek() { | ||
| 991 : | return null; | ||
| 992 : | } | ||
| 993 : | |||
| 994 : | static class EmptyIterator<E> implements Iterator<E> { | ||
| 995 : | dl | 1.2 | public boolean hasNext() { |
| 996 : | return false; | ||
| 997 : | } | ||
| 998 : | public E next() { | ||
| 999 : | throw new NoSuchElementException(); | ||
| 1000 : | } | ||
| 1001 : | public void remove() { | ||
| 1002 : | dl | 1.17 | throw new IllegalStateException(); |
| 1003 : | dl | 1.2 | } |
| 1004 : | tim | 1.1 | } |
| 1005 : | dl | 1.2 | |
| 1006 : | dl | 1.5 | /** |
| 1007 : | dl | 1.18 | * Returns an empty iterator in which <tt>hasNext</tt> always returns |
| 1008 : | tim | 1.13 | * <tt>false</tt>. |
| 1009 : | * | ||
| 1010 : | dholmes | 1.11 | * @return an empty iterator |
| 1011 : | dl | 1.5 | */ |
| 1012 : | dl | 1.2 | public Iterator<E> iterator() { |
| 1013 : | dl | 1.5 | return new EmptyIterator<E>(); |
| 1014 : | tim | 1.1 | } |
| 1015 : | |||
| 1016 : | dl | 1.5 | /** |
| 1017 : | dholmes | 1.11 | * Returns a zero-length array. |
| 1018 : | * @return a zero-length array | ||
| 1019 : | dl | 1.5 | */ |
| 1020 : | dl | 1.3 | public Object[] toArray() { |
| 1021 : | dl | 1.25 | return new Object[0]; |
| 1022 : | tim | 1.1 | } |
| 1023 : | |||
| 1024 : | dholmes | 1.11 | /** |
| 1025 : | * Sets the zeroeth element of the specified array to <tt>null</tt> | ||
| 1026 : | * (if the array has non-zero length) and returns it. | ||
| 1027 : | jsr166 | 1.50 | * |
| 1028 : | dl | 1.40 | * @param a the array |
| 1029 : | dholmes | 1.11 | * @return the specified array |
| 1030 : | jsr166 | 1.50 | * @throws NullPointerException if the specified array is null |
| 1031 : | dholmes | 1.11 | */ |
| 1032 : | dl | 1.2 | public <T> T[] toArray(T[] a) { |
| 1033 : | if (a.length > 0) | ||
| 1034 : | a[0] = null; | ||
| 1035 : | return a; | ||
| 1036 : | } | ||
| 1037 : | dl | 1.21 | |
| 1038 : | jsr166 | 1.50 | /** |
| 1039 : | * @throws UnsupportedOperationException {@inheritDoc} | ||
| 1040 : | * @throws ClassCastException {@inheritDoc} | ||
| 1041 : | * @throws NullPointerException {@inheritDoc} | ||
| 1042 : | * @throws IllegalArgumentException {@inheritDoc} | ||
| 1043 : | */ | ||
| 1044 : | dl | 1.21 | public int drainTo(Collection<? super E> c) { |
| 1045 : | if (c == null) | ||
| 1046 : | throw new NullPointerException(); | ||
| 1047 : | if (c == this) | ||
| 1048 : | throw new IllegalArgumentException(); | ||
| 1049 : | int n = 0; | ||
| 1050 : | E e; | ||
| 1051 : | while ( (e = poll()) != null) { | ||
| 1052 : | c.add(e); | ||
| 1053 : | ++n; | ||
| 1054 : | } | ||
| 1055 : | return n; | ||
| 1056 : | } | ||
| 1057 : | |||
| 1058 : | jsr166 | 1.50 | /** |
| 1059 : | * @throws UnsupportedOperationException {@inheritDoc} | ||
| 1060 : | * @throws ClassCastException {@inheritDoc} | ||
| 1061 : | * @throws NullPointerException {@inheritDoc} | ||
| 1062 : | * @throws IllegalArgumentException {@inheritDoc} | ||
| 1063 : | */ | ||
| 1064 : | dl | 1.21 | public int drainTo(Collection<? super E> c, int maxElements) { |
| 1065 : | if (c == null) | ||
| 1066 : | throw new NullPointerException(); | ||
| 1067 : | if (c == this) | ||
| 1068 : | throw new IllegalArgumentException(); | ||
| 1069 : | int n = 0; | ||
| 1070 : | E e; | ||
| 1071 : | while (n < maxElements && (e = poll()) != null) { | ||
| 1072 : | c.add(e); | ||
| 1073 : | ++n; | ||
| 1074 : | } | ||
| 1075 : | return n; | ||
| 1076 : | } | ||
| 1077 : | dl | 1.55 | |
| 1078 : | /* | ||
| 1079 : | * To cope with the serialization strategy in the 1.5 version of | ||
| 1080 : | * SynchronousQueue, we declare some otherwise unused classes and fields | ||
| 1081 : | * that exist solely to enable serializability across versions. | ||
| 1082 : | * These fields are not used by any queue operation: they are assigned | ||
| 1083 : | * only in writeObject, and waitingProducers is examined only in readObject. | ||
| 1084 : | */ | ||
| 1085 : | |||
| 1086 : | static class WaitQueue implements java.io.Serializable { } /* empty placeholder base type */ | ||
| 1087 : | static class LifoWaitQueue extends WaitQueue { /* written for a nonfair queue */ | ||
| 1088 : | private static final long serialVersionUID = -3633113410248163686L; | ||
| 1089 : | } | ||
| 1090 : | static class FifoWaitQueue extends WaitQueue { /* written for a fair queue */ | ||
| 1091 : | private static final long serialVersionUID = -3623113410248163686L; | ||
| 1092 : | } | ||
| 1093 : | private ReentrantLock qlock; | ||
| 1094 : | private WaitQueue waitingProducers; | ||
| 1095 : | private WaitQueue waitingConsumers; | ||
| 1096 : | |||
| 1097 : | /** | ||
| 1098 : | * Save the state to a stream (that is, serialize it). | ||
| 1099 : | * | ||
| 1100 : | * @param s the stream | ||
| 1101 : | */ | ||
| 1102 : | private void writeObject(java.io.ObjectOutputStream s) | ||
| 1103 : | throws java.io.IOException { | ||
| 1104 : | boolean fair = transferer instanceof TransferQueue; | ||
| 1105 : | if (fair) { | ||
| 1106 : | qlock = new ReentrantLock(true); | ||
| 1107 : | waitingProducers = new FifoWaitQueue(); | ||
| 1108 : | waitingConsumers = new FifoWaitQueue(); | ||
| 1109 : | } | ||
| 1110 : | else { | ||
| 1111 : | qlock = new ReentrantLock(); | ||
| 1112 : | waitingProducers = new LifoWaitQueue(); | ||
| 1113 : | waitingConsumers = new LifoWaitQueue(); | ||
| 1114 : | } | ||
| 1115 : | s.defaultWriteObject(); | ||
| 1116 : | } | ||
| 1117 : | |||
| 1118 : | private void readObject(final java.io.ObjectInputStream s) | ||
| 1119 : | throws java.io.IOException, ClassNotFoundException { | ||
| 1120 : | s.defaultReadObject(); | ||
| 1121 : | if (waitingProducers instanceof FifoWaitQueue) | ||
| 1122 : | transferer = new TransferQueue(); | ||
| 1123 : | else | ||
| 1124 : | transferer = new TransferStack(); | ||
| 1125 : | } | ||
| 1126 : | |||
| 1127 : | tim | 1.1 | } |
| Doug Lea | ViewVC Help |
| Powered by ViewVC 1.0.8 |