root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.109
Committed: Wed Dec 31 07:54:14 2014 UTC (9 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.108: +3 -3 lines
Log Message:
standardize import statement order

1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 package java.util.concurrent;
9
10 import java.util.AbstractQueue;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.Iterator;
14 import java.util.Spliterator;
15 import java.util.Spliterators;
16 import java.util.concurrent.locks.LockSupport;
17 import java.util.concurrent.locks.ReentrantLock;
18 import java.util.function.Consumer;
19 import java.util.stream.Stream;
20
21 /**
22 * A {@linkplain BlockingQueue blocking queue} in which each insert
23 * operation must wait for a corresponding remove operation by another
24 * thread, and vice versa. A synchronous queue does not have any
25 * internal capacity, not even a capacity of one. You cannot
26 * {@code peek} at a synchronous queue because an element is only
27 * present when you try to remove it; you cannot insert an element
28 * (using any method) unless another thread is trying to remove it;
29 * you cannot iterate as there is nothing to iterate. The
30 * <em>head</em> of the queue is the element that the first queued
31 * inserting thread is trying to add to the queue; if there is no such
32 * queued thread then no element is available for removal and
33 * {@code poll()} will return {@code null}. For purposes of other
34 * {@code Collection} methods (for example {@code contains}), a
35 * {@code SynchronousQueue} acts as an empty collection. This queue
36 * does not permit {@code null} elements.
37 *
38 * <p>Synchronous queues are similar to rendezvous channels used in
39 * CSP and Ada. They are well suited for handoff designs, in which an
40 * object running in one thread must sync up with an object running
41 * in another thread in order to hand it some information, event, or
42 * task.
43 *
44 * <p>This class supports an optional fairness policy for ordering
45 * waiting producer and consumer threads. By default, this ordering
46 * is not guaranteed. However, a queue constructed with fairness set
47 * to {@code true} grants threads access in FIFO order.
48 *
49 * <p>This class and its iterator implement all of the
50 * <em>optional</em> methods of the {@link Collection} and {@link
51 * Iterator} interfaces.
52 *
53 * <p>This class is a member of the
54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
55 * Java Collections Framework</a>.
56 *
57 * @since 1.5
58 * @author Doug Lea and Bill Scherer and Michael Scott
59 * @param <E> the type of elements held in this queue
60 */
61 public class SynchronousQueue<E> extends AbstractQueue<E>
62 implements BlockingQueue<E>, java.io.Serializable {
63 private static final long serialVersionUID = -3223113410248163686L;
64
65 /*
66 * This class implements extensions of the dual stack and dual
67 * queue algorithms described in "Nonblocking Concurrent Objects
68 * with Condition Synchronization", by W. N. Scherer III and
69 * M. L. Scott. 18th Annual Conf. on Distributed Computing,
70 * Oct. 2004 (see also
71 * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
72 * The (Lifo) stack is used for non-fair mode, and the (Fifo)
73 * queue for fair mode. The performance of the two is generally
74 * similar. Fifo usually supports higher throughput under
75 * contention but Lifo maintains higher thread locality in common
76 * applications.
77 *
78 * A dual queue (and similarly stack) is one that at any given
79 * time either holds "data" -- items provided by put operations,
80 * or "requests" -- slots representing take operations, or is
81 * empty. A call to "fulfill" (i.e., a call requesting an item
82 * from a queue holding data or vice versa) dequeues a
83 * complementary node. The most interesting feature of these
84 * queues is that any operation can figure out which mode the
85 * queue is in, and act accordingly without needing locks.
86 *
87 * Both the queue and stack extend abstract class Transferer
88 * defining the single method transfer that does a put or a
89 * take. These are unified into a single method because in dual
90 * data structures, the put and take operations are symmetrical,
91 * so nearly all code can be combined. The resulting transfer
92 * methods are on the long side, but are easier to follow than
93 * they would be if broken up into nearly-duplicated parts.
94 *
95 * The queue and stack data structures share many conceptual
96 * similarities but very few concrete details. For simplicity,
97 * they are kept distinct so that they can later evolve
98 * separately.
99 *
100 * The algorithms here differ from the versions in the above paper
101 * in extending them for use in synchronous queues, as well as
102 * dealing with cancellation. The main differences include:
103 *
104 * 1. The original algorithms used bit-marked pointers, but
105 * the ones here use mode bits in nodes, leading to a number
106 * of further adaptations.
107 * 2. SynchronousQueues must block threads waiting to become
108 * fulfilled.
109 * 3. Support for cancellation via timeout and interrupts,
110 * including cleaning out cancelled nodes/threads
111 * from lists to avoid garbage retention and memory depletion.
112 *
113 * Blocking is mainly accomplished using LockSupport park/unpark,
114 * except that nodes that appear to be the next ones to become
115 * fulfilled first spin a bit (on multiprocessors only). On very
116 * busy synchronous queues, spinning can dramatically improve
117 * throughput. And on less busy ones, the amount of spinning is
118 * small enough not to be noticeable.
119 *
120 * Cleaning is done in different ways in queues vs stacks. For
121 * queues, we can almost always remove a node immediately in O(1)
122 * time (modulo retries for consistency checks) when it is
123 * cancelled. But if it may be pinned as the current tail, it must
124 * wait until some subsequent cancellation. For stacks, we need a
125 * potentially O(n) traversal to be sure that we can remove the
126 * node, but this can run concurrently with other threads
127 * accessing the stack.
128 *
129 * While garbage collection takes care of most node reclamation
130 * issues that otherwise complicate nonblocking algorithms, care
131 * is taken to "forget" references to data, other nodes, and
132 * threads that might be held on to long-term by blocked
133 * threads. In cases where setting to null would otherwise
134 * conflict with main algorithms, this is done by changing a
135 * node's link to now point to the node itself. This doesn't arise
136 * much for Stack nodes (because blocked threads do not hang on to
137 * old head pointers), but references in Queue nodes must be
138 * aggressively forgotten to avoid reachability of everything any
139 * node has ever referred to since arrival.
140 */
141
142 /**
143 * Shared internal API for dual stacks and queues.
144 */
145 abstract static class Transferer<E> {
146 /**
147 * Performs a put or take.
148 *
149 * @param e if non-null, the item to be handed to a consumer;
150 * if null, requests that transfer return an item
151 * offered by a producer.
152 * @param timed if this operation should timeout
153 * @param nanos the timeout, in nanoseconds
154 * @return if non-null, the item provided or received; if null,
155 * the operation failed due to timeout or interrupt --
156 * the caller can distinguish which of these occurred
157 * by checking Thread.interrupted.
158 */
159 abstract E transfer(E e, boolean timed, long nanos);
160 }
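The Transferer contract above unifies put and take: a non-null `e` hands an item to a consumer, a null `e` requests one, and a null return signals timeout or interrupt. As a small illustration (class name `TransferContractDemo` is invented for this sketch), the public `put`/`take` methods, which funnel into `transfer(e, false, 0)` and `transfer(null, false, 0)` respectively, rendezvous as follows:

```java
import java.util.concurrent.SynchronousQueue;

public class TransferContractDemo {
    public static void main(String[] args) throws InterruptedException {
        // put(e)  -> transfer(e,    false, 0) : hand an item to a consumer
        // take()  -> transfer(null, false, 0) : request an item from a producer
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread taker = new Thread(() -> {
            try {
                System.out.println(q.take()); // blocks until a producer arrives
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();
        q.put("item"); // returns only once the taker has received the item
        taker.join();
    }
}
```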
161
162 /** The number of CPUs, for spin control */
163 static final int NCPUS = Runtime.getRuntime().availableProcessors();
164
165 /**
166 * The number of times to spin before blocking in timed waits.
167 * The value is empirically derived -- it works well across a
168 * variety of processors and OSes. Empirically, the best value
169 * seems not to vary with number of CPUs (beyond 2) so is just
170 * a constant.
171 */
172 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
173
174 /**
175 * The number of times to spin before blocking in untimed waits.
176 * This is greater than the timed value because untimed waits spin
177 * faster since they don't need to check times on each spin.
178 */
179 static final int maxUntimedSpins = maxTimedSpins * 16;
180
181 /**
182 * The number of nanoseconds for which it is faster to spin
183 * rather than to use timed park. A rough estimate suffices.
184 */
185 static final long spinForTimeoutThreshold = 1000L;
186
187 /** Dual stack */
188 static final class TransferStack<E> extends Transferer<E> {
189 /*
190 * This extends Scherer-Scott dual stack algorithm, differing,
191 * among other ways, by using "covering" nodes rather than
192 * bit-marked pointers: Fulfilling operations push on marker
193 * nodes (with FULFILLING bit set in mode) to reserve a spot
194 * to match a waiting node.
195 */
196
197 /* Modes for SNodes, ORed together in node fields */
198 /** Node represents an unfulfilled consumer */
199 static final int REQUEST = 0;
200 /** Node represents an unfulfilled producer */
201 static final int DATA = 1;
202 /** Node is fulfilling another unfulfilled DATA or REQUEST */
203 static final int FULFILLING = 2;
204
205 /** Returns true if m has fulfilling bit set. */
206 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
207
208 /** Node class for TransferStacks. */
209 static final class SNode {
210 volatile SNode next; // next node in stack
211 volatile SNode match; // the node matched to this
212 volatile Thread waiter; // to control park/unpark
213 Object item; // data; or null for REQUESTs
214 int mode;
215 // Note: item and mode fields don't need to be volatile
216 // since they are always written before, and read after,
217 // other volatile/atomic operations.
218
219 SNode(Object item) {
220 this.item = item;
221 }
222
223 boolean casNext(SNode cmp, SNode val) {
224 return cmp == next &&
225 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
226 }
227
228 /**
229 * Tries to match node s to this node; if successful, wakes up the waiting thread.
230 * Fulfillers call tryMatch to identify their waiters.
231 * Waiters block until they have been matched.
232 *
233 * @param s the node to match
234 * @return true if successfully matched to s
235 */
236 boolean tryMatch(SNode s) {
237 if (match == null &&
238 UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
239 Thread w = waiter;
240 if (w != null) { // waiters need at most one unpark
241 waiter = null;
242 LockSupport.unpark(w);
243 }
244 return true;
245 }
246 return match == s;
247 }
248
249 /**
250 * Tries to cancel a wait by matching node to itself.
251 */
252 void tryCancel() {
253 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
254 }
255
256 boolean isCancelled() {
257 return match == this;
258 }
259
260 // Unsafe mechanics
261 private static final sun.misc.Unsafe UNSAFE;
262 private static final long matchOffset;
263 private static final long nextOffset;
264
265 static {
266 try {
267 UNSAFE = sun.misc.Unsafe.getUnsafe();
268 Class<?> k = SNode.class;
269 matchOffset = UNSAFE.objectFieldOffset
270 (k.getDeclaredField("match"));
271 nextOffset = UNSAFE.objectFieldOffset
272 (k.getDeclaredField("next"));
273 } catch (Exception e) {
274 throw new Error(e);
275 }
276 }
277 }
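The key idiom in SNode is that a single CAS of the `match` field decides a node's fate: a fulfiller CASes it from null to itself, while cancellation CASes it from null to the node's own reference, so `isCancelled()` is just `match == this`. A toy stand-in (the `Node` class below is hypothetical, using `AtomicReference` instead of Unsafe) shows how the two outcomes exclude each other:

```java
import java.util.concurrent.atomic.AtomicReference;

public class MatchCancelDemo {
    // Hypothetical stand-in for SNode's match field: one CAS decides whether
    // the node is fulfilled (match == another node) or cancelled (match == this).
    static class Node {
        final AtomicReference<Node> match = new AtomicReference<>();
        boolean tryMatch(Node s) {
            return match.compareAndSet(null, s) || match.get() == s;
        }
        void tryCancel()      { match.compareAndSet(null, this); }
        boolean isCancelled() { return match.get() == this; }
    }

    public static void main(String[] args) {
        Node waiter = new Node(), fulfiller = new Node();
        System.out.println(waiter.tryMatch(fulfiller)); // true: CAS null -> fulfiller
        waiter.tryCancel();                             // too late; no effect
        System.out.println(waiter.isCancelled());       // false

        Node other = new Node();
        other.tryCancel();                              // CAS null -> self
        System.out.println(other.tryMatch(fulfiller)); // false: already cancelled
        System.out.println(other.isCancelled());        // true
    }
}
```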
278
279 /** The head (top) of the stack */
280 volatile SNode head;
281
282 boolean casHead(SNode h, SNode nh) {
283 return h == head &&
284 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
285 }
286
287 /**
288 * Creates or resets fields of a node. Called only from transfer
289 * where the node to push on stack is lazily created and
290 * reused when possible to help reduce intervals between reads
291 * and CASes of head and to avoid surges of garbage when CASes
292 * to push nodes fail due to contention.
293 */
294 static SNode snode(SNode s, Object e, SNode next, int mode) {
295 if (s == null) s = new SNode(e);
296 s.mode = mode;
297 s.next = next;
298 return s;
299 }
300
301 /**
302 * Puts or takes an item.
303 */
304 @SuppressWarnings("unchecked")
305 E transfer(E e, boolean timed, long nanos) {
306 /*
307 * Basic algorithm is to loop trying one of three actions:
308 *
309 * 1. If apparently empty or already containing nodes of same
310 * mode, try to push node on stack and wait for a match,
311 * returning it, or null if cancelled.
312 *
313 * 2. If apparently containing node of complementary mode,
314 * try to push a fulfilling node on to stack, match
315 * with corresponding waiting node, pop both from
316 * stack, and return matched item. The matching or
317 * unlinking might not actually be necessary because of
318 * other threads performing action 3:
319 *
320 * 3. If top of stack already holds another fulfilling node,
321 * help it out by doing its match and/or pop
322 * operations, and then continue. The code for helping
323 * is essentially the same as for fulfilling, except
324 * that it doesn't return the item.
325 */
326
327 SNode s = null; // constructed/reused as needed
328 int mode = (e == null) ? REQUEST : DATA;
329
330 for (;;) {
331 SNode h = head;
332 if (h == null || h.mode == mode) { // empty or same-mode
333 if (timed && nanos <= 0) { // can't wait
334 if (h != null && h.isCancelled())
335 casHead(h, h.next); // pop cancelled node
336 else
337 return null;
338 } else if (casHead(h, s = snode(s, e, h, mode))) {
339 SNode m = awaitFulfill(s, timed, nanos);
340 if (m == s) { // wait was cancelled
341 clean(s);
342 return null;
343 }
344 if ((h = head) != null && h.next == s)
345 casHead(h, s.next); // help s's fulfiller
346 return (E) ((mode == REQUEST) ? m.item : s.item);
347 }
348 } else if (!isFulfilling(h.mode)) { // try to fulfill
349 if (h.isCancelled()) // already cancelled
350 casHead(h, h.next); // pop and retry
351 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
352 for (;;) { // loop until matched or waiters disappear
353 SNode m = s.next; // m is s's match
354 if (m == null) { // all waiters are gone
355 casHead(s, null); // pop fulfill node
356 s = null; // use new node next time
357 break; // restart main loop
358 }
359 SNode mn = m.next;
360 if (m.tryMatch(s)) {
361 casHead(s, mn); // pop both s and m
362 return (E) ((mode == REQUEST) ? m.item : s.item);
363 } else // lost match
364 s.casNext(m, mn); // help unlink
365 }
366 }
367 } else { // help a fulfiller
368 SNode m = h.next; // m is h's match
369 if (m == null) // waiter is gone
370 casHead(h, null); // pop fulfilling node
371 else {
372 SNode mn = m.next;
373 if (m.tryMatch(h)) // help match
374 casHead(h, mn); // pop both h and m
375 else // lost match
376 h.casNext(m, mn); // help unlink
377 }
378 }
379 }
380 }
381
382 /**
383 * Spins/blocks until node s is matched by a fulfill operation.
384 *
385 * @param s the waiting node
386 * @param timed true if timed wait
387 * @param nanos timeout value
388 * @return matched node, or s if cancelled
389 */
390 SNode awaitFulfill(SNode s, boolean timed, long nanos) {
391 /*
392 * When a node/thread is about to block, it sets its waiter
393 * field and then rechecks state at least one more time
394 * before actually parking, thus covering race vs
395 * fulfiller noticing that waiter is non-null so should be
396 * woken.
397 *
398 * When invoked by nodes that appear at the point of call
399 * to be at the head of the stack, calls to park are
400 * preceded by spins to avoid blocking when producers and
401 * consumers are arriving very close in time. This can
402 * happen enough to bother only on multiprocessors.
403 *
404 * The order of checks for returning out of main loop
405 * reflects fact that interrupts have precedence over
406 * normal returns, which have precedence over
407 * timeouts. (So, on timeout, one last check for match is
408 * done before giving up.) Except that calls from untimed
409 * SynchronousQueue.{poll/offer} don't check interrupts
410 * and don't wait at all, so are trapped in transfer
411 * method rather than calling awaitFulfill.
412 */
413 final long deadline = timed ? System.nanoTime() + nanos : 0L;
414 Thread w = Thread.currentThread();
415 int spins = shouldSpin(s)
416 ? (timed ? maxTimedSpins : maxUntimedSpins)
417 : 0;
418 for (;;) {
419 if (w.isInterrupted())
420 s.tryCancel();
421 SNode m = s.match;
422 if (m != null)
423 return m;
424 if (timed) {
425 nanos = deadline - System.nanoTime();
426 if (nanos <= 0L) {
427 s.tryCancel();
428 continue;
429 }
430 }
431 if (spins > 0)
432 spins = shouldSpin(s) ? (spins - 1) : 0;
433 else if (s.waiter == null)
434 s.waiter = w; // establish waiter so can park next iter
435 else if (!timed)
436 LockSupport.park(this);
437 else if (nanos > spinForTimeoutThreshold)
438 LockSupport.parkNanos(this, nanos);
439 }
440 }
441
442 /**
443 * Returns true if node s is at head or there is an active
444 * fulfiller.
445 */
446 boolean shouldSpin(SNode s) {
447 SNode h = head;
448 return (h == s || h == null || isFulfilling(h.mode));
449 }
450
451 /**
452 * Unlinks s from the stack.
453 */
454 void clean(SNode s) {
455 s.item = null; // forget item
456 s.waiter = null; // forget thread
457
458 /*
459 * At worst we may need to traverse entire stack to unlink
460 * s. If there are multiple concurrent calls to clean, we
461 * might not see s if another thread has already removed
462 * it. But we can stop when we see any node known to
463 * follow s. We use s.next unless it too is cancelled, in
464 * which case we try the node one past. We don't check any
465 * further because we don't want to doubly traverse just to
466 * find sentinel.
467 */
468
469 SNode past = s.next;
470 if (past != null && past.isCancelled())
471 past = past.next;
472
473 // Absorb cancelled nodes at head
474 SNode p;
475 while ((p = head) != null && p != past && p.isCancelled())
476 casHead(p, p.next);
477
478 // Unsplice embedded nodes
479 while (p != null && p != past) {
480 SNode n = p.next;
481 if (n != null && n.isCancelled())
482 p.casNext(n, n.next);
483 else
484 p = n;
485 }
486 }
487
488 // Unsafe mechanics
489 private static final sun.misc.Unsafe UNSAFE;
490 private static final long headOffset;
491 static {
492 try {
493 UNSAFE = sun.misc.Unsafe.getUnsafe();
494 Class<?> k = TransferStack.class;
495 headOffset = UNSAFE.objectFieldOffset
496 (k.getDeclaredField("head"));
497 } catch (Exception e) {
498 throw new Error(e);
499 }
500 }
501 }
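In non-fair (stack) mode, an untimed `offer` corresponds to `transfer(e, true, 0)`: it succeeds only when a complementary REQUEST node is already on the stack, and fails immediately otherwise. A small sketch (class name `NonFairHandoffDemo` is made up) exploits this by spinning until the consumer has parked:

```java
import java.util.concurrent.SynchronousQueue;

public class NonFairHandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>(); // non-fair: dual stack
        Thread consumer = new Thread(() -> {
            try {
                System.out.println(q.take()); // pushes a REQUEST node, then waits
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // Untimed offer succeeds only if a taker is already waiting, so retry
        // until the consumer's REQUEST node is visible on the stack.
        while (!q.offer("handoff"))
            Thread.yield();
        consumer.join();
    }
}
```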
502
503 /** Dual Queue */
504 static final class TransferQueue<E> extends Transferer<E> {
505 /*
506 * This extends Scherer-Scott dual queue algorithm, differing,
507 * among other ways, by using modes within nodes rather than
508 * marked pointers. The algorithm is a little simpler than
509 * that for stacks because fulfillers do not need explicit
510 * nodes, and matching is done by CAS'ing QNode.item field
511 * from non-null to null (for put) or vice versa (for take).
512 */
513
514 /** Node class for TransferQueue. */
515 static final class QNode {
516 volatile QNode next; // next node in queue
517 volatile Object item; // CAS'ed to or from null
518 volatile Thread waiter; // to control park/unpark
519 final boolean isData;
520
521 QNode(Object item, boolean isData) {
522 this.item = item;
523 this.isData = isData;
524 }
525
526 boolean casNext(QNode cmp, QNode val) {
527 return next == cmp &&
528 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
529 }
530
531 boolean casItem(Object cmp, Object val) {
532 return item == cmp &&
533 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
534 }
535
536 /**
537 * Tries to cancel by CAS'ing ref to this as item.
538 */
539 void tryCancel(Object cmp) {
540 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
541 }
542
543 boolean isCancelled() {
544 return item == this;
545 }
546
547 /**
548 * Returns true if this node is known to be off the queue
549 * because its next pointer has been forgotten due to
550 * an advanceHead operation.
551 */
552 boolean isOffList() {
553 return next == this;
554 }
555
556 // Unsafe mechanics
557 private static final sun.misc.Unsafe UNSAFE;
558 private static final long itemOffset;
559 private static final long nextOffset;
560
561 static {
562 try {
563 UNSAFE = sun.misc.Unsafe.getUnsafe();
564 Class<?> k = QNode.class;
565 itemOffset = UNSAFE.objectFieldOffset
566 (k.getDeclaredField("item"));
567 nextOffset = UNSAFE.objectFieldOffset
568 (k.getDeclaredField("next"));
569 } catch (Exception e) {
570 throw new Error(e);
571 }
572 }
573 }
574
575 /** Head of queue */
576 transient volatile QNode head;
577 /** Tail of queue */
578 transient volatile QNode tail;
579 /**
580 * Reference to a cancelled node that might not yet have been
581 * unlinked from queue because it was the last inserted node
582 * when it was cancelled.
583 */
584 transient volatile QNode cleanMe;
585
586 TransferQueue() {
587 QNode h = new QNode(null, false); // initialize to dummy node.
588 head = h;
589 tail = h;
590 }
591
592 /**
593 * Tries to cas nh as new head; if successful, unlink
594 * old head's next node to avoid garbage retention.
595 */
596 void advanceHead(QNode h, QNode nh) {
597 if (h == head &&
598 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
599 h.next = h; // forget old next
600 }
601
602 /**
603 * Tries to cas nt as new tail.
604 */
605 void advanceTail(QNode t, QNode nt) {
606 if (tail == t)
607 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
608 }
609
610 /**
611 * Tries to CAS cleanMe slot.
612 */
613 boolean casCleanMe(QNode cmp, QNode val) {
614 return cleanMe == cmp &&
615 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
616 }
617
618 /**
619 * Puts or takes an item.
620 */
621 @SuppressWarnings("unchecked")
622 E transfer(E e, boolean timed, long nanos) {
623 /* Basic algorithm is to loop trying to take either of
624 * two actions:
625 *
626 * 1. If queue apparently empty or holding same-mode nodes,
627 * try to add node to queue of waiters, wait to be
628 * fulfilled (or cancelled) and return matching item.
629 *
630 * 2. If queue apparently contains waiting items, and this
631 * call is of complementary mode, try to fulfill by CAS'ing
632 * item field of waiting node and dequeuing it, and then
633 * returning matching item.
634 *
635 * In each case, along the way, check for and try to help
636 * advance head and tail on behalf of other stalled/slow
637 * threads.
638 *
639 * The loop starts off with a null check guarding against
640 * seeing uninitialized head or tail values. This never
641 * happens in current SynchronousQueue, but could if
642 * callers held non-volatile/final ref to the
643 * transferer. The check is here anyway because it places
644 * null checks at top of loop, which is usually faster
645 * than having them implicitly interspersed.
646 */
647
648 QNode s = null; // constructed/reused as needed
649 boolean isData = (e != null);
650
651 for (;;) {
652 QNode t = tail;
653 QNode h = head;
654 if (t == null || h == null) // saw uninitialized value
655 continue; // spin
656
657 if (h == t || t.isData == isData) { // empty or same-mode
658 QNode tn = t.next;
659 if (t != tail) // inconsistent read
660 continue;
661 if (tn != null) { // lagging tail
662 advanceTail(t, tn);
663 continue;
664 }
665 if (timed && nanos <= 0) // can't wait
666 return null;
667 if (s == null)
668 s = new QNode(e, isData);
669 if (!t.casNext(null, s)) // failed to link in
670 continue;
671
672 advanceTail(t, s); // swing tail and wait
673 Object x = awaitFulfill(s, e, timed, nanos);
674 if (x == s) { // wait was cancelled
675 clean(t, s);
676 return null;
677 }
678
679 if (!s.isOffList()) { // not already unlinked
680 advanceHead(t, s); // unlink if head
681 if (x != null) // and forget fields
682 s.item = s;
683 s.waiter = null;
684 }
685 return (x != null) ? (E)x : e;
686
687 } else { // complementary-mode
688 QNode m = h.next; // node to fulfill
689 if (t != tail || m == null || h != head)
690 continue; // inconsistent read
691
692 Object x = m.item;
693 if (isData == (x != null) || // m already fulfilled
694 x == m || // m cancelled
695 !m.casItem(x, e)) { // lost CAS
696 advanceHead(h, m); // dequeue and retry
697 continue;
698 }
699
700 advanceHead(h, m); // successfully fulfilled
701 LockSupport.unpark(m.waiter);
702 return (x != null) ? (E)x : e;
703 }
704 }
705 }
706
707 /**
708 * Spins/blocks until node s is fulfilled.
709 *
710 * @param s the waiting node
711 * @param e the comparison value for checking match
712 * @param timed true if timed wait
713 * @param nanos timeout value
714 * @return matched item, or s if cancelled
715 */
716 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
717 /* Same idea as TransferStack.awaitFulfill */
718 final long deadline = timed ? System.nanoTime() + nanos : 0L;
719 Thread w = Thread.currentThread();
720 int spins = (head.next == s)
721 ? (timed ? maxTimedSpins : maxUntimedSpins)
722 : 0;
723 for (;;) {
724 if (w.isInterrupted())
725 s.tryCancel(e);
726 Object x = s.item;
727 if (x != e)
728 return x;
729 if (timed) {
730 nanos = deadline - System.nanoTime();
731 if (nanos <= 0L) {
732 s.tryCancel(e);
733 continue;
734 }
735 }
736 if (spins > 0)
737 --spins;
738 else if (s.waiter == null)
739 s.waiter = w;
740 else if (!timed)
741 LockSupport.park(this);
742 else if (nanos > spinForTimeoutThreshold)
743 LockSupport.parkNanos(this, nanos);
744 }
745 }
746
747 /**
748 * Gets rid of cancelled node s with original predecessor pred.
749 */
750 void clean(QNode pred, QNode s) {
751 s.waiter = null; // forget thread
752 /*
753 * At any given time, exactly one node on list cannot be
754 * deleted -- the last inserted node. To accommodate this,
755 * if we cannot delete s, we save its predecessor as
756 * "cleanMe", deleting the previously saved version
757 * first. At least one of node s or the node previously
758 * saved can always be deleted, so this always terminates.
759 */
760 while (pred.next == s) { // Return early if already unlinked
761 QNode h = head;
762 QNode hn = h.next; // Absorb cancelled first node as head
763 if (hn != null && hn.isCancelled()) {
764 advanceHead(h, hn);
765 continue;
766 }
767 QNode t = tail; // Ensure consistent read for tail
768 if (t == h)
769 return;
770 QNode tn = t.next;
771 if (t != tail)
772 continue;
773 if (tn != null) {
774 advanceTail(t, tn);
775 continue;
776 }
777 if (s != t) { // If not tail, try to unsplice
778 QNode sn = s.next;
779 if (sn == s || pred.casNext(s, sn))
780 return;
781 }
782 QNode dp = cleanMe;
783 if (dp != null) { // Try unlinking previous cancelled node
784 QNode d = dp.next;
785 QNode dn;
786 if (d == null || // d is gone or
787 d == dp || // d is off list or
788 !d.isCancelled() || // d not cancelled or
789 (d != t && // d not tail and
790 (dn = d.next) != null && // has successor
791 dn != d && // that is on list
792 dp.casNext(d, dn))) // d unspliced
793 casCleanMe(dp, null);
794 if (dp == pred)
795 return; // s is already saved node
796 } else if (casCleanMe(null, pred))
797 return; // Postpone cleaning s
798 }
799 }
800
801 private static final sun.misc.Unsafe UNSAFE;
802 private static final long headOffset;
803 private static final long tailOffset;
804 private static final long cleanMeOffset;
805 static {
806 try {
807 UNSAFE = sun.misc.Unsafe.getUnsafe();
808 Class<?> k = TransferQueue.class;
809 headOffset = UNSAFE.objectFieldOffset
810 (k.getDeclaredField("head"));
811 tailOffset = UNSAFE.objectFieldOffset
812 (k.getDeclaredField("tail"));
813 cleanMeOffset = UNSAFE.objectFieldOffset
814 (k.getDeclaredField("cleanMe"));
815 } catch (Exception e) {
816 throw new Error(e);
817 }
818 }
819 }
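The dual queue above backs fair mode: waiting threads are fulfilled in FIFO arrival order. A sketch of that guarantee (class name `FairnessDemo` and the `awaitParked` helper are invented; it relies on untimed `put` parking in the `WAITING` thread state):

```java
import java.util.concurrent.SynchronousQueue;

public class FairnessDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true); // fair: FIFO waiters
        Thread p1 = producer(q, 1), p2 = producer(q, 2);
        p1.start(); awaitParked(p1); // make sure p1 enqueues its node first
        p2.start(); awaitParked(p2);
        System.out.println(q.take()); // 1: oldest waiting producer served first
        System.out.println(q.take()); // 2
        p1.join(); p2.join();
    }

    static Thread producer(SynchronousQueue<Integer> q, int v) {
        return new Thread(() -> {
            try { q.put(v); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        });
    }

    // Spin until the thread has parked inside put (untimed park -> WAITING).
    static void awaitParked(Thread t) {
        while (t.getState() != Thread.State.WAITING)
            Thread.yield();
    }
}
```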
820
821 /**
822 * The transferer. Set only in constructor, but cannot be declared
823 * as final without further complicating serialization. Since
824 * this is accessed only at most once per public method, there
825 * isn't a noticeable performance penalty for using volatile
826 * instead of final here.
827 */
828 private transient volatile Transferer<E> transferer;
829
830 /**
831 * Creates a {@code SynchronousQueue} with nonfair access policy.
832 */
833 public SynchronousQueue() {
834 this(false);
835 }
836
837 /**
838 * Creates a {@code SynchronousQueue} with the specified fairness policy.
839 *
840 * @param fair if true, waiting threads contend in FIFO order for
841 * access; otherwise the order is unspecified.
842 */
843 public SynchronousQueue(boolean fair) {
844 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
845 }
846
847 /**
848 * Adds the specified element to this queue, waiting if necessary for
849 * another thread to receive it.
850 *
851 * @throws InterruptedException {@inheritDoc}
852 * @throws NullPointerException {@inheritDoc}
853 */
854 public void put(E e) throws InterruptedException {
855 if (e == null) throw new NullPointerException();
856 if (transferer.transfer(e, false, 0) == null) {
857 Thread.interrupted();
858 throw new InterruptedException();
859 }
860 }
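As the body of `put` shows, a null result from `transfer` means the wait was broken by interrupt, which `put` converts into `InterruptedException` after clearing the interrupt status. A sketch of that path (class name `PutInterruptDemo` is made up for illustration):

```java
import java.util.concurrent.SynchronousQueue;

public class PutInterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread blocked = new Thread(() -> {
            try {
                q.put("never delivered");          // blocks: no consumer will arrive
                System.out.println("delivered");
            } catch (InterruptedException ie) {
                System.out.println("interrupted"); // transfer returned null
            }
        });
        blocked.start();
        while (blocked.getState() != Thread.State.WAITING) // wait until parked
            Thread.yield();
        blocked.interrupt();                       // cancels the waiting node
        blocked.join();
    }
}
```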
861
862 /**
863 * Inserts the specified element into this queue, waiting if necessary
864 * up to the specified wait time for another thread to receive it.
865 *
866 * @return {@code true} if successful, or {@code false} if the
867 * specified waiting time elapses before a consumer appears
868 * @throws InterruptedException {@inheritDoc}
869 * @throws NullPointerException {@inheritDoc}
870 */
871 public boolean offer(E e, long timeout, TimeUnit unit)
872 throws InterruptedException {
873 if (e == null) throw new NullPointerException();
874 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
875 return true;
876 if (!Thread.interrupted())
877 return false;
878 throw new InterruptedException();
879 }
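When no consumer ever arrives, the timed `offer` waits out the full timeout and returns `false`. A minimal sketch (class name `TimedOfferDemo` is invented):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        long t0 = System.nanoTime();
        // No consumer: spins/parks until the deadline, then gives up.
        boolean ok = q.offer(42, 50, TimeUnit.MILLISECONDS);
        long elapsedMs = (System.nanoTime() - t0) / 1_000_000;
        System.out.println(ok);              // false
        System.out.println(elapsedMs >= 50); // true: full timeout elapsed
    }
}
```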
880
881 /**
882 * Inserts the specified element into this queue, if another thread is
883 * waiting to receive it.
884 *
885 * @param e the element to add
886 * @return {@code true} if the element was added to this queue, else
887 * {@code false}
888 * @throws NullPointerException if the specified element is null
889 */
890 public boolean offer(E e) {
891 if (e == null) throw new NullPointerException();
892 return transferer.transfer(e, true, 0) != null;
893 }
894
895 /**
896 * Retrieves and removes the head of this queue, waiting if necessary
897 * for another thread to insert it.
898 *
899 * @return the head of this queue
900 * @throws InterruptedException {@inheritDoc}
901 */
902 public E take() throws InterruptedException {
903 E e = transferer.transfer(null, false, 0);
904 if (e != null)
905 return e;
906 Thread.interrupted();
907 throw new InterruptedException();
908 }
909
910 /**
911 * Retrieves and removes the head of this queue, waiting
912 * if necessary up to the specified wait time, for another thread
913 * to insert it.
914 *
915 * @return the head of this queue, or {@code null} if the
916 * specified waiting time elapses before an element is present
917 * @throws InterruptedException {@inheritDoc}
918 */
919 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
920 E e = transferer.transfer(null, true, unit.toNanos(timeout));
921 if (e != null || !Thread.interrupted())
922 return e;
923 throw new InterruptedException();
924 }
925
926 /**
927 * Retrieves and removes the head of this queue, if another thread
928 * is currently making an element available.
929 *
930 * @return the head of this queue, or {@code null} if no
931 * element is available
932 */
933 public E poll() {
934 return transferer.transfer(null, true, 0);
935 }
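The untimed `poll` is `transfer(null, true, 0)`: with no producer currently waiting, it fails immediately rather than blocking. A one-line sketch (class name `PollDemo` is invented):

```java
import java.util.concurrent.SynchronousQueue;

public class PollDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        // transfer(null, true, 0): no producer is waiting, so fail immediately.
        System.out.println(q.poll()); // null
    }
}
```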

    /**
     * Always returns {@code true}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return {@code true}
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @return zero
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A {@code SynchronousQueue} has no internal capacity.
     */
    public void clear() {
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element
     * @return {@code false}
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param o the element to remove
     * @return {@code false}
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns {@code false} unless the given collection is empty.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false} unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code false}.
     * A {@code SynchronousQueue} has no internal capacity.
     *
     * @param c the collection
     * @return {@code false}
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns {@code null}.
     * A {@code SynchronousQueue} does not return elements
     * unless actively waited on.
     *
     * @return {@code null}
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which {@code hasNext} always returns
     * {@code false}.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }

    /**
     * Returns an empty spliterator in which calls to
     * {@link java.util.Spliterator#trySplit()} always return {@code null}.
     *
     * @return an empty spliterator
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return Spliterators.emptySpliterator();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to {@code null}
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    @SuppressWarnings("serial")
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Saves this queue to a stream (that is, serializes it).
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue<E>();
        else
            transferer = new TransferStack<E>();
    }

}