| 1 |
/* |
/* |
| 2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
* Written by Doug Lea, Bill Scherer, and Michael Scott with |
| 3 |
* Expert Group and released to the public domain, as explained at |
* assistance from members of JCP JSR-166 Expert Group and released to |
| 4 |
|
* the public domain, as explained at |
| 5 |
* http://creativecommons.org/licenses/publicdomain |
* http://creativecommons.org/licenses/publicdomain |
| 6 |
*/ |
*/ |
| 7 |
|
|
| 8 |
package java.util.concurrent; |
package java.util.concurrent; |
| 9 |
import java.util.concurrent.locks.*; |
import java.util.concurrent.locks.*; |
| 10 |
|
import java.util.concurrent.atomic.*; |
| 11 |
import java.util.*; |
import java.util.*; |
| 12 |
|
|
| 13 |
/** |
/** |
| 36 |
* <p> This class supports an optional fairness policy for ordering |
* <p> This class supports an optional fairness policy for ordering |
| 37 |
* waiting producer and consumer threads. By default, this ordering |
* waiting producer and consumer threads. By default, this ordering |
| 38 |
* is not guaranteed. However, a queue constructed with fairness set |
* is not guaranteed. However, a queue constructed with fairness set |
| 39 |
* to <tt>true</tt> grants threads access in FIFO order. Fairness |
* to <tt>true</tt> grants threads access in FIFO order. |
|
* generally decreases throughput but reduces variability and avoids |
|
|
* starvation. |
|
| 40 |
* |
* |
| 41 |
* <p>This class and its iterator implement all of the |
* <p>This class and its iterator implement all of the |
| 42 |
* <em>optional</em> methods of the {@link Collection} and {@link |
* <em>optional</em> methods of the {@link Collection} and {@link |
| 55 |
private static final long serialVersionUID = -3223113410248163686L; |
private static final long serialVersionUID = -3223113410248163686L; |
| 56 |
|
|
| 57 |
/* |
/* |
| 58 |
This implementation divides actions into two cases for puts: |
* This class implements extensions of the dual stack and dual |
| 59 |
|
* queue algorithms described in "Nonblocking Concurrent Objects |
| 60 |
* An arriving producer that does not already have a waiting consumer |
* with Condition Synchronization", by W. N. Scherer III and |
| 61 |
creates a node holding item, and then waits for a consumer to take it. |
* M. L. Scott. 18th Annual Conf. on Distributed Computing, |
| 62 |
* An arriving producer that does already have a waiting consumer fills |
* Oct. 2004 (see also |
| 63 |
the slot node created by the consumer, and notifies it to continue. |
* http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). |
| 64 |
|
* The (Lifo) stack is used for non-fair mode, and the (Fifo) |
| 65 |
And symmetrically, two for takes: |
* queue for fair mode. The performance of the two is generally |
| 66 |
|
* similar. Fifo usually supports higher throughput under |
| 67 |
* An arriving consumer that does not already have a waiting producer |
* contention but Lifo maintains higher thread locality in common |
| 68 |
creates an empty slot node, and then waits for a producer to fill it. |
* applications. |
| 69 |
* An arriving consumer that does already have a waiting producer takes |
* |
| 70 |
item from the node created by the producer, and notifies it to continue. |
* A dual queue (and similarly stack) is one that at any given |
| 71 |
|
* time either holds "data" -- items provided by put operations, |
| 72 |
When a put or take waiting for the actions of its counterpart |
* or "requests" -- slots representing take operations, or is |
| 73 |
aborts due to interruption or timeout, it marks the node |
* empty. A call to "fulfill" (i.e., a call requesting an item |
| 74 |
it created as "CANCELLED", which causes its counterpart to retry |
* from a queue holding data or vice versa) dequeues a |
| 75 |
the entire put or take sequence. |
* complementary node. The most interesting feature of these |
| 76 |
|
* queues is that any operation can figure out which mode the |
| 77 |
This requires keeping two simple queues, waitingProducers and |
* queue is in, and act accordingly without needing locks. |
| 78 |
waitingConsumers. Each of these can be FIFO (preserves fairness) |
* |
| 79 |
or LIFO (improves throughput). |
* Both the queue and stack extend abstract class Transferer |
| 80 |
*/ |
* defining the single method transfer that does a put or a |
| 81 |
|
* take. These are unified into a single method because in dual |
| 82 |
/** Lock protecting both wait queues */ |
* data structures, the put and take operations are symmetrical, |
| 83 |
private final ReentrantLock qlock; |
* so nearly all code can be combined. The resulting transfer |
| 84 |
/** Queue holding waiting puts */ |
* methods are on the long side, but are easier to follow than |
| 85 |
private final WaitQueue waitingProducers; |
* they would be if broken up into nearly-duplicated parts. |
| 86 |
/** Queue holding waiting takes */ |
* |
| 87 |
private final WaitQueue waitingConsumers; |
* The queue and stack data structures share many conceptual |
| 88 |
|
* similarities but very few concrete details. For simplicity, |
| 89 |
|
* they are kept distinct so that they can later evolve |
| 90 |
|
* separately. |
| 91 |
|
* |
| 92 |
|
* The algorithms here differ from the versions in the above paper |
| 93 |
|
* in extending them for use in synchronous queues, as well as |
| 94 |
|
* dealing with cancellation. The main differences include: |
| 95 |
|
* |
| 96 |
|
* 1. The orginal algorithms used bit-marked pointers, but |
| 97 |
|
* the ones here use mode bits in nodes, leading to a number |
| 98 |
|
* of further adaptations. |
| 99 |
|
* 2. SynchronousQueues must block threads waiting to become |
| 100 |
|
* fulfilled. |
| 101 |
|
* 3. Nodes/threads that have been cancelled due to timeouts |
| 102 |
|
* or interruptions are cleaned out of the lists to |
| 103 |
|
* avoid garbage retention and memory depletion. |
| 104 |
|
* |
| 105 |
|
* Blocking is mainly accomplished using LockSupport park/unpark, |
| 106 |
|
* except that nodes that appear to be the next ones to become |
| 107 |
|
* fulfilled first spin a bit (on multiprocessors only). On very |
| 108 |
|
* busy synchronous queues, spinning can dramatically improve |
| 109 |
|
* throughput. And on less busy ones, the amount of spinning is |
| 110 |
|
* small enough not to be noticeable. |
| 111 |
|
* |
| 112 |
|
* Cleaning is done in different ways in queues vs stacks. For |
| 113 |
|
* queues, we can almost always remove a node immediately in O(1) |
| 114 |
|
* time (modulo retries for consistency checks) when it is |
| 115 |
|
* cancelled. But if it may be pinned as the current tail, it must |
| 116 |
|
* wait until some subsequent cancellation. For stacks, we need a |
| 117 |
|
* potentially O(n) traversal to be sure that we can remove the |
| 118 |
|
* node, but this can run concurrently with other threads |
| 119 |
|
* accessing the stack. |
| 120 |
|
* |
| 121 |
|
* While garbage collection takes care of most node reclamation |
| 122 |
|
* issues that otherwise complicate nonblocking algorithms, care |
| 123 |
|
* is made to "forget" references to data, other nodes, and |
| 124 |
|
* threads that might be held on to long-term by blocked |
| 125 |
|
* threads. In cases where setting to null would otherwise |
| 126 |
|
* conflict with main algorithms, this is done by changing a |
| 127 |
|
* node's link to now point to the node itself. This doesn't arise |
| 128 |
|
* much for Stack nodes (because blocked threads do not hang on to |
| 129 |
|
* old head pointers), but references in Queue nodes must be |
| 130 |
|
* agressively forgotten to avoid reachability of everything any |
| 131 |
|
* node has ever referred to since arrival. |
| 132 |
|
*/ |
| 133 |
|
|
| 134 |
|
/** |
| 135 |
|
* Shared internal API for dual stacks and queues. |
| 136 |
|
*/ |
| 137 |
|
static abstract class Transferer { |
| 138 |
|
/** |
| 139 |
|
* Perform a put or take. |
| 140 |
|
* @param e if non-null, the item to be handed to a consumer; |
| 141 |
|
* if null, requests that transfer return an item offered by |
| 142 |
|
* producer. |
| 143 |
|
* @param timed if this operation should timeout |
| 144 |
|
* @param nanos the timeout, in nanoseconds |
| 145 |
|
* @return if nonnull, the item provided or received; if null, |
| 146 |
|
* the operation failed due to timeout or interrupt -- the |
| 147 |
|
* caller can distinguish which of these occurred by checking |
| 148 |
|
* Thread.interrupted. |
| 149 |
|
*/ |
| 150 |
|
abstract Object transfer(Object e, boolean timed, long nanos); |
| 151 |
|
} |
| 152 |
|
|
| 153 |
|
/** The number of CPUs, for spin control */ |
| 154 |
|
static final int NCPUS = Runtime.getRuntime().availableProcessors(); |
| 155 |
|
|
| 156 |
|
/** |
| 157 |
|
* The number of times to spin before blocking in timed waits. |
| 158 |
|
* The value is empirically derived -- it works well across a |
| 159 |
|
* variety of processors and OSes. Emprically, the best value |
| 160 |
|
* seems not to vary with number of CPUs (beyond 2) so is just |
| 161 |
|
* a constant. |
| 162 |
|
*/ |
| 163 |
|
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; |
| 164 |
|
|
| 165 |
|
/** |
| 166 |
|
* The number of times to spin before blocking in untimed |
| 167 |
|
* waits. This is greater than timed value because untimed |
| 168 |
|
* waits spin faster since they don't need to check times on |
| 169 |
|
* each spin. |
| 170 |
|
*/ |
| 171 |
|
static final int maxUntimedSpins = maxTimedSpins * 16; |
| 172 |
|
|
| 173 |
/** |
/** |
| 174 |
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy. |
* The number of nanoseconds for which it is faster to spin |
| 175 |
|
* rather than to use timed park. A rough estimate suffices. |
| 176 |
*/ |
*/ |
| 177 |
public SynchronousQueue() { |
static final long spinForTimeoutThreshold = 1000L; |
| 178 |
this(false); |
|
| 179 |
|
/** Dual stack */ |
| 180 |
|
static final class TransferStack extends Transferer { |
| 181 |
|
/* |
| 182 |
|
* This extends Scherer-Scott dual stack algorithm, differing, |
| 183 |
|
* among other ways, by using "covering" nodes rather than |
| 184 |
|
* bit-marked pointers: Fulfilling operations push on marker |
| 185 |
|
* nodes (with FULFILLING bit set in mode) to reserve a spot |
| 186 |
|
* to match a waiting node. |
| 187 |
|
*/ |
| 188 |
|
|
| 189 |
|
/* Modes for SNodes, ORed together in node fields */ |
| 190 |
|
/** Node represents an unfulfilled consumer */ |
| 191 |
|
static final int REQUEST = 0; |
| 192 |
|
/** Node represents an unfulfilled producer */ |
| 193 |
|
static final int DATA = 1; |
| 194 |
|
/** Node is fulfilling another unfulfilled DATA or REQUEST */ |
| 195 |
|
static final int FULFILLING = 2; |
| 196 |
|
|
| 197 |
|
/** Return true if m has fulfilling bit set */ |
| 198 |
|
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } |
| 199 |
|
|
| 200 |
|
/** Node class for TransferStacks. */ |
| 201 |
|
static final class SNode { |
| 202 |
|
volatile SNode next; // next node in stack |
| 203 |
|
volatile SNode match; // the node matched to this |
| 204 |
|
volatile Thread waiter; // to control park/unpark |
| 205 |
|
Object item; // data; or null for REQUESTs |
| 206 |
|
int mode; |
| 207 |
|
// Note: item and mode fields don't need to be volatile |
| 208 |
|
// since they are always written before, and read after, |
| 209 |
|
// other volatile/atomic operations. |
| 210 |
|
|
| 211 |
|
SNode(Object item) { |
| 212 |
|
this.item = item; |
| 213 |
|
} |
| 214 |
|
|
| 215 |
|
static final AtomicReferenceFieldUpdater<SNode, SNode> |
| 216 |
|
nextUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 217 |
|
(SNode.class, SNode.class, "next"); |
| 218 |
|
|
| 219 |
|
boolean casNext(SNode cmp, SNode val) { |
| 220 |
|
return (cmp == next && |
| 221 |
|
nextUpdater.compareAndSet(this, cmp, val)); |
| 222 |
|
} |
| 223 |
|
|
| 224 |
|
static final AtomicReferenceFieldUpdater<SNode, SNode> |
| 225 |
|
matchUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 226 |
|
(SNode.class, SNode.class, "match"); |
| 227 |
|
|
| 228 |
|
/** |
| 229 |
|
* Try to match node s to this node, if so, waking up |
| 230 |
|
* thread. Fulfillers call tryMatch to identify their |
| 231 |
|
* waiters. Waiters block until they have been |
| 232 |
|
* matched. |
| 233 |
|
* @param s the node to match |
| 234 |
|
* @return true if successfully matched to s |
| 235 |
|
*/ |
| 236 |
|
boolean tryMatch(SNode s) { |
| 237 |
|
if (match == null && |
| 238 |
|
matchUpdater.compareAndSet(this, null, s)) { |
| 239 |
|
Thread w = waiter; |
| 240 |
|
if (w != null) { // waiters need at most one unpark |
| 241 |
|
waiter = null; |
| 242 |
|
LockSupport.unpark(w); |
| 243 |
|
} |
| 244 |
|
return true; |
| 245 |
|
} |
| 246 |
|
return match == s; |
| 247 |
} |
} |
| 248 |
|
|
| 249 |
/** |
/** |
| 250 |
* Creates a <tt>SynchronousQueue</tt> with specified fairness policy. |
* Try to cancel a wait by matching node to itself. |
|
* @param fair if true, threads contend in FIFO order for access; |
|
|
* otherwise the order is unspecified. |
|
| 251 |
*/ |
*/ |
| 252 |
public SynchronousQueue(boolean fair) { |
void tryCancel() { |
| 253 |
if (fair) { |
matchUpdater.compareAndSet(this, null, this); |
| 254 |
qlock = new ReentrantLock(true); |
} |
| 255 |
waitingProducers = new FifoWaitQueue(); |
|
| 256 |
waitingConsumers = new FifoWaitQueue(); |
boolean isCancelled() { |
| 257 |
|
return match == this; |
| 258 |
} |
} |
|
else { |
|
|
qlock = new ReentrantLock(); |
|
|
waitingProducers = new LifoWaitQueue(); |
|
|
waitingConsumers = new LifoWaitQueue(); |
|
| 259 |
} |
} |
| 260 |
|
|
| 261 |
|
/** The head (top) of the stack */ |
| 262 |
|
volatile SNode head; |
| 263 |
|
|
| 264 |
|
static final AtomicReferenceFieldUpdater<TransferStack, SNode> |
| 265 |
|
headUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 266 |
|
(TransferStack.class, SNode.class, "head"); |
| 267 |
|
|
| 268 |
|
boolean casHead(SNode h, SNode nh) { |
| 269 |
|
return h == head && headUpdater.compareAndSet(this, h, nh); |
| 270 |
} |
} |
| 271 |
|
|
| 272 |
/** |
/** |
| 273 |
* Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. |
* Create or reset fields of a node. Called only from transfer |
| 274 |
* These queues have all transient fields, but are serializable |
* where the node to push on stack is lazily created and |
| 275 |
* in order to recover fairness settings when deserialized. |
* reused when possible to help reduce intervals between reads |
| 276 |
|
* and CASes of head and to avoid surges of garbage when CASes |
| 277 |
|
* to push nodes fail due to contention. |
| 278 |
*/ |
*/ |
| 279 |
static abstract class WaitQueue implements java.io.Serializable { |
static SNode snode(SNode s, Object e, SNode next, int mode) { |
| 280 |
/** Creates, adds, and returns node for x. */ |
if (s == null) s = new SNode(e); |
| 281 |
abstract Node enq(Object x); |
s.mode = mode; |
| 282 |
/** Removes and returns node, or null if empty. */ |
s.next = next; |
| 283 |
abstract Node deq(); |
return s; |
|
/** Removes a cancelled node to avoid garbage retention. */ |
|
|
abstract void unlink(Node node); |
|
|
/** Returns true if a cancelled node might be on queue. */ |
|
|
abstract boolean shouldUnlink(Node node); |
|
| 284 |
} |
} |
| 285 |
|
|
| 286 |
/** |
/** |
| 287 |
* FIFO queue to hold waiting puts/takes. |
* Put or take an item. |
| 288 |
|
*/ |
| 289 |
|
Object transfer(Object e, boolean timed, long nanos) { |
| 290 |
|
/* |
| 291 |
|
* Basic algorithm is to loop trying one of three actions: |
| 292 |
|
* |
| 293 |
|
* 1. If apparently empty or already containing nodes of same |
| 294 |
|
* mode, try to push node on stack and wait for a match, |
| 295 |
|
* returning it, or null if cancelled. |
| 296 |
|
* |
| 297 |
|
* 2. If apparently containing node of complementary mode, |
| 298 |
|
* try to push a fulfilling node on to stack, match |
| 299 |
|
* with corresponding waiting node, pop both from |
| 300 |
|
* stack, and return matched item. The matching or |
| 301 |
|
* unlinking might not actually be necessary because of |
| 302 |
|
* another threads performing action 3: |
| 303 |
|
* |
| 304 |
|
* 3. If top of stack already holds another fulfilling node, |
| 305 |
|
* help it out by doing its match and/or pop |
| 306 |
|
* operations, and then continue. The code for helping |
| 307 |
|
* is essentially the same as for fulfilling, except |
| 308 |
|
* that it doesn't return the item. |
| 309 |
*/ |
*/ |
|
static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable { |
|
|
private static final long serialVersionUID = -3623113410248163686L; |
|
|
private transient Node head; |
|
|
private transient Node last; |
|
| 310 |
|
|
| 311 |
Node enq(Object x) { |
SNode s = null; // constructed/reused as needed |
| 312 |
Node p = new Node(x); |
int mode = (e == null)? REQUEST : DATA; |
| 313 |
if (last == null) |
|
| 314 |
last = head = p; |
for (;;) { |
| 315 |
|
SNode h = head; |
| 316 |
|
if (h == null || h.mode == mode) { // empty or same-mode |
| 317 |
|
if (timed && nanos <= 0) { // can't wait |
| 318 |
|
if (h != null && h.isCancelled()) |
| 319 |
|
casHead(h, h.next); // pop cancelled node |
| 320 |
else |
else |
| 321 |
last = last.next = p; |
return null; |
| 322 |
return p; |
} else if (casHead(h, s = snode(s, e, h, mode))) { |
| 323 |
|
SNode m = awaitFulfill(s, timed, nanos); |
| 324 |
|
if (m == s) { // wait was cancelled |
| 325 |
|
clean(s); |
| 326 |
|
return null; |
| 327 |
|
} |
| 328 |
|
if ((h = head) != null && h.next == s) |
| 329 |
|
casHead(h, s.next); // help s's fulfiller |
| 330 |
|
return mode == REQUEST? m.item : s.item; |
| 331 |
|
} |
| 332 |
|
} else if (!isFulfilling(h.mode)) { // try to fulfill |
| 333 |
|
if (h.isCancelled()) // already cancelled |
| 334 |
|
casHead(h, h.next); // pop and retry |
| 335 |
|
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { |
| 336 |
|
for (;;) { // loop until matched or waiters disappear |
| 337 |
|
SNode m = s.next; // m is s's match |
| 338 |
|
if (m == null) { // all waiters are gone |
| 339 |
|
casHead(s, null); // pop fulfill node |
| 340 |
|
s = null; // use new node next time |
| 341 |
|
break; // restart main loop |
| 342 |
|
} |
| 343 |
|
SNode mn = m.next; |
| 344 |
|
if (m.tryMatch(s)) { |
| 345 |
|
casHead(s, mn); // pop both s and m |
| 346 |
|
return (mode == REQUEST)? m.item : s.item; |
| 347 |
|
} else // lost match |
| 348 |
|
s.casNext(m, mn); // help unlink |
| 349 |
|
} |
| 350 |
|
} |
| 351 |
|
} else { // help a fulfiller |
| 352 |
|
SNode m = h.next; // m is h's match |
| 353 |
|
if (m == null) // waiter is gone |
| 354 |
|
casHead(h, null); // pop fulfilling node |
| 355 |
|
else { |
| 356 |
|
SNode mn = m.next; |
| 357 |
|
if (m.tryMatch(h)) // help match |
| 358 |
|
casHead(h, mn); // pop both h and m |
| 359 |
|
else // lost match |
| 360 |
|
h.casNext(m, mn); // help unlink |
| 361 |
} |
} |
|
|
|
|
Node deq() { |
|
|
Node p = head; |
|
|
if (p != null) { |
|
|
if ((head = p.next) == null) |
|
|
last = null; |
|
|
p.next = null; |
|
| 362 |
} |
} |
|
return p; |
|
| 363 |
} |
} |
|
|
|
|
boolean shouldUnlink(Node node) { |
|
|
return (node == last || node.next != null); |
|
| 364 |
} |
} |
| 365 |
|
|
| 366 |
void unlink(Node node) { |
/** |
| 367 |
Node p = head; |
* Spin/block until node s is matched by a fulfill operation. |
| 368 |
Node trail = null; |
* @param s the waiting node |
| 369 |
while (p != null) { |
* @param timed true if timed wait |
| 370 |
if (p == node) { |
* @param nanos timeout value |
| 371 |
Node next = p.next; |
* @return matched node, or s if cancelled |
| 372 |
if (trail == null) |
*/ |
| 373 |
head = next; |
SNode awaitFulfill(SNode s, boolean timed, long nanos) { |
| 374 |
else |
/* |
| 375 |
trail.next = next; |
* When a node/thread is about to block, it sets its waiter |
| 376 |
if (last == node) |
* field and then rechecks state at least one more time |
| 377 |
last = trail; |
* before actually parking, thus covering race vs |
| 378 |
break; |
* fulfiller noticing that waiter is nonnull so should be |
| 379 |
|
* woken. |
| 380 |
|
* |
| 381 |
|
* When invoked by nodes that appear at the point of call |
| 382 |
|
* to be at the head of the stack, calls to park are |
| 383 |
|
* preceded by spins to avoid blocking when producers and |
| 384 |
|
* consumers are arriving very close in time. This can |
| 385 |
|
* happen enough to bother only on multiprocessors. |
| 386 |
|
* |
| 387 |
|
* The order of checks for returning out of main loop |
| 388 |
|
* reflects fact that interrupts have precedence over |
| 389 |
|
* normal returns, which have precedence over |
| 390 |
|
* timeouts. (So, on timeout, one last check for match is |
| 391 |
|
* done before giving up.) Except that calls from untimed |
| 392 |
|
* SynchronousQueue.{poll/offer} don't check interrupts |
| 393 |
|
* and don't wait at all, so are trapped in transfer |
| 394 |
|
* method rather than calling awaitFulfill. |
| 395 |
|
*/ |
| 396 |
|
long lastTime = (timed)? System.nanoTime() : 0; |
| 397 |
|
Thread w = Thread.currentThread(); |
| 398 |
|
SNode h = head; |
| 399 |
|
int spins = (shouldSpin(s)? |
| 400 |
|
(timed? maxTimedSpins : maxUntimedSpins) : 0); |
| 401 |
|
for (;;) { |
| 402 |
|
if (w.isInterrupted()) |
| 403 |
|
s.tryCancel(); |
| 404 |
|
SNode m = s.match; |
| 405 |
|
if (m != null) |
| 406 |
|
return m; |
| 407 |
|
if (timed) { |
| 408 |
|
long now = System.nanoTime(); |
| 409 |
|
nanos -= now - lastTime; |
| 410 |
|
lastTime = now; |
| 411 |
|
if (nanos <= 0) { |
| 412 |
|
s.tryCancel(); |
| 413 |
|
continue; |
| 414 |
} |
} |
|
trail = p; |
|
|
p = p.next; |
|
| 415 |
} |
} |
| 416 |
|
if (spins > 0) |
| 417 |
|
spins = shouldSpin(s)? (spins-1) : 0; |
| 418 |
|
else if (s.waiter == null) |
| 419 |
|
s.waiter = w; // establish waiter so can park next iter |
| 420 |
|
else if (!timed) |
| 421 |
|
LockSupport.park(this); |
| 422 |
|
else if (nanos > spinForTimeoutThreshold) |
| 423 |
|
LockSupport.parkNanos(this, nanos); |
| 424 |
} |
} |
| 425 |
} |
} |
| 426 |
|
|
| 427 |
/** |
/** |
| 428 |
* LIFO queue to hold waiting puts/takes. |
* Return true if node s is at head or there is an active |
| 429 |
|
* fulfiller. |
| 430 |
*/ |
*/ |
| 431 |
static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable { |
boolean shouldSpin(SNode s) { |
| 432 |
private static final long serialVersionUID = -3633113410248163686L; |
SNode h = head; |
| 433 |
private transient Node head; |
return (h == null || h == s || isFulfilling(h.mode)); |
|
|
|
|
Node enq(Object x) { |
|
|
return head = new Node(x, head); |
|
|
} |
|
|
|
|
|
Node deq() { |
|
|
Node p = head; |
|
|
if (p != null) { |
|
|
head = p.next; |
|
|
p.next = null; |
|
|
} |
|
|
return p; |
|
| 434 |
} |
} |
| 435 |
|
|
| 436 |
boolean shouldUnlink(Node node) { |
/** |
| 437 |
// Return false if already dequeued or is bottom node (in which |
* Unlink s from the stack |
| 438 |
// case we might retain at most one garbage node) |
*/ |
| 439 |
return (node == head || node.next != null); |
void clean(SNode s) { |
| 440 |
} |
s.item = null; // forget item |
| 441 |
|
s.waiter = null; // forget thread |
| 442 |
|
|
| 443 |
void unlink(Node node) { |
/* |
| 444 |
Node p = head; |
* At worst we may need to traverse entire stack to unlink |
| 445 |
Node trail = null; |
* s. If there are multiple concurrent calls to clean, we |
| 446 |
while (p != null) { |
* might not see s if another thread has already removed |
| 447 |
if (p == node) { |
* it. But we can stop when we see any node known to |
| 448 |
Node next = p.next; |
* follow s. We use s.next unless it too is cancelled, in |
| 449 |
if (trail == null) |
* which case we try the node one past. We don't check any |
| 450 |
head = next; |
* futher because we don't want to doubly traverse just to |
| 451 |
|
* find sentinel. |
| 452 |
|
*/ |
| 453 |
|
|
| 454 |
|
SNode past = s.next; |
| 455 |
|
if (past != null && past.isCancelled()) |
| 456 |
|
past = past.next; |
| 457 |
|
|
| 458 |
|
// Absorb cancelled nodes at head |
| 459 |
|
SNode p; |
| 460 |
|
while ((p = head) != null && p != past && p.isCancelled()) |
| 461 |
|
casHead(p, p.next); |
| 462 |
|
|
| 463 |
|
// Unsplice embedded nodes |
| 464 |
|
while (p != null && p != past) { |
| 465 |
|
SNode n = p.next; |
| 466 |
|
if (n != null && n.isCancelled()) |
| 467 |
|
p.casNext(n, n.next); |
| 468 |
else |
else |
| 469 |
trail.next = next; |
p = n; |
|
break; |
|
|
} |
|
|
trail = p; |
|
|
p = p.next; |
|
| 470 |
} |
} |
| 471 |
} |
} |
| 472 |
} |
} |
| 473 |
|
|
| 474 |
/** |
/** Dual Queue. */ |
| 475 |
* Unlinks the given node from consumer queue. Called by cancelled |
static final class TransferQueue extends Transferer { |
| 476 |
* (timeout, interrupt) waiters to avoid garbage retention in the |
/* |
| 477 |
* absence of producers. |
* This extends Scherer-Scott dual queue algorithm, differing, |
| 478 |
|
* among other ways, by using modes within nodes rather than |
| 479 |
|
* marked pointers. The algorithm is a little simpler than |
| 480 |
|
* that for stacks because fulfillers do not need explicit |
| 481 |
|
* nodes, and matching is done by CAS'ing QNode.item field |
| 482 |
|
* from nonnull to null (for put) or vice versa (for take). |
| 483 |
*/ |
*/ |
| 484 |
private void unlinkCancelledConsumer(Node node) { |
|
| 485 |
// Use a form of double-check to avoid unnecessary locking and |
/** Node class for TransferQueue. */ |
| 486 |
// traversal. The first check outside lock might |
static final class QNode { |
| 487 |
// conservatively report true. |
volatile QNode next; // next node in queue |
| 488 |
if (waitingConsumers.shouldUnlink(node)) { |
volatile Object item; // CAS'ed to or from null |
| 489 |
qlock.lock(); |
volatile Thread waiter; // to control park/unpark |
| 490 |
try { |
final boolean isData; |
| 491 |
if (waitingConsumers.shouldUnlink(node)) |
|
| 492 |
waitingConsumers.unlink(node); |
QNode(Object item, boolean isData) { |
| 493 |
} finally { |
this.item = item; |
| 494 |
qlock.unlock(); |
this.isData = isData; |
| 495 |
} |
} |
| 496 |
|
|
| 497 |
|
static final AtomicReferenceFieldUpdater<QNode, QNode> |
| 498 |
|
nextUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 499 |
|
(QNode.class, QNode.class, "next"); |
| 500 |
|
|
| 501 |
|
boolean casNext(QNode cmp, QNode val) { |
| 502 |
|
return (next == cmp && |
| 503 |
|
nextUpdater.compareAndSet(this, cmp, val)); |
| 504 |
} |
} |
| 505 |
|
|
| 506 |
|
static final AtomicReferenceFieldUpdater<QNode, Object> |
| 507 |
|
itemUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 508 |
|
(QNode.class, Object.class, "item"); |
| 509 |
|
|
| 510 |
|
boolean casItem(Object cmp, Object val) { |
| 511 |
|
return (item == cmp && |
| 512 |
|
itemUpdater.compareAndSet(this, cmp, val)); |
| 513 |
} |
} |
| 514 |
|
|
| 515 |
/** |
/** |
| 516 |
* Unlinks the given node from producer queue. Symmetric |
* Try to cancel by CAS'ing ref to this as item. |
|
* to unlinkCancelledConsumer. |
|
| 517 |
*/ |
*/ |
| 518 |
private void unlinkCancelledProducer(Node node) { |
void tryCancel(Object cmp) { |
| 519 |
if (waitingProducers.shouldUnlink(node)) { |
itemUpdater.compareAndSet(this, cmp, this); |
|
qlock.lock(); |
|
|
try { |
|
|
if (waitingProducers.shouldUnlink(node)) |
|
|
waitingProducers.unlink(node); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
| 520 |
} |
} |
| 521 |
|
|
| 522 |
|
boolean isCancelled() { |
| 523 |
|
return item == this; |
| 524 |
} |
} |
| 525 |
} |
} |
| 526 |
|
|
| 527 |
|
/** Head of queue */ |
| 528 |
|
transient volatile QNode head; |
| 529 |
|
/** Tail of queue */ |
| 530 |
|
transient volatile QNode tail; |
| 531 |
/** |
/** |
| 532 |
* Nodes each maintain an item and handle waits and signals for |
* Reference to a cancelled node that might not yet have been |
| 533 |
* getting and setting it. The class extends |
* unlinked from queue because it was the last inserted node |
| 534 |
* AbstractQueuedSynchronizer to manage blocking, using AQS state |
* when it cancelled. |
|
* 0 for waiting, 1 for ack, -1 for cancelled. |
|
| 535 |
*/ |
*/ |
| 536 |
static final class Node extends AbstractQueuedSynchronizer { |
transient volatile QNode cleanMe; |
|
private static final long serialVersionUID = -2631493897867746127L; |
|
| 537 |
|
|
| 538 |
/** Synchronization state value representing that node acked */ |
TransferQueue() { |
| 539 |
private static final int ACK = 1; |
QNode h = new QNode(null, false); // initialize to dummy node. |
| 540 |
/** Synchronization state value representing that node cancelled */ |
head = h; |
| 541 |
private static final int CANCEL = -1; |
tail = h; |
| 542 |
|
} |
|
/** The item being transferred */ |
|
|
Object item; |
|
|
/** Next node in wait queue */ |
|
|
Node next; |
|
|
|
|
|
/** Creates a node with initial item */ |
|
|
Node(Object x) { item = x; } |
|
| 543 |
|
|
| 544 |
/** Creates a node with initial item and next */ |
static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 545 |
Node(Object x, Node n) { item = x; next = n; } |
headUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 546 |
|
(TransferQueue.class, QNode.class, "head"); |
| 547 |
|
|
| 548 |
/** |
/** |
| 549 |
* Implements AQS base acquire to succeed if not in WAITING state |
* Try to cas nh as new head; if successful unlink |
| 550 |
|
* old head's next node to avoid garbage retention. |
| 551 |
*/ |
*/ |
| 552 |
protected boolean tryAcquire(int ignore) { |
void advanceHead(QNode h, QNode nh) { |
| 553 |
return getState() != 0; |
if (h == head && headUpdater.compareAndSet(this, h, nh)) |
| 554 |
|
h.next = h; // forget old next |
| 555 |
} |
} |
| 556 |
|
|
| 557 |
|
static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 558 |
|
tailUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 559 |
|
(TransferQueue.class, QNode.class, "tail"); |
| 560 |
|
|
| 561 |
/** |
/** |
| 562 |
* Implements AQS base release to signal if state changed |
* Try to cas nt as new tail. |
| 563 |
*/ |
*/ |
| 564 |
protected boolean tryRelease(int newState) { |
void advanceTail(QNode t, QNode nt) { |
| 565 |
return compareAndSetState(0, newState); |
if (tail == t) |
| 566 |
|
tailUpdater.compareAndSet(this, t, nt); |
| 567 |
} |
} |
| 568 |
|
|
| 569 |
|
static final AtomicReferenceFieldUpdater<TransferQueue, QNode> |
| 570 |
|
cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater |
| 571 |
|
(TransferQueue.class, QNode.class, "cleanMe"); |
| 572 |
|
|
| 573 |
/** |
/** |
| 574 |
* Takes item and nulls out field (for sake of GC) |
* Try to CAS cleanMe slot |
| 575 |
*/ |
*/ |
| 576 |
private Object extract() { |
boolean casCleanMe(QNode cmp, QNode val) { |
| 577 |
Object x = item; |
return (cleanMe == cmp && |
| 578 |
item = null; |
cleanMeUpdater.compareAndSet(this, cmp, val)); |
|
return x; |
|
| 579 |
} |
} |
| 580 |
|
|
| 581 |
/** |
/** |
| 582 |
* Tries to cancel on interrupt; if so rethrowing, |
* Put or take an item. |
|
* else setting interrupt state |
|
| 583 |
*/ |
*/ |
| 584 |
private void checkCancellationOnInterrupt(InterruptedException ie) |
Object transfer(Object e, boolean timed, long nanos) { |
| 585 |
throws InterruptedException { |
/* Basic algorithm is to loop trying to take either of |
| 586 |
if (release(CANCEL)) |
* two actions: |
| 587 |
throw ie; |
* |
| 588 |
Thread.currentThread().interrupt(); |
* 1. If queue apparently empty or holding same-mode nodes, |
| 589 |
|
* try to add node to queue of waiters, wait to be |
| 590 |
|
* fulfilled (or cancelled) and return matching item. |
| 591 |
|
* |
| 592 |
|
* 2. If queue apparently contains waiting items, and this |
| 593 |
|
* call is of complementary mode, try to fulfill by CAS'ing |
| 594 |
|
* item field of waiting node and dequeuing it, and then |
| 595 |
|
* returning matching item. |
| 596 |
|
* |
| 597 |
|
* In each case, along the way, check for and try to help |
| 598 |
|
* advance head and tail on behalf of other stalled/slow |
| 599 |
|
* threads. |
| 600 |
|
* |
| 601 |
|
* The loop starts off with a null check guarding against |
| 602 |
|
* seeing uninitialized head or tail values. This never |
| 603 |
|
* happens in current SynchronousQueue, but could if |
| 604 |
|
* callers held non-volatile/final ref to the |
| 605 |
|
* transferer. The check is here anyway because it places |
| 606 |
|
* null checks at top of loop, which is usually faster |
| 607 |
|
* than having them implicitly interspersed. |
| 608 |
|
*/ |
| 609 |
|
|
| 610 |
|
QNode s = null; // constructed/reused as needed |
| 611 |
|
boolean isData = (e != null); |
| 612 |
|
|
| 613 |
|
for (;;) { |
| 614 |
|
QNode t = tail; |
| 615 |
|
QNode h = head; |
| 616 |
|
if (t == null || h == null) // saw unitialized values |
| 617 |
|
continue; // spin |
| 618 |
|
|
| 619 |
|
if (h == t || t.isData == isData) { // empty or same-mode |
| 620 |
|
QNode tn = t.next; |
| 621 |
|
if (t != tail) // inconsistent read |
| 622 |
|
continue; |
| 623 |
|
if (tn != null) { // lagging tail |
| 624 |
|
advanceTail(t, tn); |
| 625 |
|
continue; |
| 626 |
|
} |
| 627 |
|
if (timed && nanos <= 0) // can't wait |
| 628 |
|
return null; |
| 629 |
|
if (s == null) |
| 630 |
|
s = new QNode(e, isData); |
| 631 |
|
if (!t.casNext(null, s)) // failed to link in |
| 632 |
|
continue; |
| 633 |
|
|
| 634 |
|
advanceTail(t, s); // swing tail and wait |
| 635 |
|
Object x = awaitFulfill(s, e, timed, nanos); |
| 636 |
|
if (x == s) { // wait was cancelled |
| 637 |
|
clean(t, s); |
| 638 |
|
return null; |
| 639 |
} |
} |
| 640 |
|
|
| 641 |
/** |
if (s.next != s) { // not already unlinked |
| 642 |
* Fills in the slot created by the consumer and signal consumer to |
advanceHead(t, s); // unlink |
| 643 |
* continue. |
if (x != null) // and forget fields |
| 644 |
*/ |
s.item = s; |
| 645 |
boolean setItem(Object x) { |
s.waiter = null; |
|
item = x; // can place in slot even if cancelled |
|
|
return release(ACK); |
|
| 646 |
} |
} |
| 647 |
|
return (x != null)? x : e; |
| 648 |
|
|
| 649 |
/** |
} else { // complementary-mode |
| 650 |
* Removes item from slot created by producer and signal producer |
QNode m = h.next; // node to fulfill |
| 651 |
* to continue. |
if (t != tail || m == null || h != head) |
| 652 |
*/ |
continue; // inconsistent read |
| 653 |
Object getItem() { |
|
| 654 |
return (release(ACK))? extract() : null; |
Object x = m.item; |
| 655 |
|
if (isData == (x != null) || // m already fulfilled |
| 656 |
|
x == m || // m cancelled |
| 657 |
|
!m.casItem(x, e)) { // lost CAS |
| 658 |
|
advanceHead(h, m); // dequeue and retry |
| 659 |
|
continue; |
| 660 |
} |
} |
| 661 |
|
|
| 662 |
/** |
advanceHead(h, m); // successfully fulfilled |
| 663 |
* Waits for a consumer to take item placed by producer. |
LockSupport.unpark(m.waiter); |
| 664 |
*/ |
return (x != null)? x : e; |
| 665 |
void waitForTake() throws InterruptedException { |
} |
|
try { |
|
|
acquireInterruptibly(0); |
|
|
} catch (InterruptedException ie) { |
|
|
checkCancellationOnInterrupt(ie); |
|
| 666 |
} |
} |
| 667 |
} |
} |
| 668 |
|
|
| 669 |
/** |
/** |
| 670 |
* Waits for a producer to put item placed by consumer. |
* Spin/block until node s is fulfilled. |
| 671 |
|
* @param s the waiting node |
| 672 |
|
* @param e the comparison value for checking match |
| 673 |
|
* @param timed true if timed wait |
| 674 |
|
* @param nanos timeout value |
| 675 |
|
* @return matched item, or s if cancelled |
| 676 |
*/ |
*/ |
| 677 |
Object waitForPut() throws InterruptedException { |
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { |
| 678 |
try { |
/* Same idea as TransferStack.awaitFulfill */ |
| 679 |
acquireInterruptibly(0); |
long lastTime = (timed)? System.nanoTime() : 0; |
| 680 |
} catch (InterruptedException ie) { |
Thread w = Thread.currentThread(); |
| 681 |
checkCancellationOnInterrupt(ie); |
int spins = ((head.next == s) ? |
| 682 |
|
(timed? maxTimedSpins : maxUntimedSpins) : 0); |
| 683 |
|
for (;;) { |
| 684 |
|
if (w.isInterrupted()) |
| 685 |
|
s.tryCancel(e); |
| 686 |
|
Object x = s.item; |
| 687 |
|
if (x != e) |
| 688 |
|
return x; |
| 689 |
|
if (timed) { |
| 690 |
|
long now = System.nanoTime(); |
| 691 |
|
nanos -= now - lastTime; |
| 692 |
|
lastTime = now; |
| 693 |
|
if (nanos <= 0) { |
| 694 |
|
s.tryCancel(e); |
| 695 |
|
continue; |
| 696 |
|
} |
| 697 |
|
} |
| 698 |
|
if (spins > 0) |
| 699 |
|
--spins; |
| 700 |
|
else if (s.waiter == null) |
| 701 |
|
s.waiter = w; |
| 702 |
|
else if (!timed) |
| 703 |
|
LockSupport.park(this); |
| 704 |
|
else if (nanos > spinForTimeoutThreshold) |
| 705 |
|
LockSupport.parkNanos(this, nanos); |
| 706 |
} |
} |
|
return extract(); |
|
| 707 |
} |
} |
| 708 |
|
|
| 709 |
/** |
/** |
| 710 |
* Waits for a consumer to take item placed by producer or time out. |
* Get rid of cancelled node s with original predecessor pred. |
| 711 |
*/ |
*/ |
| 712 |
boolean waitForTake(long nanos) throws InterruptedException { |
void clean(QNode pred, QNode s) { |
| 713 |
try { |
s.waiter = null; // forget thread |
| 714 |
if (!tryAcquireNanos(0, nanos) && |
/* |
| 715 |
release(CANCEL)) |
* At any given time, exactly one node on list cannot be |
| 716 |
return false; |
* deleted -- the last inserted node. To accommodate this, |
| 717 |
} catch (InterruptedException ie) { |
* if we cannot delete s, we save its predecessor as |
| 718 |
checkCancellationOnInterrupt(ie); |
* "cleanMe", deleting the previously saved version |
| 719 |
|
* first. At least one of node s or the node previously |
| 720 |
|
* saved can always be deleted, so this always terminates. |
| 721 |
|
*/ |
| 722 |
|
while (pred.next == s) { // Return early if already unlinked |
| 723 |
|
QNode h = head; |
| 724 |
|
QNode hn = h.next; // Absorb cancelled first node as head |
| 725 |
|
if (hn != null && hn.isCancelled()) { |
| 726 |
|
advanceHead(h, hn); |
| 727 |
|
continue; |
| 728 |
|
} |
| 729 |
|
QNode t = tail; // Ensure consistent read for tail |
| 730 |
|
if (t == h) |
| 731 |
|
return; |
| 732 |
|
QNode tn = t.next; |
| 733 |
|
if (t != tail) |
| 734 |
|
continue; |
| 735 |
|
if (tn != null) { |
| 736 |
|
advanceTail(t, tn); |
| 737 |
|
continue; |
| 738 |
|
} |
| 739 |
|
if (s != t) { // If not tail, try to unsplice |
| 740 |
|
QNode sn = s.next; |
| 741 |
|
if (sn == s || pred.casNext(s, sn)) |
| 742 |
|
return; |
| 743 |
|
} |
| 744 |
|
QNode dp = cleanMe; |
| 745 |
|
if (dp != null) { // Try unlinking previous cancelled node |
| 746 |
|
QNode d = dp.next; |
| 747 |
|
QNode dn; |
| 748 |
|
if (d == null || // d is gone or |
| 749 |
|
d == dp || // d is off list or |
| 750 |
|
!d.isCancelled() || // d not cancelled or |
| 751 |
|
(d != t && // d not tail and |
| 752 |
|
(dn = d.next) != null && // has successor |
| 753 |
|
dn != d && // that is on list |
| 754 |
|
dp.casNext(d, dn))) // d unspliced |
| 755 |
|
casCleanMe(dp, null); |
| 756 |
|
if (dp == pred) |
| 757 |
|
return; // s is already saved node |
| 758 |
|
} else if (casCleanMe(null, pred)) |
| 759 |
|
return; // Postpone cleaning s |
| 760 |
|
} |
| 761 |
} |
} |
|
return true; |
|
| 762 |
} |
} |
| 763 |
|
|
| 764 |
/** |
/** |
| 765 |
* Waits for a producer to put item placed by consumer, or time out. |
* The transferer. Set only in constructor, but cannot be declared |
| 766 |
|
* as final without further complicating serialization. Since |
| 767 |
|
* this is accessed only once per public method, there isn't a |
| 768 |
|
* noticeable performance penalty for using volatile instead of |
| 769 |
|
* final here. |
| 770 |
*/ |
*/ |
| 771 |
Object waitForPut(long nanos) throws InterruptedException { |
private transient volatile Transferer transferer; |
| 772 |
try { |
|
| 773 |
if (!tryAcquireNanos(0, nanos) && |
/** |
| 774 |
release(CANCEL)) |
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy. |
| 775 |
return null; |
*/ |
| 776 |
} catch (InterruptedException ie) { |
public SynchronousQueue() { |
| 777 |
checkCancellationOnInterrupt(ie); |
this(false); |
|
} |
|
|
return extract(); |
|
| 778 |
} |
} |
| 779 |
|
|
| 780 |
|
/** |
| 781 |
|
* Creates a <tt>SynchronousQueue</tt> with specified fairness policy. |
| 782 |
|
* @param fair if true, waiting threads contend in FIFO order for access; |
| 783 |
|
* otherwise the order is unspecified. |
| 784 |
|
*/ |
| 785 |
|
public SynchronousQueue(boolean fair) { |
| 786 |
|
transferer = (fair)? new TransferQueue() : new TransferStack(); |
| 787 |
} |
} |
| 788 |
|
|
| 789 |
/** |
/** |
| 793 |
* @throws InterruptedException {@inheritDoc} |
* @throws InterruptedException {@inheritDoc} |
| 794 |
* @throws NullPointerException {@inheritDoc} |
* @throws NullPointerException {@inheritDoc} |
| 795 |
*/ |
*/ |
| 796 |
public void put(E e) throws InterruptedException { |
public void put(E o) throws InterruptedException { |
| 797 |
if (e == null) throw new NullPointerException(); |
if (o == null) throw new NullPointerException(); |
| 798 |
final ReentrantLock qlock = this.qlock; |
if (transferer.transfer(o, false, 0) == null) |
| 799 |
|
throw new InterruptedException(); |
|
for (;;) { |
|
|
Node node; |
|
|
boolean mustWait; |
|
|
if (Thread.interrupted()) throw new InterruptedException(); |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingConsumers.deq(); |
|
|
if ( (mustWait = (node == null)) ) |
|
|
node = waitingProducers.enq(e); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
|
|
|
if (mustWait) { |
|
|
try { |
|
|
node.waitForTake(); |
|
|
return; |
|
|
} catch (InterruptedException ex) { |
|
|
unlinkCancelledProducer(node); |
|
|
throw ex; |
|
|
} |
|
|
} |
|
|
|
|
|
else if (node.setItem(e)) |
|
|
return; |
|
|
|
|
|
// else consumer cancelled, so retry |
|
|
} |
|
| 800 |
} |
} |
| 801 |
|
|
| 802 |
/** |
/** |
| 808 |
* @throws InterruptedException {@inheritDoc} |
* @throws InterruptedException {@inheritDoc} |
| 809 |
* @throws NullPointerException {@inheritDoc} |
* @throws NullPointerException {@inheritDoc} |
| 810 |
*/ |
*/ |
| 811 |
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { |
public boolean offer(E o, long timeout, TimeUnit unit) |
| 812 |
if (e == null) throw new NullPointerException(); |
throws InterruptedException { |
| 813 |
long nanos = unit.toNanos(timeout); |
if (o == null) throw new NullPointerException(); |
| 814 |
final ReentrantLock qlock = this.qlock; |
if (transferer.transfer(o, true, unit.toNanos(timeout)) != null) |
|
for (;;) { |
|
|
Node node; |
|
|
boolean mustWait; |
|
|
if (Thread.interrupted()) throw new InterruptedException(); |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingConsumers.deq(); |
|
|
if ( (mustWait = (node == null)) ) |
|
|
node = waitingProducers.enq(e); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
|
|
|
if (mustWait) { |
|
|
try { |
|
|
boolean x = node.waitForTake(nanos); |
|
|
if (!x) |
|
|
unlinkCancelledProducer(node); |
|
|
return x; |
|
|
} catch (InterruptedException ex) { |
|
|
unlinkCancelledProducer(node); |
|
|
throw ex; |
|
|
} |
|
|
} |
|
|
|
|
|
else if (node.setItem(e)) |
|
| 815 |
return true; |
return true; |
| 816 |
|
if (!Thread.interrupted()) |
| 817 |
// else consumer cancelled, so retry |
return false; |
| 818 |
|
throw new InterruptedException(); |
| 819 |
} |
} |
| 820 |
|
|
| 821 |
|
/** |
| 822 |
|
* Inserts the specified element into this queue, if another thread is |
| 823 |
|
* waiting to receive it. |
| 824 |
|
* |
| 825 |
|
* @param e the element to add |
| 826 |
|
* @return <tt>true</tt> if the element was added to this queue, else |
| 827 |
|
* <tt>false</tt> |
| 828 |
|
* @throws NullPointerException if the specified element is null |
| 829 |
|
*/ |
| 830 |
|
public boolean offer(E e) { |
| 831 |
|
if (e == null) throw new NullPointerException(); |
| 832 |
|
return transferer.transfer(e, true, 0) != null; |
| 833 |
} |
} |
| 834 |
|
|
| 835 |
/** |
/** |
| 840 |
* @throws InterruptedException {@inheritDoc} |
* @throws InterruptedException {@inheritDoc} |
| 841 |
*/ |
*/ |
| 842 |
public E take() throws InterruptedException { |
public E take() throws InterruptedException { |
| 843 |
final ReentrantLock qlock = this.qlock; |
Object e = transferer.transfer(null, false, 0); |
| 844 |
for (;;) { |
if (e != null) |
| 845 |
Node node; |
return (E)e; |
| 846 |
boolean mustWait; |
throw new InterruptedException(); |
|
|
|
|
if (Thread.interrupted()) throw new InterruptedException(); |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingProducers.deq(); |
|
|
if ( (mustWait = (node == null)) ) |
|
|
node = waitingConsumers.enq(null); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
|
|
|
if (mustWait) { |
|
|
try { |
|
|
Object x = node.waitForPut(); |
|
|
return (E)x; |
|
|
} catch (InterruptedException ex) { |
|
|
unlinkCancelledConsumer(node); |
|
|
throw ex; |
|
|
} |
|
|
} |
|
|
else { |
|
|
Object x = node.getItem(); |
|
|
if (x != null) |
|
|
return (E)x; |
|
|
// else cancelled, so retry |
|
|
} |
|
|
} |
|
| 847 |
} |
} |
| 848 |
|
|
| 849 |
/** |
/** |
| 856 |
* @throws InterruptedException {@inheritDoc} |
* @throws InterruptedException {@inheritDoc} |
| 857 |
*/ |
*/ |
| 858 |
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| 859 |
long nanos = unit.toNanos(timeout); |
Object e = transferer.transfer(null, true, unit.toNanos(timeout)); |
| 860 |
final ReentrantLock qlock = this.qlock; |
if (e != null || !Thread.interrupted()) |
| 861 |
|
return (E)e; |
| 862 |
for (;;) { |
throw new InterruptedException(); |
|
Node node; |
|
|
boolean mustWait; |
|
|
|
|
|
if (Thread.interrupted()) throw new InterruptedException(); |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingProducers.deq(); |
|
|
if ( (mustWait = (node == null)) ) |
|
|
node = waitingConsumers.enq(null); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
|
|
|
if (mustWait) { |
|
|
try { |
|
|
Object x = node.waitForPut(nanos); |
|
|
if (x == null) |
|
|
unlinkCancelledConsumer(node); |
|
|
return (E)x; |
|
|
} catch (InterruptedException ex) { |
|
|
unlinkCancelledConsumer(node); |
|
|
throw ex; |
|
|
} |
|
|
} |
|
|
else { |
|
|
Object x = node.getItem(); |
|
|
if (x != null) |
|
|
return (E)x; |
|
|
// else cancelled, so retry |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
// Untimed nonblocking versions |
|
|
|
|
|
/** |
|
|
* Inserts the specified element into this queue, if another thread is |
|
|
* waiting to receive it. |
|
|
* |
|
|
* @param e the element to add |
|
|
* @return <tt>true</tt> if the element was added to this queue, else |
|
|
* <tt>false</tt> |
|
|
* @throws NullPointerException if the specified element is null |
|
|
*/ |
|
|
public boolean offer(E e) { |
|
|
if (e == null) throw new NullPointerException(); |
|
|
final ReentrantLock qlock = this.qlock; |
|
|
|
|
|
for (;;) { |
|
|
Node node; |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingConsumers.deq(); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
if (node == null) |
|
|
return false; |
|
|
|
|
|
else if (node.setItem(e)) |
|
|
return true; |
|
|
// else retry |
|
|
} |
|
| 863 |
} |
} |
| 864 |
|
|
| 865 |
/** |
/** |
| 870 |
* element is available. |
* element is available. |
| 871 |
*/ |
*/ |
| 872 |
public E poll() { |
public E poll() { |
| 873 |
final ReentrantLock qlock = this.qlock; |
return (E)transferer.transfer(null, true, 0); |
|
for (;;) { |
|
|
Node node; |
|
|
qlock.lock(); |
|
|
try { |
|
|
node = waitingProducers.deq(); |
|
|
} finally { |
|
|
qlock.unlock(); |
|
|
} |
|
|
if (node == null) |
|
|
return null; |
|
|
|
|
|
else { |
|
|
Object x = node.getItem(); |
|
|
if (x != null) |
|
|
return (E)x; |
|
|
// else retry |
|
|
} |
|
|
} |
|
| 874 |
} |
} |
| 875 |
|
|
| 876 |
/** |
/** |
| 877 |
* Always returns <tt>true</tt>. |
* Always returns <tt>true</tt>. |
| 878 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
|
* |
|
| 879 |
* @return <tt>true</tt> |
* @return <tt>true</tt> |
| 880 |
*/ |
*/ |
| 881 |
public boolean isEmpty() { |
public boolean isEmpty() { |
| 885 |
/** |
/** |
| 886 |
* Always returns zero. |
* Always returns zero. |
| 887 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
| 888 |
* |
* @return zero. |
|
* @return zero |
|
| 889 |
*/ |
*/ |
| 890 |
public int size() { |
public int size() { |
| 891 |
return 0; |
return 0; |
| 894 |
/** |
/** |
| 895 |
* Always returns zero. |
* Always returns zero. |
| 896 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
| 897 |
* |
* @return zero. |
|
* @return zero |
|
| 898 |
*/ |
*/ |
| 899 |
public int remainingCapacity() { |
public int remainingCapacity() { |
| 900 |
return 0; |
return 0; |
| 904 |
* Does nothing. |
* Does nothing. |
| 905 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
| 906 |
*/ |
*/ |
| 907 |
public void clear() {} |
public void clear() { |
| 908 |
|
} |
| 909 |
|
|
| 910 |
/** |
/** |
| 911 |
* Always returns <tt>false</tt>. |
* Always returns <tt>false</tt>. |
| 912 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
| 913 |
* |
* @param o the element |
|
* @param o object to be checked for containment in this queue |
|
| 914 |
* @return <tt>false</tt> |
* @return <tt>false</tt> |
| 915 |
*/ |
*/ |
| 916 |
public boolean contains(Object o) { |
public boolean contains(Object o) { |
| 929 |
} |
} |
| 930 |
|
|
| 931 |
/** |
/** |
| 932 |
* Returns <tt>false</tt> unless the given collection is empty. |
* Returns <tt>false</tt> unless given collection is empty. |
| 933 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
|
* |
|
| 934 |
* @param c the collection |
* @param c the collection |
| 935 |
* @return <tt>false</tt> unless the given collection is empty |
* @return <tt>false</tt> unless given collection is empty |
|
* @throws NullPointerException if the specified collection is null |
|
| 936 |
*/ |
*/ |
| 937 |
public boolean containsAll(Collection<?> c) { |
public boolean containsAll(Collection<?> c) { |
| 938 |
return c.isEmpty(); |
return c.isEmpty(); |
| 941 |
/** |
/** |
| 942 |
* Always returns <tt>false</tt>. |
* Always returns <tt>false</tt>. |
| 943 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
|
* |
|
| 944 |
* @param c the collection |
* @param c the collection |
| 945 |
* @return <tt>false</tt> |
* @return <tt>false</tt> |
| 946 |
*/ |
*/ |
| 951 |
/** |
/** |
| 952 |
* Always returns <tt>false</tt>. |
* Always returns <tt>false</tt>. |
| 953 |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
* A <tt>SynchronousQueue</tt> has no internal capacity. |
|
* |
|
| 954 |
* @param c the collection |
* @param c the collection |
| 955 |
* @return <tt>false</tt> |
* @return <tt>false</tt> |
| 956 |
*/ |
*/ |
| 962 |
* Always returns <tt>null</tt>. |
* Always returns <tt>null</tt>. |
| 963 |
* A <tt>SynchronousQueue</tt> does not return elements |
* A <tt>SynchronousQueue</tt> does not return elements |
| 964 |
* unless actively waited on. |
* unless actively waited on. |
|
* |
|
| 965 |
* @return <tt>null</tt> |
* @return <tt>null</tt> |
| 966 |
*/ |
*/ |
| 967 |
public E peek() { |
public E peek() { |
| 968 |
return null; |
return null; |
| 969 |
} |
} |
| 970 |
|
|
|
|
|
| 971 |
static class EmptyIterator<E> implements Iterator<E> { |
static class EmptyIterator<E> implements Iterator<E> { |
| 972 |
public boolean hasNext() { |
public boolean hasNext() { |
| 973 |
return false; |
return false; |
| 990 |
return new EmptyIterator<E>(); |
return new EmptyIterator<E>(); |
| 991 |
} |
} |
| 992 |
|
|
|
|
|
| 993 |
/** |
/** |
| 994 |
* Returns a zero-length array. |
* Returns a zero-length array. |
| 995 |
* @return a zero-length array |
* @return a zero-length array |
| 1051 |
} |
} |
| 1052 |
return n; |
return n; |
| 1053 |
} |
} |
| 1054 |
|
|
| 1055 |
|
/* |
| 1056 |
|
* To cope with serialization strategy in the 1.5 version of |
| 1057 |
|
* SynchronousQueue, we declare some unused classes and fields |
| 1058 |
|
* that exist solely to enable serializability across versions. |
| 1059 |
|
* These fields are never used, so are initialized only if this |
| 1060 |
|
* object is ever serialized or deserialized. |
| 1061 |
|
*/ |
| 1062 |
|
|
| 1063 |
|
static class WaitQueue implements java.io.Serializable { } |
| 1064 |
|
static class LifoWaitQueue extends WaitQueue { |
| 1065 |
|
private static final long serialVersionUID = -3633113410248163686L; |
| 1066 |
|
} |
| 1067 |
|
static class FifoWaitQueue extends WaitQueue { |
| 1068 |
|
private static final long serialVersionUID = -3623113410248163686L; |
| 1069 |
|
} |
| 1070 |
|
private ReentrantLock qlock; |
| 1071 |
|
private WaitQueue waitingProducers; |
| 1072 |
|
private WaitQueue waitingConsumers; |
| 1073 |
|
|
| 1074 |
|
/** |
| 1075 |
|
* Save the state to a stream (that is, serialize it). |
| 1076 |
|
* |
| 1077 |
|
* @param s the stream |
| 1078 |
|
*/ |
| 1079 |
|
private void writeObject(java.io.ObjectOutputStream s) |
| 1080 |
|
throws java.io.IOException { |
| 1081 |
|
boolean fair = transferer instanceof TransferQueue; |
| 1082 |
|
if (fair) { |
| 1083 |
|
qlock = new ReentrantLock(true); |
| 1084 |
|
waitingProducers = new FifoWaitQueue(); |
| 1085 |
|
waitingConsumers = new FifoWaitQueue(); |
| 1086 |
|
} |
| 1087 |
|
else { |
| 1088 |
|
qlock = new ReentrantLock(); |
| 1089 |
|
waitingProducers = new LifoWaitQueue(); |
| 1090 |
|
waitingConsumers = new LifoWaitQueue(); |
| 1091 |
|
} |
| 1092 |
|
s.defaultWriteObject(); |
| 1093 |
|
} |
| 1094 |
|
|
| 1095 |
|
private void readObject(final java.io.ObjectInputStream s) |
| 1096 |
|
throws java.io.IOException, ClassNotFoundException { |
| 1097 |
|
s.defaultReadObject(); |
| 1098 |
|
if (waitingProducers instanceof FifoWaitQueue) |
| 1099 |
|
transferer = new TransferQueue(); |
| 1100 |
|
else |
| 1101 |
|
transferer = new TransferStack(); |
| 1102 |
|
} |
| 1103 |
|
|
| 1104 |
} |
} |