ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
Revision: 1.4
Committed: Wed Jul 29 02:35:47 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +52 -11 lines
Log Message:
sync with jsr166y package

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.AbstractQueue;
10     import java.util.Collection;
11     import java.util.Iterator;
12     import java.util.NoSuchElementException;
13     import java.util.concurrent.locks.LockSupport;
14     import java.util.concurrent.atomic.AtomicReference;
15    
16     /**
17     * An unbounded {@linkplain TransferQueue} based on linked nodes.
18     * This queue orders elements FIFO (first-in-first-out) with respect
19     * to any given producer. The <em>head</em> of the queue is that
20     * element that has been on the queue the longest time for some
21     * producer. The <em>tail</em> of the queue is that element that has
22     * been on the queue the shortest time for some producer.
23     *
24     * <p>Beware that, unlike in most collections, the {@code size}
25     * method is <em>NOT</em> a constant-time operation. Because of the
26     * asynchronous nature of these queues, determining the current number
27     * of elements requires a traversal of the elements.
28     *
29     * <p>This class and its iterator implement all of the
30     * <em>optional</em> methods of the {@link Collection} and {@link
31     * Iterator} interfaces.
32     *
33     * <p>Memory consistency effects: As with other concurrent
34     * collections, actions in a thread prior to placing an object into a
35     * {@code LinkedTransferQueue}
36     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
37     * actions subsequent to the access or removal of that element from
38     * the {@code LinkedTransferQueue} in another thread.
39     *
40     * <p>This class is a member of the
41     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42     * Java Collections Framework</a>.
43     *
44     * @since 1.7
45     * @author Doug Lea
46     * @param <E> the type of elements held in this collection
47     */
48     public class LinkedTransferQueue<E> extends AbstractQueue<E>
49     implements TransferQueue<E>, java.io.Serializable {
50     private static final long serialVersionUID = -3223113410248163686L;
51    
52     /*
53     * This class extends the approach used in FIFO-mode
54     * SynchronousQueues. See the internal documentation, as well as
55     * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
56     * Lea & Scott
57     * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
58     *
59     * The main extension is to provide different Wait modes for the
60     * main "xfer" method that puts or takes items. These don't
61     * impact the basic dual-queue logic, but instead control whether
62     * or how threads block upon insertion of request or data nodes
63     * into the dual queue. It also uses slightly different
64     * conventions for tracking whether nodes are off-list or
65     * cancelled.
66     */
67    
68     // Wait modes for xfer method
69     static final int NOWAIT = 0;
70     static final int TIMEOUT = 1;
71     static final int WAIT = 2;
72    
73     /** The number of CPUs, for spin control */
74     static final int NCPUS = Runtime.getRuntime().availableProcessors();
75    
76     /**
77     * The number of times to spin before blocking in timed waits.
78     * The value is empirically derived -- it works well across a
79     * variety of processors and OSes. Empirically, the best value
80     * seems not to vary with number of CPUs (beyond 2) so is just
81     * a constant.
82     */
83     static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
84    
85     /**
86     * The number of times to spin before blocking in untimed waits.
87     * This is greater than timed value because untimed waits spin
88     * faster since they don't need to check times on each spin.
89     */
90     static final int maxUntimedSpins = maxTimedSpins * 16;
91    
92     /**
93     * The number of nanoseconds for which it is faster to spin
94     * rather than to use timed park. A rough estimate suffices.
95     */
96     static final long spinForTimeoutThreshold = 1000L;
97    
98     /**
99     * Node class for LinkedTransferQueue. Opportunistically
100     * subclasses from AtomicReference to represent item. Uses Object,
101     * not E, to allow setting item to "this" after use, to avoid
102     * garbage retention. Similarly, setting the next field to this is
103     * used as sentinel that node is off list.
104     */
105     static final class Node<E> extends AtomicReference<Object> {
106     volatile Node<E> next;
107     volatile Thread waiter; // to control park/unpark
108     final boolean isData;
109    
110     Node(E item, boolean isData) {
111     super(item);
112     this.isData = isData;
113     }
114    
115 jsr166 1.4 // Unsafe mechanics
116    
117     private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
118     private static final long nextOffset =
119     objectFieldOffset(UNSAFE, "next", Node.class);
120 jsr166 1.1
121     final boolean casNext(Node<E> cmp, Node<E> val) {
122 jsr166 1.4 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
123 jsr166 1.1 }
124    
125     final void clearNext() {
126 jsr166 1.4 UNSAFE.putOrderedObject(this, nextOffset, this);
127 jsr166 1.1 }
128    
129     private static final long serialVersionUID = -3375979862319811754L;
130     }
131    
132     /**
133     * Padded version of AtomicReference used for head, tail and
134     * cleanMe, to alleviate contention across threads CASing one vs
135     * the other.
136     */
137     static final class PaddedAtomicReference<T> extends AtomicReference<T> {
138     // enough padding for 64bytes with 4byte refs
139     Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
140     PaddedAtomicReference(T r) { super(r); }
141     private static final long serialVersionUID = 8170090609809740854L;
142     }
143    
144    
145     /** head of the queue */
146     private transient final PaddedAtomicReference<Node<E>> head;
147    
148     /** tail of the queue */
149     private transient final PaddedAtomicReference<Node<E>> tail;
150    
151     /**
152     * Reference to a cancelled node that might not yet have been
153     * unlinked from queue because it was the last inserted node
154     * when it cancelled.
155     */
156     private transient final PaddedAtomicReference<Node<E>> cleanMe;
157    
158     /**
159     * Tries to cas nh as new head; if successful, unlink
160     * old head's next node to avoid garbage retention.
161     */
162     private boolean advanceHead(Node<E> h, Node<E> nh) {
163     if (h == head.get() && head.compareAndSet(h, nh)) {
164     h.clearNext(); // forget old next
165     return true;
166     }
167     return false;
168     }
169    
170     /**
171     * Puts or takes an item. Used for most queue operations (except
172     * poll() and tryTransfer()). See the similar code in
173     * SynchronousQueue for detailed explanation.
174     *
175     * @param e the item or if null, signifies that this is a take
176     * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
177     * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
178     * @return an item, or null on failure
179     */
180     private E xfer(E e, int mode, long nanos) {
181     boolean isData = (e != null);
182     Node<E> s = null;
183     final PaddedAtomicReference<Node<E>> head = this.head;
184     final PaddedAtomicReference<Node<E>> tail = this.tail;
185    
186     for (;;) {
187     Node<E> t = tail.get();
188     Node<E> h = head.get();
189    
190     if (t != null && (t == h || t.isData == isData)) {
191     if (s == null)
192     s = new Node<E>(e, isData);
193     Node<E> last = t.next;
194     if (last != null) {
195     if (t == tail.get())
196     tail.compareAndSet(t, last);
197     }
198     else if (t.casNext(null, s)) {
199     tail.compareAndSet(t, s);
200     return awaitFulfill(t, s, e, mode, nanos);
201     }
202     }
203    
204     else if (h != null) {
205     Node<E> first = h.next;
206     if (t == tail.get() && first != null &&
207     advanceHead(h, first)) {
208     Object x = first.get();
209     if (x != first && first.compareAndSet(x, e)) {
210     LockSupport.unpark(first.waiter);
211     return isData ? e : (E) x;
212     }
213     }
214     }
215     }
216     }
217    
218    
219     /**
220     * Version of xfer for poll() and tryTransfer, which
221     * simplifies control paths both here and in xfer.
222     */
223     private E fulfill(E e) {
224     boolean isData = (e != null);
225     final PaddedAtomicReference<Node<E>> head = this.head;
226     final PaddedAtomicReference<Node<E>> tail = this.tail;
227    
228     for (;;) {
229     Node<E> t = tail.get();
230     Node<E> h = head.get();
231    
232     if (t != null && (t == h || t.isData == isData)) {
233     Node<E> last = t.next;
234     if (t == tail.get()) {
235     if (last != null)
236     tail.compareAndSet(t, last);
237     else
238     return null;
239     }
240     }
241     else if (h != null) {
242     Node<E> first = h.next;
243     if (t == tail.get() &&
244     first != null &&
245     advanceHead(h, first)) {
246     Object x = first.get();
247     if (x != first && first.compareAndSet(x, e)) {
248     LockSupport.unpark(first.waiter);
249     return isData ? e : (E) x;
250     }
251     }
252     }
253     }
254     }
255    
256     /**
257     * Spins/blocks until node s is fulfilled or caller gives up,
258     * depending on wait mode.
259     *
260     * @param pred the predecessor of waiting node
261     * @param s the waiting node
262     * @param e the comparison value for checking match
263     * @param mode mode
264     * @param nanos timeout value
265     * @return matched item, or s if cancelled
266     */
267     private E awaitFulfill(Node<E> pred, Node<E> s, E e,
268     int mode, long nanos) {
269     if (mode == NOWAIT)
270     return null;
271    
272     long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
273     Thread w = Thread.currentThread();
274     int spins = -1; // set to desired spin count below
275     for (;;) {
276     if (w.isInterrupted())
277     s.compareAndSet(e, s);
278     Object x = s.get();
279     if (x != e) { // Node was matched or cancelled
280     advanceHead(pred, s); // unlink if head
281     if (x == s) { // was cancelled
282     clean(pred, s);
283     return null;
284     }
285     else if (x != null) {
286     s.set(s); // avoid garbage retention
287     return (E) x;
288     }
289     else
290     return e;
291     }
292     if (mode == TIMEOUT) {
293     long now = System.nanoTime();
294     nanos -= now - lastTime;
295     lastTime = now;
296     if (nanos <= 0) {
297     s.compareAndSet(e, s); // try to cancel
298     continue;
299     }
300     }
301     if (spins < 0) {
302     Node<E> h = head.get(); // only spin if at head
303     spins = ((h != null && h.next == s) ?
304     ((mode == TIMEOUT) ?
305     maxTimedSpins : maxUntimedSpins) : 0);
306     }
307     if (spins > 0)
308     --spins;
309     else if (s.waiter == null)
310     s.waiter = w;
311     else if (mode != TIMEOUT) {
312     LockSupport.park(this);
313     s.waiter = null;
314     spins = -1;
315     }
316     else if (nanos > spinForTimeoutThreshold) {
317     LockSupport.parkNanos(this, nanos);
318     s.waiter = null;
319     spins = -1;
320     }
321     }
322     }
323    
324     /**
325     * Returns validated tail for use in cleaning methods.
326     */
327     private Node<E> getValidatedTail() {
328     for (;;) {
329     Node<E> h = head.get();
330     Node<E> first = h.next;
331     if (first != null && first.next == first) { // help advance
332     advanceHead(h, first);
333     continue;
334     }
335     Node<E> t = tail.get();
336     Node<E> last = t.next;
337     if (t == tail.get()) {
338     if (last != null)
339     tail.compareAndSet(t, last); // help advance
340     else
341     return t;
342     }
343     }
344     }
345    
346     /**
347     * Gets rid of cancelled node s with original predecessor pred.
348     *
349     * @param pred predecessor of cancelled node
350     * @param s the cancelled node
351     */
352     private void clean(Node<E> pred, Node<E> s) {
353     Thread w = s.waiter;
354     if (w != null) { // Wake up thread
355     s.waiter = null;
356     if (w != Thread.currentThread())
357     LockSupport.unpark(w);
358     }
359    
360     if (pred == null)
361     return;
362    
363     /*
364     * At any given time, exactly one node on list cannot be
365     * deleted -- the last inserted node. To accommodate this, if
366     * we cannot delete s, we save its predecessor as "cleanMe",
367     * processing the previously saved version first. At least one
368     * of node s or the node previously saved can always be
369     * processed, so this always terminates.
370     */
371     while (pred.next == s) {
372     Node<E> oldpred = reclean(); // First, help get rid of cleanMe
373     Node<E> t = getValidatedTail();
374     if (s != t) { // If not tail, try to unsplice
375     Node<E> sn = s.next; // s.next == s means s already off list
376     if (sn == s || pred.casNext(s, sn))
377     break;
378     }
379     else if (oldpred == pred || // Already saved
380     (oldpred == null && cleanMe.compareAndSet(null, pred)))
381     break; // Postpone cleaning
382     }
383     }
384    
385     /**
386     * Tries to unsplice the cancelled node held in cleanMe that was
387     * previously uncleanable because it was at tail.
388     *
389     * @return current cleanMe node (or null)
390     */
391     private Node<E> reclean() {
392     /*
393     * cleanMe is, or at one time was, predecessor of cancelled
394     * node s that was the tail so could not be unspliced. If s
395     * is no longer the tail, try to unsplice if necessary and
396     * make cleanMe slot available. This differs from similar
397     * code in clean() because we must check that pred still
398     * points to a cancelled node that must be unspliced -- if
399     * not, we can (must) clear cleanMe without unsplicing.
400     * This can loop only due to contention on casNext or
401     * clearing cleanMe.
402     */
403     Node<E> pred;
404     while ((pred = cleanMe.get()) != null) {
405     Node<E> t = getValidatedTail();
406     Node<E> s = pred.next;
407     if (s != t) {
408     Node<E> sn;
409     if (s == null || s == pred || s.get() != s ||
410     (sn = s.next) == s || pred.casNext(s, sn))
411     cleanMe.compareAndSet(pred, null);
412     }
413     else // s is still tail; cannot clean
414     break;
415     }
416     return pred;
417     }
418    
419     /**
420     * Creates an initially empty {@code LinkedTransferQueue}.
421     */
422     public LinkedTransferQueue() {
423     Node<E> dummy = new Node<E>(null, false);
424     head = new PaddedAtomicReference<Node<E>>(dummy);
425     tail = new PaddedAtomicReference<Node<E>>(dummy);
426     cleanMe = new PaddedAtomicReference<Node<E>>(null);
427     }
428    
429     /**
430     * Creates a {@code LinkedTransferQueue}
431     * initially containing the elements of the given collection,
432     * added in traversal order of the collection's iterator.
433     *
434     * @param c the collection of elements to initially contain
435     * @throws NullPointerException if the specified collection or any
436     * of its elements are null
437     */
438     public LinkedTransferQueue(Collection<? extends E> c) {
439     this();
440     addAll(c);
441     }
442    
443 jsr166 1.4 /**
444     * @throws InterruptedException {@inheritDoc}
445     * @throws NullPointerException {@inheritDoc}
446     */
447 jsr166 1.1 public void put(E e) throws InterruptedException {
448     if (e == null) throw new NullPointerException();
449     if (Thread.interrupted()) throw new InterruptedException();
450     xfer(e, NOWAIT, 0);
451     }
452    
453 jsr166 1.4 /**
454     * @throws InterruptedException {@inheritDoc}
455     * @throws NullPointerException {@inheritDoc}
456     */
457 jsr166 1.1 public boolean offer(E e, long timeout, TimeUnit unit)
458     throws InterruptedException {
459     if (e == null) throw new NullPointerException();
460     if (Thread.interrupted()) throw new InterruptedException();
461     xfer(e, NOWAIT, 0);
462     return true;
463     }
464    
465 jsr166 1.4 /**
466     * @throws NullPointerException {@inheritDoc}
467     */
468 jsr166 1.1 public boolean offer(E e) {
469     if (e == null) throw new NullPointerException();
470     xfer(e, NOWAIT, 0);
471     return true;
472     }
473    
474 jsr166 1.4 /**
475     * @throws NullPointerException {@inheritDoc}
476     */
477 jsr166 1.1 public boolean add(E e) {
478     if (e == null) throw new NullPointerException();
479     xfer(e, NOWAIT, 0);
480     return true;
481     }
482    
483 jsr166 1.4 /**
484     * @throws InterruptedException {@inheritDoc}
485     * @throws NullPointerException {@inheritDoc}
486     */
487 jsr166 1.1 public void transfer(E e) throws InterruptedException {
488     if (e == null) throw new NullPointerException();
489     if (xfer(e, WAIT, 0) == null) {
490     Thread.interrupted();
491     throw new InterruptedException();
492     }
493     }
494    
495 jsr166 1.4 /**
496     * @throws InterruptedException {@inheritDoc}
497     * @throws NullPointerException {@inheritDoc}
498     */
499 jsr166 1.1 public boolean tryTransfer(E e, long timeout, TimeUnit unit)
500     throws InterruptedException {
501     if (e == null) throw new NullPointerException();
502     if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null)
503     return true;
504     if (!Thread.interrupted())
505     return false;
506     throw new InterruptedException();
507     }
508    
509 jsr166 1.4 /**
510     * @throws NullPointerException {@inheritDoc}
511     */
512 jsr166 1.1 public boolean tryTransfer(E e) {
513     if (e == null) throw new NullPointerException();
514     return fulfill(e) != null;
515     }
516    
517 jsr166 1.4 /**
518     * @throws InterruptedException {@inheritDoc}
519     */
520 jsr166 1.1 public E take() throws InterruptedException {
521     Object e = xfer(null, WAIT, 0);
522     if (e != null)
523     return (E) e;
524     Thread.interrupted();
525     throw new InterruptedException();
526     }
527    
528 jsr166 1.4 /**
529     * @throws InterruptedException {@inheritDoc}
530     */
531 jsr166 1.1 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
532     Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
533     if (e != null || !Thread.interrupted())
534     return (E) e;
535     throw new InterruptedException();
536     }
537    
538     public E poll() {
539     return fulfill(null);
540     }
541    
542 jsr166 1.4 /**
543     * @throws NullPointerException {@inheritDoc}
544     * @throws IllegalArgumentException {@inheritDoc}
545     */
546 jsr166 1.1 public int drainTo(Collection<? super E> c) {
547     if (c == null)
548     throw new NullPointerException();
549     if (c == this)
550     throw new IllegalArgumentException();
551     int n = 0;
552     E e;
553     while ( (e = poll()) != null) {
554     c.add(e);
555     ++n;
556     }
557     return n;
558     }
559    
560 jsr166 1.4 /**
561     * @throws NullPointerException {@inheritDoc}
562     * @throws IllegalArgumentException {@inheritDoc}
563     */
564 jsr166 1.1 public int drainTo(Collection<? super E> c, int maxElements) {
565     if (c == null)
566     throw new NullPointerException();
567     if (c == this)
568     throw new IllegalArgumentException();
569     int n = 0;
570     E e;
571     while (n < maxElements && (e = poll()) != null) {
572     c.add(e);
573     ++n;
574     }
575     return n;
576     }
577    
578     // Traversal-based methods
579    
580     /**
581     * Returns head after performing any outstanding helping steps.
582     */
583     private Node<E> traversalHead() {
584     for (;;) {
585     Node<E> t = tail.get();
586     Node<E> h = head.get();
587     if (h != null && t != null) {
588     Node<E> last = t.next;
589     Node<E> first = h.next;
590     if (t == tail.get()) {
591     if (last != null)
592     tail.compareAndSet(t, last);
593     else if (first != null) {
594     Object x = first.get();
595     if (x == first)
596     advanceHead(h, first);
597     else
598     return h;
599     }
600     else
601     return h;
602     }
603     }
604     reclean();
605     }
606     }
607    
608    
609     public Iterator<E> iterator() {
610     return new Itr();
611     }
612    
613     /**
614     * Iterators. Basic strategy is to traverse list, treating
615     * non-data (i.e., request) nodes as terminating list.
616     * Once a valid data node is found, the item is cached
617     * so that the next call to next() will return it even
618     * if subsequently removed.
619     */
620     class Itr implements Iterator<E> {
621     Node<E> next; // node to return next
622     Node<E> pnext; // predecessor of next
623     Node<E> snext; // successor of next
624     Node<E> curr; // last returned node, for remove()
625     Node<E> pcurr; // predecessor of curr, for remove()
626     E nextItem; // Cache of next item, once committed to in next
627    
628     Itr() {
629     findNext();
630     }
631    
632     /**
633     * Ensures next points to next valid node, or null if none.
634     */
635     void findNext() {
636     for (;;) {
637     Node<E> pred = pnext;
638     Node<E> q = next;
639     if (pred == null || pred == q) {
640     pred = traversalHead();
641     q = pred.next;
642     }
643     if (q == null || !q.isData) {
644     next = null;
645     return;
646     }
647     Object x = q.get();
648     Node<E> s = q.next;
649     if (x != null && q != x && q != s) {
650     nextItem = (E) x;
651     snext = s;
652     pnext = pred;
653     next = q;
654     return;
655     }
656     pnext = q;
657     next = s;
658     }
659     }
660    
661     public boolean hasNext() {
662     return next != null;
663     }
664    
665     public E next() {
666     if (next == null) throw new NoSuchElementException();
667     pcurr = pnext;
668     curr = next;
669     pnext = next;
670     next = snext;
671     E x = nextItem;
672     findNext();
673     return x;
674     }
675    
676     public void remove() {
677     Node<E> p = curr;
678     if (p == null)
679     throw new IllegalStateException();
680     Object x = p.get();
681     if (x != null && x != p && p.compareAndSet(x, p))
682     clean(pcurr, p);
683     }
684     }
685    
686     public E peek() {
687     for (;;) {
688     Node<E> h = traversalHead();
689     Node<E> p = h.next;
690     if (p == null)
691     return null;
692     Object x = p.get();
693     if (p != x) {
694     if (!p.isData)
695     return null;
696     if (x != null)
697     return (E) x;
698     }
699     }
700     }
701    
702     public boolean isEmpty() {
703     for (;;) {
704     Node<E> h = traversalHead();
705     Node<E> p = h.next;
706     if (p == null)
707     return true;
708     Object x = p.get();
709     if (p != x) {
710     if (!p.isData)
711     return true;
712     if (x != null)
713     return false;
714     }
715     }
716     }
717    
718     public boolean hasWaitingConsumer() {
719     for (;;) {
720     Node<E> h = traversalHead();
721     Node<E> p = h.next;
722     if (p == null)
723     return false;
724     Object x = p.get();
725     if (p != x)
726     return !p.isData;
727     }
728     }
729    
730     /**
731     * Returns the number of elements in this queue. If this queue
732     * contains more than {@code Integer.MAX_VALUE} elements, returns
733     * {@code Integer.MAX_VALUE}.
734     *
735     * <p>Beware that, unlike in most collections, this method is
736     * <em>NOT</em> a constant-time operation. Because of the
737     * asynchronous nature of these queues, determining the current
738     * number of elements requires an O(n) traversal.
739     *
740     * @return the number of elements in this queue
741     */
742     public int size() {
743     int count = 0;
744     Node<E> h = traversalHead();
745     for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
746     Object x = p.get();
747     if (x != null && x != p) {
748     if (++count == Integer.MAX_VALUE) // saturated
749     break;
750     }
751     }
752     return count;
753     }
754    
755     public int getWaitingConsumerCount() {
756     int count = 0;
757     Node<E> h = traversalHead();
758     for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
759     if (p.get() == null) {
760     if (++count == Integer.MAX_VALUE)
761     break;
762     }
763     }
764     return count;
765     }
766    
767     public int remainingCapacity() {
768     return Integer.MAX_VALUE;
769     }
770    
771     public boolean remove(Object o) {
772     if (o == null)
773     return false;
774     for (;;) {
775     Node<E> pred = traversalHead();
776     for (;;) {
777     Node<E> q = pred.next;
778     if (q == null || !q.isData)
779     return false;
780     if (q == pred) // restart
781     break;
782     Object x = q.get();
783     if (x != null && x != q && o.equals(x) &&
784     q.compareAndSet(x, q)) {
785     clean(pred, q);
786     return true;
787     }
788     pred = q;
789     }
790     }
791     }
792    
793     /**
794     * Save the state to a stream (that is, serialize it).
795     *
796     * @serialData All of the elements (each an {@code E}) in
797     * the proper order, followed by a null
798     * @param s the stream
799     */
800     private void writeObject(java.io.ObjectOutputStream s)
801     throws java.io.IOException {
802     s.defaultWriteObject();
803     for (E e : this)
804     s.writeObject(e);
805     // Use trailing null as sentinel
806     s.writeObject(null);
807     }
808    
809     /**
810     * Reconstitute the Queue instance from a stream (that is,
811     * deserialize it).
812     *
813     * @param s the stream
814     */
815     private void readObject(java.io.ObjectInputStream s)
816     throws java.io.IOException, ClassNotFoundException {
817     s.defaultReadObject();
818     resetHeadAndTail();
819     for (;;) {
820     @SuppressWarnings("unchecked") E item = (E) s.readObject();
821     if (item == null)
822     break;
823     else
824     offer(item);
825     }
826     }
827    
828     // Support for resetting head/tail while deserializing
829     private void resetHeadAndTail() {
830     Node<E> dummy = new Node<E>(null, false);
831     UNSAFE.putObjectVolatile(this, headOffset,
832     new PaddedAtomicReference<Node<E>>(dummy));
833     UNSAFE.putObjectVolatile(this, tailOffset,
834     new PaddedAtomicReference<Node<E>>(dummy));
835     UNSAFE.putObjectVolatile(this, cleanMeOffset,
836     new PaddedAtomicReference<Node<E>>(null));
837     }
838    
839 jsr166 1.3 // Unsafe mechanics
840 jsr166 1.1
841 jsr166 1.3 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
842     private static final long headOffset =
843 jsr166 1.4 objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
844 jsr166 1.3 private static final long tailOffset =
845 jsr166 1.4 objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
846 jsr166 1.3 private static final long cleanMeOffset =
847 jsr166 1.4 objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
848    
849 jsr166 1.3
850 jsr166 1.4 static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
851     String field, Class<?> klazz) {
852 jsr166 1.1 try {
853 jsr166 1.3 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
854 jsr166 1.1 } catch (NoSuchFieldException e) {
855 jsr166 1.3 // Convert Exception to corresponding Error
856     NoSuchFieldError error = new NoSuchFieldError(field);
857 jsr166 1.1 error.initCause(e);
858     throw error;
859     }
860     }
861     }