ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.54 by jsr166, Sun Jun 19 23:13:42 2005 UTC vs.
Revision 1.55 by dl, Mon Aug 1 15:26:40 2005 UTC

# Line 1 | Line 1
1   /*
2 < * Written by Doug Lea with assistance from members of JCP JSR-166
3 < * Expert Group and released to the public domain, as explained at
2 > * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 > * assistance from members of JCP JSR-166 Expert Group and released to
4 > * the public domain, as explained at
5   * http://creativecommons.org/licenses/publicdomain
6   */
7  
8   package java.util.concurrent;
9   import java.util.concurrent.locks.*;
10 + import java.util.concurrent.atomic.*;
11   import java.util.*;
12  
13   /**
# Line 34 | Line 36 | import java.util.*;
36   * <p> This class supports an optional fairness policy for ordering
37   * waiting producer and consumer threads.  By default, this ordering
38   * is not guaranteed. However, a queue constructed with fairness set
39 < * to <tt>true</tt> grants threads access in FIFO order. Fairness
38 < * generally decreases throughput but reduces variability and avoids
39 < * starvation.
39 > * to <tt>true</tt> grants threads access in FIFO order.
40   *
41   * <p>This class and its iterator implement all of the
42   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 51 | Line 51 | import java.util.*;
51   * @param <E> the type of elements held in this collection
52   */
53   public class SynchronousQueue<E> extends AbstractQueue<E>
54 <        implements BlockingQueue<E>, java.io.Serializable {
54 >    implements BlockingQueue<E>, java.io.Serializable {
55      private static final long serialVersionUID = -3223113410248163686L;
56  
57      /*
58 <      This implementation divides actions into two cases for puts:
59 <
60 <      * An arriving producer that does not already have a waiting consumer
61 <        creates a node holding item, and then waits for a consumer to take it.
62 <      * An arriving producer that does already have a waiting consumer fills
63 <        the slot node created by the consumer, and notifies it to continue.
64 <
65 <      And symmetrically, two for takes:
66 <
67 <      * An arriving consumer that does not already have a waiting producer
68 <        creates an empty slot node, and then waits for a producer to fill it.
69 <      * An arriving consumer that does already have a waiting producer takes
70 <        item from the node created by the producer, and notifies it to continue.
71 <
72 <      When a put or take waiting for the actions of its counterpart
73 <      aborts due to interruption or timeout, it marks the node
74 <      it created as "CANCELLED", which causes its counterpart to retry
75 <      the entire put or take sequence.
76 <
77 <      This requires keeping two simple queues, waitingProducers and
78 <      waitingConsumers. Each of these can be FIFO (preserves fairness)
79 <      or LIFO (improves throughput).
80 <    */
81 <
82 <    /** Lock protecting both wait queues */
83 <    private final ReentrantLock qlock;
84 <    /** Queue holding waiting puts */
85 <    private final WaitQueue waitingProducers;
86 <    /** Queue holding waiting takes */
87 <    private final WaitQueue waitingConsumers;
88 <
89 <    /**
90 <     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
58 >     * This class implements extensions of the dual stack and dual
59 >     * queue algorithms described in "Nonblocking Concurrent Objects
60 >     * with Condition Synchronization", by W. N. Scherer III and
61 >     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
62 >     * Oct. 2004 (see also
63 >     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
64 >     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
65 >     * queue for fair mode. The performance of the two is generally
66 >     * similar. Fifo usually supports higher throughput under
67 >     * contention but Lifo maintains higher thread locality in common
68 >     * applications.
69 >     *
70 >     * A dual queue (and similarly stack) is one that at any given
71 >     * time either holds "data" -- items provided by put operations,
72 >     * or "requests" -- slots representing take operations, or is
73 >     * empty. A call to "fulfill" (i.e., a call requesting an item
74 >     * from a queue holding data or vice versa) dequeues a
75 >     * complementary node.  The most interesting feature of these
76 >     * queues is that any operation can figure out which mode the
77 >     * queue is in, and act accordingly without needing locks.
78 >     *
79 >     * Both the queue and stack extend abstract class Transferer
80 >     * defining the single method transfer that does a put or a
81 >     * take. These are unified into a single method because in dual
82 >     * data structures, the put and take operations are symmetrical,
83 >     * so nearly all code can be combined. The resulting transfer
84 >     * methods are on the long side, but are easier to follow than
85 >     * they would be if broken up into nearly-duplicated parts.
86 >     *
87 >     * The queue and stack data structures share many conceptual
88 >     * similarities but very few concrete details. For simplicity,
89 >     * they are kept distinct so that they can later evolve
90 >     * separately.
91 >     *
92 >     * The algorithms here differ from the versions in the above paper
93 >     * in extending them for use in synchronous queues, as well as
94 >     * dealing with cancellation. The main differences include:
95 >     *
96 >     *  1. The orginal algorithms used bit-marked pointers, but
97 >     *     the ones here use mode bits in nodes, leading to a number
98 >     *     of further adaptations.
99 >     *  2. SynchronousQueues must block threads waiting to become
100 >     *     fulfilled.
101 >     *  3. Nodes/threads that have been cancelled due to timeouts
102 >     *     or interruptions are cleaned out of the lists to
103 >     *     avoid garbage retention and memory depletion.
104 >     *
105 >     * Blocking is mainly accomplished using LockSupport park/unpark,
106 >     * except that nodes that appear to be the next ones to become
107 >     * fulfilled first spin a bit (on multiprocessors only). On very
108 >     * busy synchronous queues, spinning can dramatically improve
109 >     * throughput. And on less busy ones, the amount of spinning is
110 >     * small enough not to be noticeable.
111 >     *
112 >     * Cleaning is done in different ways in queues vs stacks.  For
113 >     * queues, we can almost always remove a node immediately in O(1)
114 >     * time (modulo retries for consistency checks) when it is
115 >     * cancelled. But if it may be pinned as the current tail, it must
116 >     * wait until some subsequent cancellation. For stacks, we need a
117 >     * potentially O(n) traversal to be sure that we can remove the
118 >     * node, but this can run concurrently with other threads
119 >     * accessing the stack.
120 >     *
121 >     * While garbage collection takes care of most node reclamation
122 >     * issues that otherwise complicate nonblocking algorithms, care
123 >     * is made to "forget" references to data, other nodes, and
124 >     * threads that might be held on to long-term by blocked
125 >     * threads. In cases where setting to null would otherwise
126 >     * conflict with main algorithms, this is done by changing a
127 >     * node's link to now point to the node itself. This doesn't arise
128 >     * much for Stack nodes (because blocked threads do not hang on to
129 >     * old head pointers), but references in Queue nodes must be
130 >     * agressively forgotten to avoid reachability of everything any
131 >     * node has ever referred to since arrival.
132       */
92    public SynchronousQueue() {
93        this(false);
94    }
133  
134      /**
135 <     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
98 <     * @param fair if true, threads contend in FIFO order for access;
99 <     * otherwise the order is unspecified.
135 >     * Shared internal API for dual stacks and queues.
136       */
137 <    public SynchronousQueue(boolean fair) {
138 <        if (fair) {
139 <            qlock = new ReentrantLock(true);
140 <            waitingProducers = new FifoWaitQueue();
141 <            waitingConsumers = new FifoWaitQueue();
142 <        }
143 <        else {
144 <            qlock = new ReentrantLock();
145 <            waitingProducers = new LifoWaitQueue();
146 <            waitingConsumers = new LifoWaitQueue();
147 <        }
137 >    static abstract class Transferer {
138 >        /**
139 >         * Perform a put or take.
140 >         * @param e if non-null, the item to be handed to a consumer;
141 >         * if null, requests that transfer return an item offered by
142 >         * producer.
143 >         * @param timed if this operation should timeout
144 >         * @param nanos the timeout, in nanoseconds
145 >         * @return if nonnull, the item provided or received; if null,
146 >         * the operation failed due to timeout or interrupt -- the
147 >         * caller can distinguish which of these occurred by checking
148 >         * Thread.interrupted.
149 >         */
150 >        abstract Object transfer(Object e, boolean timed, long nanos);
151      }
152  
153 +    /** The number of CPUs, for spin control */
154 +    static final int NCPUS = Runtime.getRuntime().availableProcessors();
155 +
156      /**
157 <     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
158 <     * These queues have all transient fields, but are serializable
159 <     * in order to recover fairness settings when deserialized.
157 >     * The number of times to spin before blocking in timed waits.
158 >     * The value is empirically derived -- it works well across a
159 >     * variety of processors and OSes. Emprically, the best value
160 >     * seems not to vary with number of CPUs (beyond 2) so is just
161 >     * a constant.
162       */
163 <    static abstract class WaitQueue implements java.io.Serializable {
120 <        /** Creates, adds, and returns node for x. */
121 <        abstract Node enq(Object x);
122 <        /** Removes and returns node, or null if empty. */
123 <        abstract Node deq();
124 <        /** Removes a cancelled node to avoid garbage retention. */
125 <        abstract void unlink(Node node);
126 <        /** Returns true if a cancelled node might be on queue. */
127 <        abstract boolean shouldUnlink(Node node);
128 <    }
163 >    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
164  
165      /**
166 <     * FIFO queue to hold waiting puts/takes.
166 >     * The number of times to spin before blocking in untimed
167 >     * waits.  This is greater than timed value because untimed
168 >     * waits spin faster since they don't need to check times on
169 >     * each spin.
170       */
171 <    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
134 <        private static final long serialVersionUID = -3623113410248163686L;
135 <        private transient Node head;
136 <        private transient Node last;
137 <
138 <        Node enq(Object x) {
139 <            Node p = new Node(x);
140 <            if (last == null)
141 <                last = head = p;
142 <            else
143 <                last = last.next = p;
144 <            return p;
145 <        }
146 <
147 <        Node deq() {
148 <            Node p = head;
149 <            if (p != null) {
150 <                if ((head = p.next) == null)
151 <                    last = null;
152 <                p.next = null;
153 <            }
154 <            return p;
155 <        }
156 <
157 <        boolean shouldUnlink(Node node) {
158 <            return (node == last || node.next != null);
159 <        }
160 <
161 <        void unlink(Node node) {
162 <            Node p = head;
163 <            Node trail = null;
164 <            while (p != null) {
165 <                if (p == node) {
166 <                    Node next = p.next;
167 <                    if (trail == null)
168 <                        head = next;
169 <                    else
170 <                        trail.next = next;
171 <                    if (last == node)
172 <                        last = trail;
173 <                    break;
174 <                }
175 <                trail = p;
176 <                p = p.next;
177 <            }
178 <        }
179 <    }
171 >    static final int maxUntimedSpins = maxTimedSpins * 16;
172  
173      /**
174 <     * LIFO queue to hold waiting puts/takes.
174 >     * The number of nanoseconds for which it is faster to spin
175 >     * rather than to use timed park. A rough estimate suffices.
176       */
177 <    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
185 <        private static final long serialVersionUID = -3633113410248163686L;
186 <        private transient Node head;
177 >    static final long spinForTimeoutThreshold = 1000L;
178  
179 <        Node enq(Object x) {
180 <            return head = new Node(x, head);
181 <        }
179 >    /** Dual stack  */
180 >    static final class TransferStack extends Transferer {
181 >        /*
182 >         * This extends Scherer-Scott dual stack algorithm, differing,
183 >         * among other ways, by using "covering" nodes rather than
184 >         * bit-marked pointers: Fulfilling operations push on marker
185 >         * nodes (with FULFILLING bit set in mode) to reserve a spot
186 >         * to match a waiting node.
187 >         */
188  
189 <        Node deq() {
190 <            Node p = head;
191 <            if (p != null) {
192 <                head = p.next;
193 <                p.next = null;
194 <            }
195 <            return p;
196 <        }
197 <
198 <        boolean shouldUnlink(Node node) {
199 <            // Return false if already dequeued or is bottom node (in which
200 <            // case we might retain at most one garbage node)
201 <            return (node == head || node.next != null);
202 <        }
203 <
204 <        void unlink(Node node) {
205 <            Node p = head;
206 <            Node trail = null;
207 <            while (p != null) {
208 <                if (p == node) {
209 <                    Node next = p.next;
210 <                    if (trail == null)
211 <                        head = next;
212 <                    else
213 <                        trail.next = next;
214 <                    break;
189 >        /* Modes for SNodes, ORed together in node fields */
190 >        /** Node represents an unfulfilled consumer */
191 >        static final int REQUEST    = 0;
192 >        /** Node represents an unfulfilled producer */
193 >        static final int DATA       = 1;
194 >        /** Node is fulfilling another unfulfilled DATA or REQUEST */
195 >        static final int FULFILLING = 2;
196 >
197 >        /** Return true if m has fulfilling bit set */
198 >        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
199 >
200 >        /** Node class for TransferStacks. */
201 >        static final class SNode {
202 >            volatile SNode next;        // next node in stack
203 >            volatile SNode match;       // the node matched to this
204 >            volatile Thread waiter;     // to control park/unpark
205 >            Object item;                // data; or null for REQUESTs
206 >            int mode;
207 >            // Note: item and mode fields don't need to be volatile
208 >            // since they are always written before, and read after,
209 >            // other volatile/atomic operations.
210 >
211 >            SNode(Object item) {
212 >                this.item = item;
213 >            }
214 >
215 >            static final AtomicReferenceFieldUpdater<SNode, SNode>
216 >                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
217 >                (SNode.class, SNode.class, "next");
218 >
219 >            boolean casNext(SNode cmp, SNode val) {
220 >                return (cmp == next &&
221 >                        nextUpdater.compareAndSet(this, cmp, val));
222 >            }
223 >
224 >            static final AtomicReferenceFieldUpdater<SNode, SNode>
225 >                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
226 >                (SNode.class, SNode.class, "match");
227 >
228 >            /**
229 >             * Try to match node s to this node, if so, waking up
230 >             * thread. Fulfillers call tryMatch to identify their
231 >             * waiters. Waiters block until they have been
232 >             * matched.
233 >             * @param s the node to match
234 >             * @return true if successfully matched to s
235 >             */
236 >            boolean tryMatch(SNode s) {
237 >                if (match == null &&
238 >                    matchUpdater.compareAndSet(this, null, s)) {
239 >                    Thread w = waiter;
240 >                    if (w != null) {    // waiters need at most one unpark
241 >                        waiter = null;
242 >                        LockSupport.unpark(w);
243 >                    }
244 >                    return true;
245                  }
246 <                trail = p;
220 <                p = p.next;
246 >                return match == s;
247              }
222        }
223    }
248  
249 <    /**
250 <     * Unlinks the given node from consumer queue.  Called by cancelled
251 <     * (timeout, interrupt) waiters to avoid garbage retention in the
252 <     * absence of producers.
253 <     */
230 <    private void unlinkCancelledConsumer(Node node) {
231 <        // Use a form of double-check to avoid unnecessary locking and
232 <        // traversal. The first check outside lock might
233 <        // conservatively report true.
234 <        if (waitingConsumers.shouldUnlink(node)) {
235 <            qlock.lock();
236 <            try {
237 <                if (waitingConsumers.shouldUnlink(node))
238 <                    waitingConsumers.unlink(node);
239 <            } finally {
240 <                qlock.unlock();
249 >            /**
250 >             * Try to cancel a wait by matching node to itself.
251 >             */
252 >            void tryCancel() {
253 >                matchUpdater.compareAndSet(this, null, this);
254              }
242        }
243    }
255  
256 <    /**
257 <     * Unlinks the given node from producer queue.  Symmetric
247 <     * to unlinkCancelledConsumer.
248 <     */
249 <    private void unlinkCancelledProducer(Node node) {
250 <        if (waitingProducers.shouldUnlink(node)) {
251 <            qlock.lock();
252 <            try {
253 <                if (waitingProducers.shouldUnlink(node))
254 <                    waitingProducers.unlink(node);
255 <            } finally {
256 <                qlock.unlock();
256 >            boolean isCancelled() {
257 >                return match == this;
258              }
259          }
259    }
260  
261 <    /**
262 <     * Nodes each maintain an item and handle waits and signals for
263 <     * getting and setting it. The class extends
264 <     * AbstractQueuedSynchronizer to manage blocking, using AQS state
265 <     *  0 for waiting, 1 for ack, -1 for cancelled.
266 <     */
267 <    static final class Node extends AbstractQueuedSynchronizer {
268 <        private static final long serialVersionUID = -2631493897867746127L;
261 >        /** The head (top) of the stack */
262 >        volatile SNode head;
263  
264 <        /** Synchronization state value representing that node acked */
265 <        private static final int ACK    =  1;
266 <        /** Synchronization state value representing that node cancelled */
273 <        private static final int CANCEL = -1;
264 >        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
265 >            headUpdater = AtomicReferenceFieldUpdater.newUpdater
266 >            (TransferStack.class,  SNode.class, "head");
267  
268 <        /** The item being transferred */
269 <        Object item;
270 <        /** Next node in wait queue */
278 <        Node next;
268 >        boolean casHead(SNode h, SNode nh) {
269 >            return h == head && headUpdater.compareAndSet(this, h, nh);
270 >        }
271  
272 <        /** Creates a node with initial item */
273 <        Node(Object x) { item = x; }
272 >        /**
273 >         * Create or reset fields of a node. Called only from transfer
274 >         * where the node to push on stack is lazily created and
275 >         * reused when possible to help reduce intervals between reads
276 >         * and CASes of head and to avoid surges of garbage when CASes
277 >         * to push nodes fail due to contention.
278 >         */
279 >        static SNode snode(SNode s, Object e, SNode next, int mode) {
280 >            if (s == null) s = new SNode(e);
281 >            s.mode = mode;
282 >            s.next = next;
283 >            return s;
284 >        }
285  
286 <        /** Creates a node with initial item and next */
287 <        Node(Object x, Node n) { item = x; next = n; }
286 >        /**
287 >         * Put or take an item.
288 >         */
289 >        Object transfer(Object e, boolean timed, long nanos) {
290 >            /*
291 >             * Basic algorithm is to loop trying one of three actions:
292 >             *
293 >             * 1. If apparently empty or already containing nodes of same
294 >             *    mode, try to push node on stack and wait for a match,
295 >             *    returning it, or null if cancelled.
296 >             *
297 >             * 2. If apparently containing node of complementary mode,
298 >             *    try to push a fulfilling node on to stack, match
299 >             *    with corresponding waiting node, pop both from
300 >             *    stack, and return matched item. The matching or
301 >             *    unlinking might not actually be necessary because of
302 >             *    another threads performing action 3:
303 >             *
304 >             * 3. If top of stack already holds another fulfilling node,
305 >             *    help it out by doing its match and/or pop
306 >             *    operations, and then continue. The code for helping
307 >             *    is essentially the same as for fulfilling, except
308 >             *    that it doesn't return the item.
309 >             */
310 >
311 >            SNode s = null; // constructed/reused as needed
312 >            int mode = (e == null)? REQUEST : DATA;
313 >
314 >            for (;;) {
315 >                SNode h = head;
316 >                if (h == null || h.mode == mode) {  // empty or same-mode
317 >                    if (timed && nanos <= 0) {      // can't wait
318 >                        if (h != null && h.isCancelled())
319 >                            casHead(h, h.next);     // pop cancelled node
320 >                        else
321 >                            return null;
322 >                    } else if (casHead(h, s = snode(s, e, h, mode))) {
323 >                        SNode m = awaitFulfill(s, timed, nanos);
324 >                        if (m == s) {               // wait was cancelled
325 >                            clean(s);
326 >                            return null;
327 >                        }
328 >                        if ((h = head) != null && h.next == s)
329 >                            casHead(h, s.next);     // help s's fulfiller
330 >                        return mode == REQUEST? m.item : s.item;
331 >                    }
332 >                } else if (!isFulfilling(h.mode)) { // try to fulfill
333 >                    if (h.isCancelled())            // already cancelled
334 >                        casHead(h, h.next);         // pop and retry
335 >                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
336 >                        for (;;) { // loop until matched or waiters disappear
337 >                            SNode m = s.next;       // m is s's match
338 >                            if (m == null) {        // all waiters are gone
339 >                                casHead(s, null);   // pop fulfill node
340 >                                s = null;           // use new node next time
341 >                                break;              // restart main loop
342 >                            }
343 >                            SNode mn = m.next;
344 >                            if (m.tryMatch(s)) {
345 >                                casHead(s, mn);     // pop both s and m
346 >                                return (mode == REQUEST)? m.item : s.item;
347 >                            } else                  // lost match
348 >                                s.casNext(m, mn);   // help unlink
349 >                        }
350 >                    }
351 >                } else {                            // help a fulfiller
352 >                    SNode m = h.next;               // m is h's match
353 >                    if (m == null)                  // waiter is gone
354 >                        casHead(h, null);           // pop fulfilling node
355 >                    else {
356 >                        SNode mn = m.next;
357 >                        if (m.tryMatch(h))          // help match
358 >                            casHead(h, mn);         // pop both h and m
359 >                        else                        // lost match
360 >                            h.casNext(m, mn);       // help unlink
361 >                    }
362 >                }
363 >            }
364 >        }
365  
366          /**
367 <         * Implements AQS base acquire to succeed if not in WAITING state
367 >         * Spin/block until node s is matched by a fulfill operation.
368 >         * @param s the waiting node
369 >         * @param timed true if timed wait
370 >         * @param nanos timeout value
371 >         * @return matched node, or s if cancelled
372           */
373 <        protected boolean tryAcquire(int ignore) {
374 <            return getState() != 0;
373 >        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
374 >            /*
375 >             * When a node/thread is about to block, it sets its waiter
376 >             * field and then rechecks state at least one more time
377 >             * before actually parking, thus covering race vs
378 >             * fulfiller noticing that waiter is nonnull so should be
379 >             * woken.
380 >             *
381 >             * When invoked by nodes that appear at the point of call
382 >             * to be at the head of the stack, calls to park are
383 >             * preceded by spins to avoid blocking when producers and
384 >             * consumers are arriving very close in time.  This can
385 >             * happen enough to bother only on multiprocessors.
386 >             *
387 >             * The order of checks for returning out of main loop
388 >             * reflects fact that interrupts have precedence over
389 >             * normal returns, which have precedence over
390 >             * timeouts. (So, on timeout, one last check for match is
391 >             * done before giving up.) Except that calls from untimed
392 >             * SynchronousQueue.{poll/offer} don't check interrupts
393 >             * and don't wait at all, so are trapped in transfer
394 >             * method rather than calling awaitFulfill.
395 >             */
396 >            long lastTime = (timed)? System.nanoTime() : 0;
397 >            Thread w = Thread.currentThread();
398 >            SNode h = head;
399 >            int spins = (shouldSpin(s)?
400 >                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
401 >            for (;;) {
402 >                if (w.isInterrupted())
403 >                    s.tryCancel();
404 >                SNode m = s.match;
405 >                if (m != null)
406 >                    return m;
407 >                if (timed) {
408 >                    long now = System.nanoTime();
409 >                    nanos -= now - lastTime;                    
410 >                    lastTime = now;
411 >                    if (nanos <= 0) {
412 >                        s.tryCancel();
413 >                        continue;
414 >                    }
415 >                }
416 >                if (spins > 0)
417 >                    spins = shouldSpin(s)? (spins-1) : 0;
418 >                else if (s.waiter == null)
419 >                    s.waiter = w; // establish waiter so can park next iter
420 >                else if (!timed)
421 >                    LockSupport.park(this);
422 >                else if (nanos > spinForTimeoutThreshold)
423 >                    LockSupport.parkNanos(this, nanos);
424 >            }
425          }
426  
427          /**
428 <         * Implements AQS base release to signal if state changed
428 >         * Return true if node s is at head or there is an active
429 >         * fulfiller.
430           */
431 <        protected boolean tryRelease(int newState) {
432 <            return compareAndSetState(0, newState);
431 >        boolean shouldSpin(SNode s) {
432 >            SNode h = head;
433 >            return (h == null || h == s || isFulfilling(h.mode));
434          }
435  
436          /**
437 <         * Takes item and nulls out field (for sake of GC)
437 >         * Unlink s from the stack
438           */
439 <        private Object extract() {
440 <            Object x = item;
441 <            item = null;
442 <            return x;
439 >        void clean(SNode s) {
440 >            s.item = null;   // forget item
441 >            s.waiter = null; // forget thread
442 >
443 >            /*
444 >             * At worst we may need to traverse entire stack to unlink
445 >             * s. If there are multiple concurrent calls to clean, we
446 >             * might not see s if another thread has already removed
447 >             * it. But we can stop when we see any node known to
448 >             * follow s. We use s.next unless it too is cancelled, in
449 >             * which case we try the node one past. We don't check any
450 >             * futher because we don't want to doubly traverse just to
451 >             * find sentinel.
452 >             */
453 >
454 >            SNode past = s.next;
455 >            if (past != null && past.isCancelled())
456 >                past = past.next;
457 >
458 >            // Absorb cancelled nodes at head
459 >            SNode p;
460 >            while ((p = head) != null && p != past && p.isCancelled())
461 >                casHead(p, p.next);
462 >
463 >            // Unsplice embedded nodes
464 >            while (p != null && p != past) {
465 >                SNode n = p.next;
466 >                if (n != null && n.isCancelled())
467 >                    p.casNext(n, n.next);
468 >                else
469 >                    p = n;
470 >            }
471          }
472 +    }
473 +
474 +    /** Dual Queue. */
475 +    static final class TransferQueue extends Transferer {
476 +        /*
477 +         * This extends Scherer-Scott dual queue algorithm, differing,
478 +         * among other ways, by using modes within nodes rather than
479 +         * marked pointers. The algorithm is a little simpler than
480 +         * that for stacks because fulfillers do not need explicit
481 +         * nodes, and matching is done by CAS'ing QNode.item field
482 +         * from nonnull to null (for put) or vice versa (for take).
483 +         */
484 +
485 +        /** Node class for TransferQueue. */
486 +        static final class QNode {
487 +            volatile QNode next;          // next node in queue
488 +            volatile Object item;         // CAS'ed to or from null
489 +            volatile Thread waiter;       // to control park/unpark
490 +            final boolean isData;
491 +
492 +            QNode(Object item, boolean isData) {
493 +                this.item = item;
494 +                this.isData = isData;
495 +            }
496 +
497 +            static final AtomicReferenceFieldUpdater<QNode, QNode>
498 +                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
499 +                (QNode.class, QNode.class, "next");
500  
501 +            boolean casNext(QNode cmp, QNode val) {
502 +                return (next == cmp &&
503 +                        nextUpdater.compareAndSet(this, cmp, val));
504 +            }
505 +
506 +            static final AtomicReferenceFieldUpdater<QNode, Object>
507 +                itemUpdater = AtomicReferenceFieldUpdater.newUpdater
508 +                (QNode.class, Object.class, "item");
509 +
510 +            boolean casItem(Object cmp, Object val) {
511 +                return (item == cmp &&
512 +                        itemUpdater.compareAndSet(this, cmp, val));
513 +            }
514 +
515 +            /**
516 +             * Try to cancel by CAS'ing ref to this as item.  
517 +             */
518 +            void tryCancel(Object cmp) {
519 +                itemUpdater.compareAndSet(this, cmp, this);
520 +            }
521 +
522 +            boolean isCancelled() {
523 +                return item == this;
524 +            }
525 +        }
526 +
527 +        /** Head of queue */
528 +        transient volatile QNode head;
529 +        /** Tail of queue */
530 +        transient volatile QNode tail;
531          /**
532 <         * Tries to cancel on interrupt; if so rethrowing,
533 <         * else setting interrupt state
532 >         * Reference to a cancelled node that might not yet have been
533 >         * unlinked from queue because it was the last inserted node
534 >         * when it cancelled.
535           */
536 <        private void checkCancellationOnInterrupt(InterruptedException ie)
537 <            throws InterruptedException {
538 <            if (release(CANCEL))
539 <                throw ie;
540 <            Thread.currentThread().interrupt();
536 >        transient volatile QNode cleanMe;
537 >
538 >        TransferQueue() {
539 >            QNode h = new QNode(null, false); // initialize to dummy node.
540 >            head = h;
541 >            tail = h;
542          }
543  
544 +        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
545 +            headUpdater = AtomicReferenceFieldUpdater.newUpdater
546 +            (TransferQueue.class,  QNode.class, "head");
547 +
548          /**
549 <         * Fills in the slot created by the consumer and signal consumer to
550 <         * continue.
549 >         * Try to cas nh as new head; if successful unlink
550 >         * old head's next node to avoid garbage retention.
551           */
552 <        boolean setItem(Object x) {
553 <            item = x; // can place in slot even if cancelled
554 <            return release(ACK);
552 >        void advanceHead(QNode h, QNode nh) {
553 >            if (h == head && headUpdater.compareAndSet(this, h, nh))
554 >                h.next = h; // forget old next
555          }
556  
557 +        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
558 +            tailUpdater = AtomicReferenceFieldUpdater.newUpdater
559 +            (TransferQueue.class, QNode.class, "tail");
560 +
561          /**
562 <         * Removes item from slot created by producer and signal producer
331 <         * to continue.
562 >         * Try to cas nt as new tail.
563           */
564 <        Object getItem() {
565 <            return (release(ACK))? extract() : null;
564 >        void advanceTail(QNode t, QNode nt) {
565 >            if (tail == t)
566 >                tailUpdater.compareAndSet(this, t, nt);
567          }
568  
569 +        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
570 +            cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
571 +            (TransferQueue.class, QNode.class, "cleanMe");
572 +
573          /**
574 <         * Waits for a consumer to take item placed by producer.
574 >         * Try to CAS cleanMe slot
575           */
576 <        void waitForTake() throws InterruptedException {
577 <            try {
578 <                acquireInterruptibly(0);
343 <            } catch (InterruptedException ie) {
344 <                checkCancellationOnInterrupt(ie);
345 <            }
576 >        boolean casCleanMe(QNode cmp, QNode val) {
577 >            return (cleanMe == cmp &&
578 >                    cleanMeUpdater.compareAndSet(this, cmp, val));
579          }
580  
581          /**
582 <         * Waits for a producer to put item placed by consumer.
582 >         * Put or take an item.
583           */
584 <        Object waitForPut() throws InterruptedException {
585 <            try {
586 <                acquireInterruptibly(0);
587 <            } catch (InterruptedException ie) {
588 <                checkCancellationOnInterrupt(ie);
584 >        Object transfer(Object e, boolean timed, long nanos) {
585 >            /* Basic algorithm is to loop trying to take either of
586 >             * two actions:
587 >             *
588 >             * 1. If queue apparently empty or holding same-mode nodes,
589 >             *    try to add node to queue of waiters, wait to be
590 >             *    fulfilled (or cancelled) and return matching item.
591 >             *
592 >             * 2. If queue apparently contains waiting items, and this
593 >             *    call is of complementary mode, try to fulfill by CAS'ing
594 >             *    item field of waiting node and dequeuing it, and then
595 >             *    returning matching item.
596 >             *
597 >             * In each case, along the way, check for and try to help
598 >             * advance head and tail on behalf of other stalled/slow
599 >             * threads.
600 >             *
601 >             * The loop starts off with a null check guarding against
602 >             * seeing uninitialized head or tail values. This never
603 >             * happens in current SynchronousQueue, but could if
604 >             * callers held non-volatile/final ref to the
605 >             * transferer. The check is here anyway because it places
606 >             * null checks at top of loop, which is usually faster
607 >             * than having them implicitly interspersed.
608 >             */
609 >
610 >            QNode s = null; // constructed/reused as needed
611 >            boolean isData = (e != null);
612 >
613 >            for (;;) {
614 >                QNode t = tail;
615 >                QNode h = head;
616 >                if (t == null || h == null)         // saw unitialized values
617 >                    continue;                       // spin
618 >
619 >                if (h == t || t.isData == isData) { // empty or same-mode
620 >                    QNode tn = t.next;
621 >                    if (t != tail)                  // inconsistent read
622 >                        continue;
623 >                    if (tn != null) {               // lagging tail
624 >                        advanceTail(t, tn);
625 >                        continue;
626 >                    }
627 >                    if (timed && nanos <= 0)        // can't wait
628 >                        return null;
629 >                    if (s == null)
630 >                        s = new QNode(e, isData);
631 >                    if (!t.casNext(null, s))        // failed to link in
632 >                        continue;
633 >
634 >                    advanceTail(t, s);              // swing tail and wait
635 >                    Object x = awaitFulfill(s, e, timed, nanos);
636 >                    if (x == s) {                   // wait was cancelled
637 >                        clean(t, s);
638 >                        return null;
639 >                    }
640 >
641 >                    if (s.next != s) {              // not already unlinked
642 >                        advanceHead(t, s);          // unlink
643 >                        if (x != null)              // and forget fields
644 >                            s.item = s;
645 >                        s.waiter = null;
646 >                    }
647 >                    return (x != null)? x : e;
648 >
649 >                } else {                            // complementary-mode
650 >                    QNode m = h.next;               // node to fulfill
651 >                    if (t != tail || m == null || h != head)
652 >                        continue;                   // inconsistent read
653 >
654 >                    Object x = m.item;
655 >                    if (isData == (x != null) ||    // m already fulfilled
656 >                        x == m ||                   // m cancelled
657 >                        !m.casItem(x, e)) {         // lost CAS
658 >                        advanceHead(h, m);          // dequeue and retry
659 >                        continue;
660 >                    }
661 >
662 >                    advanceHead(h, m);              // successfully fulfilled
663 >                    LockSupport.unpark(m.waiter);
664 >                    return (x != null)? x : e;
665 >                }
666              }
357            return extract();
667          }
668  
669          /**
670 <         * Waits for a consumer to take item placed by producer or time out.
670 >         * Spin/block until node s is fulfilled.
671 >         * @param s the waiting node
672 >         * @param e the comparison value for checking match
673 >         * @param timed true if timed wait
674 >         * @param nanos timeout value
675 >         * @return matched item, or s if cancelled
676           */
677 <        boolean waitForTake(long nanos) throws InterruptedException {
678 <            try {
679 <                if (!tryAcquireNanos(0, nanos) &&
680 <                    release(CANCEL))
681 <                    return false;
682 <            } catch (InterruptedException ie) {
683 <                checkCancellationOnInterrupt(ie);
677 >        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
678 >            /* Same idea as TransferStack.awaitFulfill */
679 >            long lastTime = (timed)? System.nanoTime() : 0;
680 >            Thread w = Thread.currentThread();
681 >            int spins = ((head.next == s) ?
682 >                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
683 >            for (;;) {
684 >                if (w.isInterrupted())
685 >                    s.tryCancel(e);
686 >                Object x = s.item;
687 >                if (x != e)
688 >                    return x;
689 >                if (timed) {
690 >                    long now = System.nanoTime();
691 >                    nanos -= now - lastTime;                    
692 >                    lastTime = now;
693 >                    if (nanos <= 0) {
694 >                        s.tryCancel(e);
695 >                        continue;
696 >                    }
697 >                }
698 >                if (spins > 0)
699 >                    --spins;
700 >                else if (s.waiter == null)
701 >                    s.waiter = w;
702 >                else if (!timed)
703 >                    LockSupport.park(this);
704 >                else if (nanos > spinForTimeoutThreshold)
705 >                    LockSupport.parkNanos(this, nanos);
706              }
371            return true;
707          }
708  
709          /**
710 <         * Waits for a producer to put item placed by consumer, or time out.
710 >         * Get rid of cancelled node s with original predecessor pred.
711           */
712 <        Object waitForPut(long nanos) throws InterruptedException {
713 <            try {
714 <                if (!tryAcquireNanos(0, nanos) &&
715 <                    release(CANCEL))
716 <                    return null;
717 <            } catch (InterruptedException ie) {
718 <                checkCancellationOnInterrupt(ie);
712 >        void clean(QNode pred, QNode s) {
713 >            s.waiter = null; // forget thread
714 >            /*
715 >             * At any given time, exactly one node on list cannot be
716 >             * deleted -- the last inserted node. To accommodate this,
717 >             * if we cannot delete s, we save its predecessor as
718 >             * "cleanMe", deleting the previously saved version
719 >             * first. At least one of node s or the node previously
720 >             * saved can always be deleted, so this always terminates.
721 >             */
722 >            while (pred.next == s) { // Return early if already unlinked
723 >                QNode h = head;
724 >                QNode hn = h.next;   // Absorb cancelled first node as head
725 >                if (hn != null && hn.isCancelled()) {
726 >                    advanceHead(h, hn);
727 >                    continue;
728 >                }
729 >                QNode t = tail;      // Ensure consistent read for tail
730 >                if (t == h)
731 >                    return;
732 >                QNode tn = t.next;
733 >                if (t != tail)
734 >                    continue;
735 >                if (tn != null) {
736 >                    advanceTail(t, tn);
737 >                    continue;
738 >                }
739 >                if (s != t) {        // If not tail, try to unsplice
740 >                    QNode sn = s.next;
741 >                    if (sn == s || pred.casNext(s, sn))
742 >                        return;
743 >                }
744 >                QNode dp = cleanMe;
745 >                if (dp != null) {    // Try unlinking previous cancelled node
746 >                    QNode d = dp.next;
747 >                    QNode dn;
748 >                    if (d == null ||               // d is gone or
749 >                        d == dp ||                 // d is off list or
750 >                        !d.isCancelled() ||        // d not cancelled or
751 >                        (d != t &&                 // d not tail and
752 >                         (dn = d.next) != null &&  //   has successor
753 >                         dn != d &&                //   that is on list
754 >                         dp.casNext(d, dn)))       // d unspliced
755 >                        casCleanMe(dp, null);
756 >                    if (dp == pred)                
757 >                        return;      // s is already saved node
758 >                } else if (casCleanMe(null, pred))
759 >                    return;          // Postpone cleaning s
760              }
385            return extract();
761          }
762      }
763  
764      /**
765 +     * The transferer. Set only in constructor, but cannot be declared
766 +     * as final without further complicating serialization.  Since
767 +     * this is accessed only once per public method, there isn't a
768 +     * noticeable performance penalty for using volatile instead of
769 +     * final here.
770 +     */
771 +    private transient volatile Transferer transferer;
772 +
773 +    /**
774 +     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
775 +     */
776 +    public SynchronousQueue() {
777 +        this(false);
778 +    }
779 +
780 +    /**
781 +     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
782 +     * @param fair if true, waiting threads contend in FIFO order for access;
783 +     * otherwise the order is unspecified.
784 +     */
785 +    public SynchronousQueue(boolean fair) {
786 +        transferer = (fair)? new TransferQueue() : new TransferStack();
787 +    }
788 +
789 +    /**
790       * Adds the specified element to this queue, waiting if necessary for
791       * another thread to receive it.
792       *
793       * @throws InterruptedException {@inheritDoc}
794       * @throws NullPointerException {@inheritDoc}
795       */
796 <    public void put(E e) throws InterruptedException {
797 <        if (e == null) throw new NullPointerException();
798 <        final ReentrantLock qlock = this.qlock;
799 <
400 <        for (;;) {
401 <            Node node;
402 <            boolean mustWait;
403 <            if (Thread.interrupted()) throw new InterruptedException();
404 <            qlock.lock();
405 <            try {
406 <                node = waitingConsumers.deq();
407 <                if ( (mustWait = (node == null)) )
408 <                    node = waitingProducers.enq(e);
409 <            } finally {
410 <                qlock.unlock();
411 <            }
412 <
413 <            if (mustWait) {
414 <                try {
415 <                    node.waitForTake();
416 <                    return;
417 <                } catch (InterruptedException ex) {
418 <                    unlinkCancelledProducer(node);
419 <                    throw ex;
420 <                }
421 <            }
422 <
423 <            else if (node.setItem(e))
424 <                return;
425 <
426 <            // else consumer cancelled, so retry
427 <        }
796 >    public void put(E o) throws InterruptedException {
797 >        if (o == null) throw new NullPointerException();
798 >        if (transferer.transfer(o, false, 0) == null)
799 >            throw new InterruptedException();
800      }
801  
802      /**
# Line 436 | Line 808 | public class SynchronousQueue<E> extends
808       * @throws InterruptedException {@inheritDoc}
809       * @throws NullPointerException {@inheritDoc}
810       */
811 <    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
812 <        if (e == null) throw new NullPointerException();
813 <        long nanos = unit.toNanos(timeout);
814 <        final ReentrantLock qlock = this.qlock;
815 <        for (;;) {
816 <            Node node;
817 <            boolean mustWait;
818 <            if (Thread.interrupted()) throw new InterruptedException();
819 <            qlock.lock();
448 <            try {
449 <                node = waitingConsumers.deq();
450 <                if ( (mustWait = (node == null)) )
451 <                    node = waitingProducers.enq(e);
452 <            } finally {
453 <                qlock.unlock();
454 <            }
455 <
456 <            if (mustWait) {
457 <                try {
458 <                    boolean x = node.waitForTake(nanos);
459 <                    if (!x)
460 <                        unlinkCancelledProducer(node);
461 <                    return x;
462 <                } catch (InterruptedException ex) {
463 <                    unlinkCancelledProducer(node);
464 <                    throw ex;
465 <                }
466 <            }
467 <
468 <            else if (node.setItem(e))
469 <                return true;
811 >    public boolean offer(E o, long timeout, TimeUnit unit)
812 >        throws InterruptedException {
813 >        if (o == null) throw new NullPointerException();
814 >        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
815 >            return true;
816 >        if (!Thread.interrupted())
817 >            return false;
818 >        throw new InterruptedException();
819 >    }
820  
821 <            // else consumer cancelled, so retry
822 <        }
821 >    /**
822 >     * Inserts the specified element into this queue, if another thread is
823 >     * waiting to receive it.
824 >     *
825 >     * @param e the element to add
826 >     * @return <tt>true</tt> if the element was added to this queue, else
827 >     *         <tt>false</tt>
828 >     * @throws NullPointerException if the specified element is null
829 >     */
830 >    public boolean offer(E e) {
831 >        if (e == null) throw new NullPointerException();
832 >        return transferer.transfer(e, true, 0) != null;
833      }
834  
835      /**
# Line 480 | Line 840 | public class SynchronousQueue<E> extends
840       * @throws InterruptedException {@inheritDoc}
841       */
842      public E take() throws InterruptedException {
843 <        final ReentrantLock qlock = this.qlock;
844 <        for (;;) {
845 <            Node node;
846 <            boolean mustWait;
487 <
488 <            if (Thread.interrupted()) throw new InterruptedException();
489 <            qlock.lock();
490 <            try {
491 <                node = waitingProducers.deq();
492 <                if ( (mustWait = (node == null)) )
493 <                    node = waitingConsumers.enq(null);
494 <            } finally {
495 <                qlock.unlock();
496 <            }
497 <
498 <            if (mustWait) {
499 <                try {
500 <                    Object x = node.waitForPut();
501 <                    return (E)x;
502 <                } catch (InterruptedException ex) {
503 <                    unlinkCancelledConsumer(node);
504 <                    throw ex;
505 <                }
506 <            }
507 <            else {
508 <                Object x = node.getItem();
509 <                if (x != null)
510 <                    return (E)x;
511 <                // else cancelled, so retry
512 <            }
513 <        }
843 >        Object e = transferer.transfer(null, false, 0);
844 >        if (e != null)
845 >            return (E)e;
846 >        throw new InterruptedException();
847      }
848  
849      /**
# Line 523 | Line 856 | public class SynchronousQueue<E> extends
856       * @throws InterruptedException {@inheritDoc}
857       */
858      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
859 <        long nanos = unit.toNanos(timeout);
860 <        final ReentrantLock qlock = this.qlock;
861 <
862 <        for (;;) {
530 <            Node node;
531 <            boolean mustWait;
532 <
533 <            if (Thread.interrupted()) throw new InterruptedException();
534 <            qlock.lock();
535 <            try {
536 <                node = waitingProducers.deq();
537 <                if ( (mustWait = (node == null)) )
538 <                    node = waitingConsumers.enq(null);
539 <            } finally {
540 <                qlock.unlock();
541 <            }
542 <
543 <            if (mustWait) {
544 <                try {
545 <                    Object x = node.waitForPut(nanos);
546 <                    if (x == null)
547 <                        unlinkCancelledConsumer(node);
548 <                    return (E)x;
549 <                } catch (InterruptedException ex) {
550 <                    unlinkCancelledConsumer(node);
551 <                    throw ex;
552 <                }
553 <            }
554 <            else {
555 <                Object x = node.getItem();
556 <                if (x != null)
557 <                    return (E)x;
558 <                // else cancelled, so retry
559 <            }
560 <        }
561 <    }
562 <
563 <    // Untimed nonblocking versions
564 <
565 <    /**
566 <     * Inserts the specified element into this queue, if another thread is
567 <     * waiting to receive it.
568 <     *
569 <     * @param e the element to add
570 <     * @return <tt>true</tt> if the element was added to this queue, else
571 <     *         <tt>false</tt>
572 <     * @throws NullPointerException if the specified element is null
573 <     */
574 <    public boolean offer(E e) {
575 <        if (e == null) throw new NullPointerException();
576 <        final ReentrantLock qlock = this.qlock;
577 <
578 <        for (;;) {
579 <            Node node;
580 <            qlock.lock();
581 <            try {
582 <                node = waitingConsumers.deq();
583 <            } finally {
584 <                qlock.unlock();
585 <            }
586 <            if (node == null)
587 <                return false;
588 <
589 <            else if (node.setItem(e))
590 <                return true;
591 <            // else retry
592 <        }
859 >        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
860 >        if (e != null || !Thread.interrupted())
861 >            return (E)e;
862 >        throw new InterruptedException();
863      }
864  
865      /**
# Line 600 | Line 870 | public class SynchronousQueue<E> extends
870       *         element is available.
871       */
872      public E poll() {
873 <        final ReentrantLock qlock = this.qlock;
604 <        for (;;) {
605 <            Node node;
606 <            qlock.lock();
607 <            try {
608 <                node = waitingProducers.deq();
609 <            } finally {
610 <                qlock.unlock();
611 <            }
612 <            if (node == null)
613 <                return null;
614 <
615 <            else {
616 <                Object x = node.getItem();
617 <                if (x != null)
618 <                    return (E)x;
619 <                // else retry
620 <            }
621 <        }
873 >        return (E)transferer.transfer(null, true, 0);
874      }
875  
876      /**
877       * Always returns <tt>true</tt>.
878       * A <tt>SynchronousQueue</tt> has no internal capacity.
627     *
879       * @return <tt>true</tt>
880       */
881      public boolean isEmpty() {
# Line 634 | Line 885 | public class SynchronousQueue<E> extends
885      /**
886       * Always returns zero.
887       * A <tt>SynchronousQueue</tt> has no internal capacity.
888 <     *
638 <     * @return zero
888 >     * @return zero.
889       */
890      public int size() {
891          return 0;
# Line 644 | Line 894 | public class SynchronousQueue<E> extends
894      /**
895       * Always returns zero.
896       * A <tt>SynchronousQueue</tt> has no internal capacity.
897 <     *
648 <     * @return zero
897 >     * @return zero.
898       */
899      public int remainingCapacity() {
900          return 0;
# Line 655 | Line 904 | public class SynchronousQueue<E> extends
904       * Does nothing.
905       * A <tt>SynchronousQueue</tt> has no internal capacity.
906       */
907 <    public void clear() {}
907 >    public void clear() {
908 >    }
909  
910      /**
911       * Always returns <tt>false</tt>.
912       * A <tt>SynchronousQueue</tt> has no internal capacity.
913 <     *
664 <     * @param o object to be checked for containment in this queue
913 >     * @param o the element
914       * @return <tt>false</tt>
915       */
916      public boolean contains(Object o) {
# Line 680 | Line 929 | public class SynchronousQueue<E> extends
929      }
930  
931      /**
932 <     * Returns <tt>false</tt> unless the given collection is empty.
932 >     * Returns <tt>false</tt> unless given collection is empty.
933       * A <tt>SynchronousQueue</tt> has no internal capacity.
685     *
934       * @param c the collection
935 <     * @return <tt>false</tt> unless the given collection is empty
688 <     * @throws NullPointerException if the specified collection is null
935 >     * @return <tt>false</tt> unless given collection is empty
936       */
937      public boolean containsAll(Collection<?> c) {
938          return c.isEmpty();
# Line 694 | Line 941 | public class SynchronousQueue<E> extends
941      /**
942       * Always returns <tt>false</tt>.
943       * A <tt>SynchronousQueue</tt> has no internal capacity.
697     *
944       * @param c the collection
945       * @return <tt>false</tt>
946       */
# Line 705 | Line 951 | public class SynchronousQueue<E> extends
951      /**
952       * Always returns <tt>false</tt>.
953       * A <tt>SynchronousQueue</tt> has no internal capacity.
708     *
954       * @param c the collection
955       * @return <tt>false</tt>
956       */
# Line 717 | Line 962 | public class SynchronousQueue<E> extends
962       * Always returns <tt>null</tt>.
963       * A <tt>SynchronousQueue</tt> does not return elements
964       * unless actively waited on.
720     *
965       * @return <tt>null</tt>
966       */
967      public E peek() {
968          return null;
969      }
970  
727
971      static class EmptyIterator<E> implements Iterator<E> {
972          public boolean hasNext() {
973              return false;
# Line 747 | Line 990 | public class SynchronousQueue<E> extends
990          return new EmptyIterator<E>();
991      }
992  
750
993      /**
994       * Returns a zero-length array.
995       * @return a zero-length array
# Line 809 | Line 1051 | public class SynchronousQueue<E> extends
1051          }
1052          return n;
1053      }
1054 +
1055 +    /*
1056 +     * To cope with serialization strategy in the 1.5 version of
1057 +     * SynchronousQueue, we declare some unused classes and fields
1058 +     * that exist solely to enable serializability across versions.
1059 +     * These fields are never used, so are initialized only if this
1060 +     * object is ever serialized or deserialized.
1061 +     */
1062 +
1063 +    static class WaitQueue implements java.io.Serializable { }
1064 +    static class LifoWaitQueue extends WaitQueue {
1065 +        private static final long serialVersionUID = -3633113410248163686L;
1066 +    }
1067 +    static class FifoWaitQueue extends WaitQueue {
1068 +        private static final long serialVersionUID = -3623113410248163686L;
1069 +    }
1070 +    private ReentrantLock qlock;
1071 +    private WaitQueue waitingProducers;
1072 +    private WaitQueue waitingConsumers;
1073 +
1074 +    /**
1075 +     * Save the state to a stream (that is, serialize it).
1076 +     *
1077 +     * @param s the stream
1078 +     */
1079 +    private void writeObject(java.io.ObjectOutputStream s)
1080 +        throws java.io.IOException {
1081 +        boolean fair = transferer instanceof TransferQueue;
1082 +        if (fair) {
1083 +            qlock = new ReentrantLock(true);
1084 +            waitingProducers = new FifoWaitQueue();
1085 +            waitingConsumers = new FifoWaitQueue();
1086 +        }
1087 +        else {
1088 +            qlock = new ReentrantLock();
1089 +            waitingProducers = new LifoWaitQueue();
1090 +            waitingConsumers = new LifoWaitQueue();
1091 +        }
1092 +        s.defaultWriteObject();
1093 +    }
1094 +
1095 +    private void readObject(final java.io.ObjectInputStream s)
1096 +        throws java.io.IOException, ClassNotFoundException {
1097 +        s.defaultReadObject();
1098 +        if (waitingProducers instanceof FifoWaitQueue)
1099 +            transferer = new TransferQueue();
1100 +        else
1101 +            transferer = new TransferStack();
1102 +    }
1103 +
1104   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines