ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.111
Committed: Sun Jan 4 01:06:15 2015 UTC (9 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.110: +3 -3 lines
Log Message:
use ReflectiveOperationException for Unsafe mechanics

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.55 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 jsr166 1.75 * http://creativecommons.org/publicdomain/zero/1.0/
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.107
10     import java.util.AbstractQueue;
11     import java.util.Collection;
12     import java.util.Collections;
13     import java.util.Iterator;
14 dl 1.93 import java.util.Spliterator;
15 dl 1.95 import java.util.Spliterators;
16 jsr166 1.109 import java.util.concurrent.locks.LockSupport;
17     import java.util.concurrent.locks.ReentrantLock;
18 tim 1.1
19     /**
20 jsr166 1.52 * A {@linkplain BlockingQueue blocking queue} in which each insert
21     * operation must wait for a corresponding remove operation by another
22     * thread, and vice versa. A synchronous queue does not have any
23     * internal capacity, not even a capacity of one. You cannot
24 jsr166 1.90 * {@code peek} at a synchronous queue because an element is only
25 jsr166 1.52 * present when you try to remove it; you cannot insert an element
26     * (using any method) unless another thread is trying to remove it;
27     * you cannot iterate as there is nothing to iterate. The
28     * <em>head</em> of the queue is the element that the first queued
29     * inserting thread is trying to add to the queue; if there is no such
30     * queued thread then no element is available for removal and
31 jsr166 1.90 * {@code poll()} will return {@code null}. For purposes of other
32     * {@code Collection} methods (for example {@code contains}), a
33     * {@code SynchronousQueue} acts as an empty collection. This queue
34     * does not permit {@code null} elements.
35 dl 1.18 *
36     * <p>Synchronous queues are similar to rendezvous channels used in
37     * CSP and Ada. They are well suited for handoff designs, in which an
38 dl 1.30 * object running in one thread must sync up with an object running
39 dl 1.18 * in another thread in order to hand it some information, event, or
40     * task.
41 dl 1.43 *
42 jsr166 1.88 * <p>This class supports an optional fairness policy for ordering
43 dl 1.43 * waiting producer and consumer threads. By default, this ordering
44     * is not guaranteed. However, a queue constructed with fairness set
45 jsr166 1.90 * to {@code true} grants threads access in FIFO order.
46 dl 1.43 *
47 dl 1.46 * <p>This class and its iterator implement all of the
48     * <em>optional</em> methods of the {@link Collection} and {@link
49 jsr166 1.48 * Iterator} interfaces.
50 dl 1.42 *
51     * <p>This class is a member of the
52 jsr166 1.66 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
53 dl 1.42 * Java Collections Framework</a>.
54     *
55 dl 1.6 * @since 1.5
56 dl 1.56 * @author Doug Lea and Bill Scherer and Michael Scott
57 jsr166 1.106 * @param <E> the type of elements held in this queue
58 dl 1.23 */
59 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
60 dl 1.55 implements BlockingQueue<E>, java.io.Serializable {
61 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
62 tim 1.1
63 dl 1.2 /*
64 dl 1.55 * This class implements extensions of the dual stack and dual
65     * queue algorithms described in "Nonblocking Concurrent Objects
66     * with Condition Synchronization", by W. N. Scherer III and
67     * M. L. Scott. 18th Annual Conf. on Distributed Computing,
68     * Oct. 2004 (see also
69     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
70     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
71     * queue for fair mode. The performance of the two is generally
72     * similar. Fifo usually supports higher throughput under
73     * contention but Lifo maintains higher thread locality in common
74     * applications.
75     *
76     * A dual queue (and similarly stack) is one that at any given
77     * time either holds "data" -- items provided by put operations,
78     * or "requests" -- slots representing take operations, or is
79     * empty. A call to "fulfill" (i.e., a call requesting an item
80     * from a queue holding data or vice versa) dequeues a
81     * complementary node. The most interesting feature of these
82     * queues is that any operation can figure out which mode the
83     * queue is in, and act accordingly without needing locks.
84     *
85     * Both the queue and stack extend abstract class Transferer
86     * defining the single method transfer that does a put or a
87     * take. These are unified into a single method because in dual
88     * data structures, the put and take operations are symmetrical,
89     * so nearly all code can be combined. The resulting transfer
90     * methods are on the long side, but are easier to follow than
91     * they would be if broken up into nearly-duplicated parts.
92     *
93     * The queue and stack data structures share many conceptual
94     * similarities but very few concrete details. For simplicity,
95     * they are kept distinct so that they can later evolve
96     * separately.
97     *
98     * The algorithms here differ from the versions in the above paper
99     * in extending them for use in synchronous queues, as well as
100     * dealing with cancellation. The main differences include:
101     *
102 jsr166 1.59 * 1. The original algorithms used bit-marked pointers, but
103 dl 1.55 * the ones here use mode bits in nodes, leading to a number
104     * of further adaptations.
105     * 2. SynchronousQueues must block threads waiting to become
106     * fulfilled.
107 jsr166 1.58 * 3. Support for cancellation via timeout and interrupts,
108     * including cleaning out cancelled nodes/threads
109 dl 1.56 * from lists to avoid garbage retention and memory depletion.
110 dl 1.55 *
111     * Blocking is mainly accomplished using LockSupport park/unpark,
112     * except that nodes that appear to be the next ones to become
113     * fulfilled first spin a bit (on multiprocessors only). On very
114     * busy synchronous queues, spinning can dramatically improve
115     * throughput. And on less busy ones, the amount of spinning is
116     * small enough not to be noticeable.
117     *
118     * Cleaning is done in different ways in queues vs stacks. For
119     * queues, we can almost always remove a node immediately in O(1)
120     * time (modulo retries for consistency checks) when it is
121     * cancelled. But if it may be pinned as the current tail, it must
122     * wait until some subsequent cancellation. For stacks, we need a
123     * potentially O(n) traversal to be sure that we can remove the
124     * node, but this can run concurrently with other threads
125     * accessing the stack.
126     *
127     * While garbage collection takes care of most node reclamation
128     * issues that otherwise complicate nonblocking algorithms, care
129 jsr166 1.59 * is taken to "forget" references to data, other nodes, and
130 dl 1.55 * threads that might be held on to long-term by blocked
131     * threads. In cases where setting to null would otherwise
132     * conflict with main algorithms, this is done by changing a
133     * node's link to now point to the node itself. This doesn't arise
134     * much for Stack nodes (because blocked threads do not hang on to
135     * old head pointers), but references in Queue nodes must be
136 jsr166 1.59 * aggressively forgotten to avoid reachability of everything any
137 dl 1.55 * node has ever referred to since arrival.
138     */
139 dl 1.2
140 dl 1.43 /**
141 dl 1.55 * Shared internal API for dual stacks and queues.
142 dl 1.43 */
143 jsr166 1.82 abstract static class Transferer<E> {
144 dl 1.55 /**
145 jsr166 1.59 * Performs a put or take.
146     *
147 dl 1.55 * @param e if non-null, the item to be handed to a consumer;
148 jsr166 1.59 * if null, requests that transfer return an item
149     * offered by producer.
150 dl 1.55 * @param timed if this operation should timeout
151     * @param nanos the timeout, in nanoseconds
152 jsr166 1.59 * @return if non-null, the item provided or received; if null,
153     * the operation failed due to timeout or interrupt --
154     * the caller can distinguish which of these occurred
155     * by checking Thread.interrupted.
156 dl 1.55 */
157 jsr166 1.82 abstract E transfer(E e, boolean timed, long nanos);
158 dl 1.43 }
159    
160 dl 1.55 /** The number of CPUs, for spin control */
161     static final int NCPUS = Runtime.getRuntime().availableProcessors();
162    
163 dl 1.43 /**
164 dl 1.55 * The number of times to spin before blocking in timed waits.
165     * The value is empirically derived -- it works well across a
166 dl 1.56 * variety of processors and OSes. Empirically, the best value
167 dl 1.55 * seems not to vary with number of CPUs (beyond 2) so is just
168     * a constant.
169 dl 1.43 */
170 jsr166 1.72 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
171 dl 1.43
172     /**
173 jsr166 1.60 * The number of times to spin before blocking in untimed waits.
174     * This is greater than timed value because untimed waits spin
175     * faster since they don't need to check times on each spin.
176 dl 1.43 */
177 dl 1.55 static final int maxUntimedSpins = maxTimedSpins * 16;
178 dl 1.43
179     /**
180 dl 1.55 * The number of nanoseconds for which it is faster to spin
181     * rather than to use timed park. A rough estimate suffices.
182 dl 1.43 */
183 dl 1.55 static final long spinForTimeoutThreshold = 1000L;
184    
185 jsr166 1.60 /** Dual stack */
186 jsr166 1.82 static final class TransferStack<E> extends Transferer<E> {
187 dl 1.55 /*
188     * This extends Scherer-Scott dual stack algorithm, differing,
189     * among other ways, by using "covering" nodes rather than
190     * bit-marked pointers: Fulfilling operations push on marker
191     * nodes (with FULFILLING bit set in mode) to reserve a spot
192     * to match a waiting node.
193     */
194 dl 1.43
195 dl 1.55 /* Modes for SNodes, ORed together in node fields */
196     /** Node represents an unfulfilled consumer */
197     static final int REQUEST = 0;
198     /** Node represents an unfulfilled producer */
199     static final int DATA = 1;
200     /** Node is fulfilling another unfulfilled DATA or REQUEST */
201     static final int FULFILLING = 2;
202    
203 jsr166 1.87 /** Returns true if m has fulfilling bit set. */
204 dl 1.55 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
205    
206     /** Node class for TransferStacks. */
207     static final class SNode {
208     volatile SNode next; // next node in stack
209     volatile SNode match; // the node matched to this
210     volatile Thread waiter; // to control park/unpark
211     Object item; // data; or null for REQUESTs
212     int mode;
213     // Note: item and mode fields don't need to be volatile
214     // since they are always written before, and read after,
215     // other volatile/atomic operations.
216    
217     SNode(Object item) {
218     this.item = item;
219     }
220    
221     boolean casNext(SNode cmp, SNode val) {
222 dl 1.69 return cmp == next &&
223     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
224 dl 1.55 }
225    
226     /**
227 jsr166 1.63 * Tries to match node s to this node, if so, waking up thread.
228     * Fulfillers call tryMatch to identify their waiters.
229     * Waiters block until they have been matched.
230     *
231 dl 1.55 * @param s the node to match
232     * @return true if successfully matched to s
233     */
234     boolean tryMatch(SNode s) {
235     if (match == null &&
236 dl 1.69 UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
237 dl 1.55 Thread w = waiter;
238     if (w != null) { // waiters need at most one unpark
239     waiter = null;
240     LockSupport.unpark(w);
241     }
242     return true;
243 dl 1.47 }
244 dl 1.55 return match == s;
245     }
246    
247     /**
248 jsr166 1.59 * Tries to cancel a wait by matching node to itself.
249 dl 1.55 */
250     void tryCancel() {
251 dl 1.69 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
252 dl 1.55 }
253    
254     boolean isCancelled() {
255     return match == this;
256 dl 1.47 }
257 dl 1.69
258     // Unsafe mechanics
259 dl 1.73 private static final sun.misc.Unsafe UNSAFE;
260     private static final long matchOffset;
261     private static final long nextOffset;
262 dl 1.74
263 dl 1.73 static {
264     try {
265     UNSAFE = sun.misc.Unsafe.getUnsafe();
266 jsr166 1.77 Class<?> k = SNode.class;
267 dl 1.73 matchOffset = UNSAFE.objectFieldOffset
268     (k.getDeclaredField("match"));
269     nextOffset = UNSAFE.objectFieldOffset
270     (k.getDeclaredField("next"));
271 jsr166 1.111 } catch (ReflectiveOperationException e) {
272 dl 1.73 throw new Error(e);
273     }
274     }
275 dl 1.47 }
276 dl 1.43
277 dl 1.55 /** The head (top) of the stack */
278     volatile SNode head;
279 jsr166 1.70
280 dl 1.55 boolean casHead(SNode h, SNode nh) {
281 jsr166 1.70 return h == head &&
282 dl 1.69 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
283 dl 1.55 }
284 dl 1.2
285 dl 1.55 /**
286 jsr166 1.57 * Creates or resets fields of a node. Called only from transfer
287 dl 1.55 * where the node to push on stack is lazily created and
288     * reused when possible to help reduce intervals between reads
289     * and CASes of head and to avoid surges of garbage when CASes
290     * to push nodes fail due to contention.
291     */
292     static SNode snode(SNode s, Object e, SNode next, int mode) {
293     if (s == null) s = new SNode(e);
294     s.mode = mode;
295     s.next = next;
296     return s;
297 dl 1.43 }
298    
299 dl 1.55 /**
300 jsr166 1.57 * Puts or takes an item.
 *
 * @param e the item to hand off; null makes this call a take
 *          (REQUEST mode), non-null makes it a put (DATA mode)
 * @param timed true if the wait is bounded by nanos
 * @param nanos the wait bound in nanoseconds; when timed is true
 *          and nanos <= 0 the call never pushes a waiting node
 * @return the item received (take) or delivered (put), or null if
 *         waiting was not permitted or the wait was cancelled
301 dl 1.55 */
302 jsr166 1.83 @SuppressWarnings("unchecked")
303 jsr166 1.82 E transfer(E e, boolean timed, long nanos) {
304 dl 1.55 /*
305     * Basic algorithm is to loop trying one of three actions:
306     *
307     * 1. If apparently empty or already containing nodes of same
308     * mode, try to push node on stack and wait for a match,
309     * returning it, or null if cancelled.
310     *
311     * 2. If apparently containing node of complementary mode,
312     * try to push a fulfilling node on to stack, match
313     * with corresponding waiting node, pop both from
314     * stack, and return matched item. The matching or
315     * unlinking might not actually be necessary because of
316 dl 1.62 * other threads performing action 3:
317 dl 1.55 *
318     * 3. If top of stack already holds another fulfilling node,
319     * help it out by doing its match and/or pop
320     * operations, and then continue. The code for helping
321     * is essentially the same as for fulfilling, except
322     * that it doesn't return the item.
323     */
324    
325     SNode s = null; // constructed/reused as needed
326 jsr166 1.72 int mode = (e == null) ? REQUEST : DATA;
327 dl 1.55
328     for (;;) {
329     SNode h = head;
330     if (h == null || h.mode == mode) { // empty or same-mode
331     if (timed && nanos <= 0) { // can't wait
332 jsr166 1.58 if (h != null && h.isCancelled())
333 dl 1.55 casHead(h, h.next); // pop cancelled node
334     else
335 jsr166 1.58 return null;
336 dl 1.55 } else if (casHead(h, s = snode(s, e, h, mode))) {
337     SNode m = awaitFulfill(s, timed, nanos);
338     if (m == s) { // wait was cancelled
339     clean(s);
340     return null;
341     }
342     if ((h = head) != null && h.next == s)
343     casHead(h, s.next); // help s's fulfiller
        // a take returns the matched node's item; a put returns its own item
344 jsr166 1.83 return (E) ((mode == REQUEST) ? m.item : s.item);
345 dl 1.55 }
346     } else if (!isFulfilling(h.mode)) { // try to fulfill
347     if (h.isCancelled()) // already cancelled
348     casHead(h, h.next); // pop and retry
349     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
350     for (;;) { // loop until matched or waiters disappear
351     SNode m = s.next; // m is s's match
352     if (m == null) { // all waiters are gone
353     casHead(s, null); // pop fulfill node
354     s = null; // use new node next time
355     break; // restart main loop
356     }
357     SNode mn = m.next;
358     if (m.tryMatch(s)) {
359     casHead(s, mn); // pop both s and m
360 jsr166 1.83 return (E) ((mode == REQUEST) ? m.item : s.item);
361 dl 1.55 } else // lost match
362     s.casNext(m, mn); // help unlink
363     }
364     }
365     } else { // help a fulfiller
366     SNode m = h.next; // m is h's match
367     if (m == null) // waiter is gone
368     casHead(h, null); // pop fulfilling node
369     else {
370     SNode mn = m.next;
371     if (m.tryMatch(h)) // help match
372     casHead(h, mn); // pop both h and m
373     else // lost match
374     h.casNext(m, mn); // help unlink
375     }
376 dl 1.47 }
377     }
378     }
379    
380 dl 1.55 /**
381 jsr166 1.57 * Spins/blocks until node s is matched by a fulfill operation.
 * Cancellation (interrupt or timeout) is recorded by matching s to
 * itself, so the same match check detects both outcomes.
382 jsr166 1.63 *
383 dl 1.55 * @param s the waiting node
384     * @param timed true if timed wait
385     * @param nanos timeout value
386     * @return matched node, or s if cancelled
387     */
388     SNode awaitFulfill(SNode s, boolean timed, long nanos) {
389     /*
390     * When a node/thread is about to block, it sets its waiter
391     * field and then rechecks state at least one more time
392     * before actually parking, thus covering race vs
393 jsr166 1.59 * fulfiller noticing that waiter is non-null so should be
394 dl 1.55 * woken.
395     *
396     * When invoked by nodes that appear at the point of call
397     * to be at the head of the stack, calls to park are
398     * preceded by spins to avoid blocking when producers and
399     * consumers are arriving very close in time. This can
400     * happen enough to bother only on multiprocessors.
401     *
402     * The order of checks for returning out of main loop
403     * reflects fact that interrupts have precedence over
404     * normal returns, which have precedence over
405     * timeouts. (So, on timeout, one last check for match is
406     * done before giving up.) Except that calls from untimed
407     * SynchronousQueue.{poll/offer} don't check interrupts
408     * and don't wait at all, so are trapped in transfer
409     * method rather than calling awaitFulfill.
410     */
    // deadline is only meaningful (and only read) when timed is true
411 jsr166 1.85 final long deadline = timed ? System.nanoTime() + nanos : 0L;
412 dl 1.55 Thread w = Thread.currentThread();
413 jsr166 1.105 int spins = shouldSpin(s)
414     ? (timed ? maxTimedSpins : maxUntimedSpins)
415     : 0;
416 dl 1.55 for (;;) {
    // isInterrupted() reads the interrupt status without clearing it
417     if (w.isInterrupted())
418     s.tryCancel();
419     SNode m = s.match;
420     if (m != null)
421     return m;
422     if (timed) {
423 jsr166 1.85 nanos = deadline - System.nanoTime();
424     if (nanos <= 0L) {
425 dl 1.55 s.tryCancel();
        // loop back so the match check above sees either the
        // cancellation or a match that won the race
426     continue;
427     }
428     }
429     if (spins > 0)
430 jsr166 1.108 spins = shouldSpin(s) ? (spins - 1) : 0;
431 dl 1.55 else if (s.waiter == null)
432     s.waiter = w; // establish waiter so can park next iter
433     else if (!timed)
434     LockSupport.park(this);
    // with no more than spinForTimeoutThreshold nanos left, no park
    // is issued and the loop simply iterates again
435     else if (nanos > spinForTimeoutThreshold)
436     LockSupport.parkNanos(this, nanos);
437 dl 1.47 }
438     }
439 dl 1.2
440 dl 1.55 /**
441 jsr166 1.57 * Returns true if node s is at head or there is an active
442 dl 1.55 * fulfiller.
443     */
444     boolean shouldSpin(SNode s) {
445     SNode h = head;
446 dl 1.56 return (h == s || h == null || isFulfilling(h.mode));
447 dl 1.55 }
448    
449     /**
450 jsr166 1.57 * Unlinks s from the stack.
 * Also clears s.item and s.waiter, and unlinks other cancelled
 * nodes met between head and the stopping point.
 *
 * @param s the node to unlink; transfer calls this after the wait
 *          on s was cancelled
451 dl 1.55 */
452     void clean(SNode s) {
453 jsr166 1.58 s.item = null; // forget item
454 dl 1.55 s.waiter = null; // forget thread
455    
456     /*
457     * At worst we may need to traverse entire stack to unlink
458     * s. If there are multiple concurrent calls to clean, we
459     * might not see s if another thread has already removed
460     * it. But we can stop when we see any node known to
461     * follow s. We use s.next unless it too is cancelled, in
462     * which case we try the node one past. We don't check any
463 jsr166 1.59 * further because we don't want to doubly traverse just to
464 dl 1.55 * find sentinel.
465     */
466    
    // past is the stopping point for both loops below (may be null)
467     SNode past = s.next;
468     if (past != null && past.isCancelled())
469     past = past.next;
470    
471     // Absorb cancelled nodes at head
472     SNode p;
473     while ((p = head) != null && p != past && p.isCancelled())
474     casHead(p, p.next);
475    
476     // Unsplice embedded nodes
    // p advances only when its successor is not cancelled
477     while (p != null && p != past) {
478     SNode n = p.next;
479     if (n != null && n.isCancelled())
480     p.casNext(n, n.next);
481     else
482     p = n;
483 dl 1.47 }
484     }
485 dl 1.69
// Unsafe mechanics: offset of the head field, used by casHead
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        headOffset = UNSAFE.objectFieldOffset(
            TransferStack.class.getDeclaredField("head"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
499 dl 1.47 }
500 jsr166 1.48
501 jsr166 1.61 /** Dual Queue */
502 jsr166 1.82 static final class TransferQueue<E> extends Transferer<E> {
503 dl 1.55 /*
504     * This extends Scherer-Scott dual queue algorithm, differing,
505     * among other ways, by using modes within nodes rather than
506     * marked pointers. The algorithm is a little simpler than
507     * that for stacks because fulfillers do not need explicit
508     * nodes, and matching is done by CAS'ing QNode.item field
509 jsr166 1.59 * from non-null to null (for put) or vice versa (for take).
510 dl 1.55 */
511 dl 1.53
512 dl 1.55 /** Node class for TransferQueue. */
513     static final class QNode {
514     volatile QNode next; // next node in queue
515     volatile Object item; // CAS'ed to or from null
516     volatile Thread waiter; // to control park/unpark
517 jsr166 1.58 final boolean isData;
518 dl 1.35
519 dl 1.55 QNode(Object item, boolean isData) {
520     this.item = item;
521     this.isData = isData;
522     }
523 dl 1.35
524 dl 1.55 boolean casNext(QNode cmp, QNode val) {
525 dl 1.69 return next == cmp &&
526     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
527 dl 1.55 }
528    
529     boolean casItem(Object cmp, Object val) {
530 dl 1.69 return item == cmp &&
531     UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
532 dl 1.55 }
533    
534     /**
535 jsr166 1.59 * Tries to cancel by CAS'ing ref to this as item.
536 dl 1.55 */
537     void tryCancel(Object cmp) {
538 dl 1.69 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
539 dl 1.55 }
540 jsr166 1.70
541 dl 1.55 boolean isCancelled() {
542     return item == this;
543     }
544 dl 1.56
545 jsr166 1.58 /**
546 jsr166 1.57 * Returns true if this node is known to be off the queue
547 dl 1.56 * because its next pointer has been forgotten due to
548     * an advanceHead operation.
549     */
550     boolean isOffList() {
551     return next == this;
552     }
553 dl 1.74
554 dl 1.69 // Unsafe mechanics
555 dl 1.73 private static final sun.misc.Unsafe UNSAFE;
556     private static final long itemOffset;
557     private static final long nextOffset;
558 dl 1.74
559 dl 1.73 static {
560     try {
561     UNSAFE = sun.misc.Unsafe.getUnsafe();
562 jsr166 1.77 Class<?> k = QNode.class;
563 dl 1.73 itemOffset = UNSAFE.objectFieldOffset
564     (k.getDeclaredField("item"));
565     nextOffset = UNSAFE.objectFieldOffset
566     (k.getDeclaredField("next"));
567     } catch (Exception e) {
568     throw new Error(e);
569     }
570     }
571 dl 1.31 }
572    
573 dl 1.55 /** Head of queue */
574     transient volatile QNode head;
575     /** Tail of queue */
576     transient volatile QNode tail;
577 dl 1.31 /**
578 dl 1.55 * Reference to a cancelled node that might not yet have been
579     * unlinked from queue because it was the last inserted node
580 jsr166 1.91 * when it was cancelled.
581 dl 1.31 */
582 dl 1.55 transient volatile QNode cleanMe;
583    
584     TransferQueue() {
585     QNode h = new QNode(null, false); // initialize to dummy node.
586     head = h;
587     tail = h;
588 dl 1.31 }
589    
590     /**
591 jsr166 1.59 * Tries to cas nh as new head; if successful, unlink
592 dl 1.55 * old head's next node to avoid garbage retention.
593 dl 1.31 */
594 dl 1.55 void advanceHead(QNode h, QNode nh) {
595 jsr166 1.70 if (h == head &&
596 dl 1.69 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
597 dl 1.55 h.next = h; // forget old next
598 dl 1.31 }
599    
600     /**
601 jsr166 1.57 * Tries to cas nt as new tail.
602 dl 1.31 */
603 dl 1.55 void advanceTail(QNode t, QNode nt) {
604     if (tail == t)
605 dl 1.69 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
606 dl 1.31 }
607 dl 1.2
608     /**
609 jsr166 1.57 * Tries to CAS cleanMe slot.
610 dl 1.2 */
611 dl 1.55 boolean casCleanMe(QNode cmp, QNode val) {
612 dl 1.69 return cleanMe == cmp &&
613     UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
614 dl 1.35 }
615    
616     /**
617 jsr166 1.57 * Puts or takes an item.
 *
 * @param e the item to hand off; non-null makes this call a put
 *          (data mode), null makes it a take
 * @param timed true if the wait is bounded by nanos
 * @param nanos the wait bound in nanoseconds; when timed is true
 *          and nanos <= 0 the call never enqueues a waiting node
 * @return the item received (take) or delivered (put), or null if
 *         waiting was not permitted or the wait was cancelled
618 dl 1.35 */
619 jsr166 1.83 @SuppressWarnings("unchecked")
620 jsr166 1.82 E transfer(E e, boolean timed, long nanos) {
621 jsr166 1.58 /* Basic algorithm is to loop trying to take either of
622 dl 1.55 * two actions:
623     *
624 jsr166 1.58 * 1. If queue apparently empty or holding same-mode nodes,
625 dl 1.55 * try to add node to queue of waiters, wait to be
626     * fulfilled (or cancelled) and return matching item.
627     *
628     * 2. If queue apparently contains waiting items, and this
629     * call is of complementary mode, try to fulfill by CAS'ing
630     * item field of waiting node and dequeuing it, and then
631     * returning matching item.
632     *
633     * In each case, along the way, check for and try to help
634     * advance head and tail on behalf of other stalled/slow
635     * threads.
636     *
637     * The loop starts off with a null check guarding against
638     * seeing uninitialized head or tail values. This never
639     * happens in current SynchronousQueue, but could if
640     * callers held non-volatile/final ref to the
641     * transferer. The check is here anyway because it places
642     * null checks at top of loop, which is usually faster
643     * than having them implicitly interspersed.
644     */
645    
646     QNode s = null; // constructed/reused as needed
647     boolean isData = (e != null);
648    
649     for (;;) {
650     QNode t = tail;
651     QNode h = head;
652 dl 1.62 if (t == null || h == null) // saw uninitialized value
653 dl 1.55 continue; // spin
654    
655     if (h == t || t.isData == isData) { // empty or same-mode
656     QNode tn = t.next;
657     if (t != tail) // inconsistent read
658     continue;
659     if (tn != null) { // lagging tail
660     advanceTail(t, tn);
661     continue;
662     }
663     if (timed && nanos <= 0) // can't wait
664     return null;
665     if (s == null)
666     s = new QNode(e, isData);
667     if (!t.casNext(null, s)) // failed to link in
668     continue;
669    
670     advanceTail(t, s); // swing tail and wait
671     Object x = awaitFulfill(s, e, timed, nanos);
672     if (x == s) { // wait was cancelled
673     clean(t, s);
674     return null;
675     }
676    
677 dl 1.56 if (!s.isOffList()) { // not already unlinked
678     advanceHead(t, s); // unlink if head
679 dl 1.55 if (x != null) // and forget fields
680     s.item = s;
681     s.waiter = null;
682     }
        // non-null x is the item received by a take; for a put x is null and e is returned
683 jsr166 1.83 return (x != null) ? (E)x : e;
684 dl 1.55
685     } else { // complementary-mode
686     QNode m = h.next; // node to fulfill
687     if (t != tail || m == null || h != head)
688     continue; // inconsistent read
689    
690     Object x = m.item;
691     if (isData == (x != null) || // m already fulfilled
692     x == m || // m cancelled
693     !m.casItem(x, e)) { // lost CAS
694     advanceHead(h, m); // dequeue and retry
695     continue;
696     }
697    
698     advanceHead(h, m); // successfully fulfilled
699     LockSupport.unpark(m.waiter);
700 jsr166 1.83 return (x != null) ? (E)x : e;
701 dl 1.55 }
702 dl 1.35 }
703     }
704    
705     /**
706 jsr166 1.57 * Spins/blocks until node s is fulfilled.
 * The wait ends as soon as s.item no longer equals e; cancellation
 * (interrupt or timeout) is recorded by CASing s.item to s itself.
707 jsr166 1.63 *
708 dl 1.55 * @param s the waiting node
709     * @param e the comparison value for checking match
710     * @param timed true if timed wait
711     * @param nanos timeout value
712     * @return matched item, or s if cancelled
713 dl 1.35 */
714 jsr166 1.82 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
715 dl 1.55 /* Same idea as TransferStack.awaitFulfill */
    // deadline is only meaningful (and only read) when timed is true
716 jsr166 1.85 final long deadline = timed ? System.nanoTime() + nanos : 0L;
717 dl 1.55 Thread w = Thread.currentThread();
    // spin only if s is the node directly after head at this point
718 jsr166 1.105 int spins = (head.next == s)
719     ? (timed ? maxTimedSpins : maxUntimedSpins)
720     : 0;
721 dl 1.55 for (;;) {
    // isInterrupted() reads the interrupt status without clearing it
722     if (w.isInterrupted())
723     s.tryCancel(e);
724     Object x = s.item;
725     if (x != e)
726     return x;
727     if (timed) {
728 jsr166 1.85 nanos = deadline - System.nanoTime();
729     if (nanos <= 0L) {
730 dl 1.55 s.tryCancel(e);
        // loop back so the item check above sees either the
        // cancellation or a fulfillment that won the race
731     continue;
732     }
733     }
734     if (spins > 0)
735     --spins;
736     else if (s.waiter == null)
    // record the waiter first; parking happens on a later iteration
737     s.waiter = w;
738     else if (!timed)
739     LockSupport.park(this);
    // with no more than spinForTimeoutThreshold nanos left, no park
    // is issued and the loop simply iterates again
740     else if (nanos > spinForTimeoutThreshold)
741     LockSupport.parkNanos(this, nanos);
742 dl 1.35 }
743 dl 1.31 }
744    
745     /**
746 jsr166 1.57 * Gets rid of cancelled node s with original predecessor pred.
 *
 * @param pred the node that preceded s when s was linked in
 * @param s the cancelled node to unlink
747 dl 1.31 */
748 dl 1.55 void clean(QNode pred, QNode s) {
749     s.waiter = null; // forget thread
750     /*
751     * At any given time, exactly one node on list cannot be
752     * deleted -- the last inserted node. To accommodate this,
753     * if we cannot delete s, we save its predecessor as
754     * "cleanMe", deleting the previously saved version
755     * first. At least one of node s or the node previously
756     * saved can always be deleted, so this always terminates.
757     */
758     while (pred.next == s) { // Return early if already unlinked
759     QNode h = head;
760     QNode hn = h.next; // Absorb cancelled first node as head
761     if (hn != null && hn.isCancelled()) {
762     advanceHead(h, hn);
763     continue;
764     }
765 jsr166 1.68 QNode t = tail; // Ensure consistent read for tail
766 dl 1.55 if (t == h)
767     return;
768 jsr166 1.68 QNode tn = t.next;
769     if (t != tail)
770 dl 1.55 continue;
771     if (tn != null) {
772     advanceTail(t, tn);
773     continue;
774     }
775     if (s != t) { // If not tail, try to unsplice
776     QNode sn = s.next;
    // sn == s means s was already self-linked by advanceHead
777     if (sn == s || pred.casNext(s, sn))
778     return;
779     }
    // reached when s is the tail or the unsplice CAS failed
780     QNode dp = cleanMe;
781     if (dp != null) { // Try unlinking previous cancelled node
782     QNode d = dp.next;
783     QNode dn;
784     if (d == null || // d is gone or
785     d == dp || // d is off list or
786     !d.isCancelled() || // d not cancelled or
787     (d != t && // d not tail and
788     (dn = d.next) != null && // has successor
789     dn != d && // that is on list
790     dp.casNext(d, dn))) // d unspliced
791 jsr166 1.58 casCleanMe(dp, null);
792     if (dp == pred)
793 dl 1.55 return; // s is already saved node
794 jsr166 1.58 } else if (casCleanMe(null, pred))
795 dl 1.55 return; // Postpone cleaning s
796 dl 1.2 }
797     }
798 dl 1.69
// Unsafe mechanics: raw field offsets of head, tail and cleanMe,
// resolved once when the class is initialized.
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
    final sun.misc.Unsafe unsafe;
    final long headOff;
    final long tailOff;
    final long cleanMeOff;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> queueClass = TransferQueue.class;
        headOff = unsafe.objectFieldOffset(queueClass.getDeclaredField("head"));
        tailOff = unsafe.objectFieldOffset(queueClass.getDeclaredField("tail"));
        cleanMeOff = unsafe.objectFieldOffset(queueClass.getDeclaredField("cleanMe"));
    } catch (ReflectiveOperationException e) {
        // A missing field is a build error, not a recoverable condition.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    headOffset = headOff;
    tailOffset = tailOff;
    cleanMeOffset = cleanMeOff;
}
817 dl 1.55 }
818    
819     /**
820     * The transferer. Set only in constructor, but cannot be declared
821     * as final without further complicating serialization. Since
822 dl 1.56 * this is accessed only at most once per public method, there
823     * isn't a noticeable performance penalty for using volatile
824     * instead of final here.
825 dl 1.55 */
826 jsr166 1.82 private transient volatile Transferer<E> transferer;
827 dl 1.55
/**
 * Creates a {@code SynchronousQueue} that uses the nonfair access
 * policy; equivalent to {@code SynchronousQueue(false)}.
 */
public SynchronousQueue() {
    this(false);
}
834 dl 1.2
/**
 * Creates a {@code SynchronousQueue} with the given fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    if (fair) {
        transferer = new TransferQueue<E>();
    } else {
        transferer = new TransferStack<E>();
    }
}
844    
845     /**
846 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
847     * another thread to receive it.
848 jsr166 1.50 *
849     * @throws InterruptedException {@inheritDoc}
850     * @throws NullPointerException {@inheritDoc}
851 tim 1.10 */
852 jsr166 1.82 public void put(E e) throws InterruptedException {
853     if (e == null) throw new NullPointerException();
854     if (transferer.transfer(e, false, 0) == null) {
855 jsr166 1.68 Thread.interrupted();
856 dl 1.55 throw new InterruptedException();
857 jsr166 1.68 }
858 tim 1.1 }
859    
860 dholmes 1.11 /**
861 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
862 dl 1.18 * up to the specified wait time for another thread to receive it.
863 jsr166 1.50 *
864 jsr166 1.90 * @return {@code true} if successful, or {@code false} if the
865 jsr166 1.92 * specified waiting time elapses before a consumer appears
866 jsr166 1.50 * @throws InterruptedException {@inheritDoc}
867     * @throws NullPointerException {@inheritDoc}
868 dholmes 1.11 */
869 jsr166 1.82 public boolean offer(E e, long timeout, TimeUnit unit)
870 dl 1.55 throws InterruptedException {
871 jsr166 1.82 if (e == null) throw new NullPointerException();
872     if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
873 dl 1.55 return true;
874     if (!Thread.interrupted())
875     return false;
876     throw new InterruptedException();
877     }
878    
879     /**
880     * Inserts the specified element into this queue, if another thread is
881     * waiting to receive it.
882     *
883     * @param e the element to add
884 jsr166 1.90 * @return {@code true} if the element was added to this queue, else
885     * {@code false}
886 dl 1.55 * @throws NullPointerException if the specified element is null
887     */
888     public boolean offer(E e) {
889 jsr166 1.49 if (e == null) throw new NullPointerException();
890 dl 1.55 return transferer.transfer(e, true, 0) != null;
891 tim 1.1 }
892    
893 dholmes 1.11 /**
894     * Retrieves and removes the head of this queue, waiting if necessary
895     * for another thread to insert it.
896 jsr166 1.50 *
897 dholmes 1.11 * @return the head of this queue
898 jsr166 1.50 * @throws InterruptedException {@inheritDoc}
899 dholmes 1.11 */
900 dl 1.2 public E take() throws InterruptedException {
901 jsr166 1.82 E e = transferer.transfer(null, false, 0);
902 dl 1.55 if (e != null)
903 jsr166 1.82 return e;
904 jsr166 1.68 Thread.interrupted();
905 dl 1.55 throw new InterruptedException();
906 tim 1.1 }
907 dl 1.2
908 dholmes 1.11 /**
909     * Retrieves and removes the head of this queue, waiting
910     * if necessary up to the specified wait time, for another thread
911     * to insert it.
912 jsr166 1.50 *
913 jsr166 1.90 * @return the head of this queue, or {@code null} if the
914 jsr166 1.92 * specified waiting time elapses before an element is present
915 jsr166 1.50 * @throws InterruptedException {@inheritDoc}
916 dholmes 1.11 */
917 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
918 jsr166 1.82 E e = transferer.transfer(null, true, unit.toNanos(timeout));
919 dl 1.55 if (e != null || !Thread.interrupted())
920 jsr166 1.82 return e;
921 dl 1.55 throw new InterruptedException();
922 tim 1.1 }
923 dl 1.2
924 dl 1.18 /**
925     * Retrieves and removes the head of this queue, if another thread
926     * is currently making an element available.
927     *
928 jsr166 1.90 * @return the head of this queue, or {@code null} if no
929 jsr166 1.92 * element is available
930 dl 1.18 */
931 dl 1.2 public E poll() {
932 jsr166 1.82 return transferer.transfer(null, true, 0);
933 tim 1.1 }
934 dl 1.2
935 dl 1.5 /**
936 jsr166 1.90 * Always returns {@code true}.
937     * A {@code SynchronousQueue} has no internal capacity.
938 jsr166 1.63 *
939 jsr166 1.90 * @return {@code true}
940 dl 1.5 */
941     public boolean isEmpty() {
942     return true;
943     }
944    
945     /**
946 dholmes 1.11 * Always returns zero.
947 jsr166 1.90 * A {@code SynchronousQueue} has no internal capacity.
948 jsr166 1.63 *
949 jsr166 1.89 * @return zero
950 dl 1.5 */
951     public int size() {
952     return 0;
953 tim 1.1 }
954 dl 1.2
955 dl 1.5 /**
956 dholmes 1.11 * Always returns zero.
957 jsr166 1.90 * A {@code SynchronousQueue} has no internal capacity.
958 jsr166 1.63 *
959 jsr166 1.89 * @return zero
960 dl 1.5 */
961     public int remainingCapacity() {
962     return 0;
963     }
964    
965     /**
966 dholmes 1.11 * Does nothing.
967 jsr166 1.90 * A {@code SynchronousQueue} has no internal capacity.
968 dholmes 1.11 */
969 dl 1.55 public void clear() {
970     }
971 dholmes 1.11
972     /**
973 jsr166 1.90 * Always returns {@code false}.
974     * A {@code SynchronousQueue} has no internal capacity.
975 jsr166 1.63 *
976 dl 1.55 * @param o the element
977 jsr166 1.90 * @return {@code false}
978 dholmes 1.11 */
979     public boolean contains(Object o) {
980     return false;
981     }
982    
983     /**
984 jsr166 1.90 * Always returns {@code false}.
985     * A {@code SynchronousQueue} has no internal capacity.
986 dl 1.18 *
987     * @param o the element to remove
988 jsr166 1.90 * @return {@code false}
989 dl 1.18 */
990     public boolean remove(Object o) {
991     return false;
992     }
993    
994     /**
995 jsr166 1.90 * Returns {@code false} unless the given collection is empty.
996     * A {@code SynchronousQueue} has no internal capacity.
997 jsr166 1.63 *
998 dl 1.18 * @param c the collection
999 jsr166 1.90 * @return {@code false} unless given collection is empty
1000 dholmes 1.11 */
1001 dl 1.12 public boolean containsAll(Collection<?> c) {
1002 dl 1.16 return c.isEmpty();
1003 dholmes 1.11 }
1004    
1005     /**
1006 jsr166 1.90 * Always returns {@code false}.
1007     * A {@code SynchronousQueue} has no internal capacity.
1008 jsr166 1.63 *
1009 dl 1.18 * @param c the collection
1010 jsr166 1.90 * @return {@code false}
1011 dholmes 1.11 */
1012 dl 1.12 public boolean removeAll(Collection<?> c) {
1013 dholmes 1.11 return false;
1014     }
1015    
1016     /**
1017 jsr166 1.90 * Always returns {@code false}.
1018     * A {@code SynchronousQueue} has no internal capacity.
1019 jsr166 1.63 *
1020 dl 1.18 * @param c the collection
1021 jsr166 1.90 * @return {@code false}
1022 dholmes 1.11 */
1023 dl 1.12 public boolean retainAll(Collection<?> c) {
1024 dholmes 1.11 return false;
1025     }
1026    
1027     /**
1028 jsr166 1.90 * Always returns {@code null}.
1029     * A {@code SynchronousQueue} does not return elements
1030 dl 1.5 * unless actively waited on.
1031 jsr166 1.63 *
1032 jsr166 1.90 * @return {@code null}
1033 dl 1.5 */
1034     public E peek() {
1035     return null;
1036     }
1037    
1038     /**
1039 jsr166 1.90 * Returns an empty iterator in which {@code hasNext} always returns
1040     * {@code false}.
1041 tim 1.13 *
1042 dholmes 1.11 * @return an empty iterator
1043 dl 1.5 */
1044 dl 1.2 public Iterator<E> iterator() {
1045 jsr166 1.102 return Collections.emptyIterator();
1046 tim 1.1 }
1047    
1048 jsr166 1.101 /**
1049     * Returns an empty spliterator in which calls to
1050     * {@link java.util.Spliterator#trySplit()} always return {@code null}.
1051     *
1052     * @return an empty spliterator
1053     * @since 1.8
1054     */
1055 dl 1.96 public Spliterator<E> spliterator() {
1056 dl 1.95 return Spliterators.emptySpliterator();
1057 dl 1.93 }
1058 jsr166 1.94
1059 dl 1.5 /**
1060 dholmes 1.11 * Returns a zero-length array.
1061     * @return a zero-length array
1062 dl 1.5 */
1063 dl 1.3 public Object[] toArray() {
1064 dl 1.25 return new Object[0];
1065 tim 1.1 }
1066    
1067 dholmes 1.11 /**
1068 jsr166 1.103 * Sets the zeroth element of the specified array to {@code null}
1069 dholmes 1.11 * (if the array has non-zero length) and returns it.
1070 jsr166 1.50 *
1071 dl 1.40 * @param a the array
1072 dholmes 1.11 * @return the specified array
1073 jsr166 1.50 * @throws NullPointerException if the specified array is null
1074 dholmes 1.11 */
1075 dl 1.2 public <T> T[] toArray(T[] a) {
1076     if (a.length > 0)
1077     a[0] = null;
1078     return a;
1079     }
1080 dl 1.21
1081 jsr166 1.50 /**
1082     * @throws UnsupportedOperationException {@inheritDoc}
1083     * @throws ClassCastException {@inheritDoc}
1084     * @throws NullPointerException {@inheritDoc}
1085     * @throws IllegalArgumentException {@inheritDoc}
1086     */
1087 dl 1.21 public int drainTo(Collection<? super E> c) {
1088     if (c == null)
1089     throw new NullPointerException();
1090     if (c == this)
1091     throw new IllegalArgumentException();
1092     int n = 0;
1093 jsr166 1.80 for (E e; (e = poll()) != null;) {
1094 dl 1.21 c.add(e);
1095     ++n;
1096     }
1097     return n;
1098     }
1099    
1100 jsr166 1.50 /**
1101     * @throws UnsupportedOperationException {@inheritDoc}
1102     * @throws ClassCastException {@inheritDoc}
1103     * @throws NullPointerException {@inheritDoc}
1104     * @throws IllegalArgumentException {@inheritDoc}
1105     */
1106 dl 1.21 public int drainTo(Collection<? super E> c, int maxElements) {
1107     if (c == null)
1108     throw new NullPointerException();
1109     if (c == this)
1110     throw new IllegalArgumentException();
1111     int n = 0;
1112 jsr166 1.80 for (E e; n < maxElements && (e = poll()) != null;) {
1113 dl 1.21 c.add(e);
1114     ++n;
1115     }
1116     return n;
1117     }
1118 dl 1.55
1119     /*
1120     * To cope with serialization strategy in the 1.5 version of
1121     * SynchronousQueue, we declare some unused classes and fields
1122     * that exist solely to enable serializability across versions.
1123     * These fields are never used, so are initialized only if this
1124     * object is ever serialized or deserialized.
1125     */
1126    
1127 jsr166 1.82 @SuppressWarnings("serial")
1128 dl 1.55 static class WaitQueue implements java.io.Serializable { }
1129     static class LifoWaitQueue extends WaitQueue {
1130     private static final long serialVersionUID = -3633113410248163686L;
1131     }
1132     static class FifoWaitQueue extends WaitQueue {
1133     private static final long serialVersionUID = -3623113410248163686L;
1134     }
1135     private ReentrantLock qlock;
1136     private WaitQueue waitingProducers;
1137     private WaitQueue waitingConsumers;
1138    
1139     /**
1140 jsr166 1.84 * Saves this queue to a stream (that is, serializes it).
1141 jsr166 1.99 * @param s the stream
1142 jsr166 1.100 * @throws java.io.IOException if an I/O error occurs
1143 dl 1.55 */
1144     private void writeObject(java.io.ObjectOutputStream s)
1145     throws java.io.IOException {
1146     boolean fair = transferer instanceof TransferQueue;
1147     if (fair) {
1148     qlock = new ReentrantLock(true);
1149     waitingProducers = new FifoWaitQueue();
1150     waitingConsumers = new FifoWaitQueue();
1151     }
1152     else {
1153     qlock = new ReentrantLock();
1154     waitingProducers = new LifoWaitQueue();
1155     waitingConsumers = new LifoWaitQueue();
1156     }
1157     s.defaultWriteObject();
1158     }
1159    
1160 jsr166 1.84 /**
1161     * Reconstitutes this queue from a stream (that is, deserializes it).
1162 jsr166 1.99 * @param s the stream
1163 jsr166 1.100 * @throws ClassNotFoundException if the class of a serialized object
1164     * could not be found
1165     * @throws java.io.IOException if an I/O error occurs
1166 jsr166 1.84 */
1167 jsr166 1.98 private void readObject(java.io.ObjectInputStream s)
1168 dl 1.55 throws java.io.IOException, ClassNotFoundException {
1169     s.defaultReadObject();
1170     if (waitingProducers instanceof FifoWaitQueue)
1171 jsr166 1.82 transferer = new TransferQueue<E>();
1172 dl 1.55 else
1173 jsr166 1.82 transferer = new TransferStack<E>();
1174 dl 1.55 }
1175    
1176 tim 1.1 }