ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.9 by dl, Sun Nov 16 20:24:54 2008 UTC vs.
Revision 1.26 by jsr166, Sat Jul 25 00:34:00 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
14 < import sun.misc.Unsafe;
15 < import java.lang.reflect.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.Iterator;
14 > import java.util.NoSuchElementException;
15 > import java.util.concurrent.locks.LockSupport;
16 > import java.util.concurrent.atomic.AtomicReference;
17 > import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18  
19   /**
20   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 21 | Line 24 | import java.lang.reflect.*;
24   * producer.  The <em>tail</em> of the queue is that element that has
25   * been on the queue the shortest time for some producer.
26   *
27 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
27 > * <p>Beware that, unlike in most collections, the {@code size}
28   * method is <em>NOT</em> a constant-time operation. Because of the
29   * asynchronous nature of these queues, determining the current number
30   * of elements requires a traversal of the elements.
# Line 44 | Line 47 | import java.lang.reflect.*;
47   * @since 1.7
48   * @author Doug Lea
49   * @param <E> the type of elements held in this collection
47 *
50   */
51   public class LinkedTransferQueue<E> extends AbstractQueue<E>
52      implements TransferQueue<E>, java.io.Serializable {
# Line 81 | Line 83 | public class LinkedTransferQueue<E> exte
83       * seems not to vary with number of CPUs (beyond 2) so is just
84       * a constant.
85       */
86 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
86 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
87  
88      /**
89       * The number of times to spin before blocking in untimed waits.
# Line 103 | Line 105 | public class LinkedTransferQueue<E> exte
105       * garbage retention. Similarly, setting the next field to this is
106       * used as sentinel that node is off list.
107       */
108 <    static final class QNode extends AtomicReference<Object> {
109 <        volatile QNode next;
108 >    static final class Node<E> extends AtomicReference<Object> {
109 >        volatile Node<E> next;
110          volatile Thread waiter;       // to control park/unpark
111          final boolean isData;
112 <        QNode(Object item, boolean isData) {
112 >
113 >        Node(E item, boolean isData) {
114              super(item);
115              this.isData = isData;
116          }
117  
118 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
118 >        @SuppressWarnings("rawtypes")
119 >        static final AtomicReferenceFieldUpdater<Node, Node>
120              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
121 <            (QNode.class, QNode.class, "next");
121 >            (Node.class, Node.class, "next");
122  
123 <        boolean casNext(QNode cmp, QNode val) {
123 >        final boolean casNext(Node<E> cmp, Node<E> val) {
124              return nextUpdater.compareAndSet(this, cmp, val);
125          }
126 +
127 +        final void clearNext() {
128 +            nextUpdater.lazySet(this, this);
129 +        }
130 +
131 +        private static final long serialVersionUID = -3375979862319811754L;
132      }
133  
134      /**
# Line 130 | Line 140 | public class LinkedTransferQueue<E> exte
140          // enough padding for 64bytes with 4byte refs
141          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
142          PaddedAtomicReference(T r) { super(r); }
143 +        private static final long serialVersionUID = 8170090609809740854L;
144      }
145  
146  
147      /** head of the queue */
148 <    private transient final PaddedAtomicReference<QNode> head;
148 >    private transient final PaddedAtomicReference<Node<E>> head;
149 >
150      /** tail of the queue */
151 <    private transient final PaddedAtomicReference<QNode> tail;
151 >    private transient final PaddedAtomicReference<Node<E>> tail;
152  
153      /**
154       * Reference to a cancelled node that might not yet have been
155       * unlinked from queue because it was the last inserted node
156       * when it cancelled.
157       */
158 <    private transient final PaddedAtomicReference<QNode> cleanMe;
158 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
159  
160      /**
161       * Tries to cas nh as new head; if successful, unlink
162       * old head's next node to avoid garbage retention.
163       */
164 <    private boolean advanceHead(QNode h, QNode nh) {
164 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
165          if (h == head.get() && head.compareAndSet(h, nh)) {
166 <            h.next = h; // forget old next
166 >            h.clearNext(); // forget old next
167              return true;
168          }
169          return false;
# Line 161 | Line 173 | public class LinkedTransferQueue<E> exte
173       * Puts or takes an item. Used for most queue operations (except
174       * poll() and tryTransfer()). See the similar code in
175       * SynchronousQueue for detailed explanation.
176 +     *
177       * @param e the item or if null, signifies that this is a take
178       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
179       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
180       * @return an item, or null on failure
181       */
182 <    private Object xfer(Object e, int mode, long nanos) {
182 >    private E xfer(E e, int mode, long nanos) {
183          boolean isData = (e != null);
184 <        QNode s = null;
185 <        final PaddedAtomicReference<QNode> head = this.head;
186 <        final PaddedAtomicReference<QNode> tail = this.tail;
184 >        Node<E> s = null;
185 >        final PaddedAtomicReference<Node<E>> head = this.head;
186 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
187  
188          for (;;) {
189 <            QNode t = tail.get();
190 <            QNode h = head.get();
189 >            Node<E> t = tail.get();
190 >            Node<E> h = head.get();
191  
192              if (t != null && (t == h || t.isData == isData)) {
193                  if (s == null)
194 <                    s = new QNode(e, isData);
195 <                QNode last = t.next;
194 >                    s = new Node<E>(e, isData);
195 >                Node<E> last = t.next;
196                  if (last != null) {
197                      if (t == tail.get())
198                          tail.compareAndSet(t, last);
# Line 191 | Line 204 | public class LinkedTransferQueue<E> exte
204              }
205  
206              else if (h != null) {
207 <                QNode first = h.next;
207 >                Node<E> first = h.next;
208                  if (t == tail.get() && first != null &&
209                      advanceHead(h, first)) {
210                      Object x = first.get();
211                      if (x != first && first.compareAndSet(x, e)) {
212                          LockSupport.unpark(first.waiter);
213 <                        return isData? e : x;
213 >                        return isData ? e : (E) x;
214                      }
215                  }
216              }
# Line 207 | Line 220 | public class LinkedTransferQueue<E> exte
220  
221      /**
222       * Version of xfer for poll() and tryTransfer, which
223 <     * simplifies control paths both here and in xfer
223 >     * simplifies control paths both here and in xfer.
224       */
225 <    private Object fulfill(Object e) {
225 >    private E fulfill(E e) {
226          boolean isData = (e != null);
227 <        final PaddedAtomicReference<QNode> head = this.head;
228 <        final PaddedAtomicReference<QNode> tail = this.tail;
227 >        final PaddedAtomicReference<Node<E>> head = this.head;
228 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
229  
230          for (;;) {
231 <            QNode t = tail.get();
232 <            QNode h = head.get();
231 >            Node<E> t = tail.get();
232 >            Node<E> h = head.get();
233  
234              if (t != null && (t == h || t.isData == isData)) {
235 <                QNode last = t.next;
235 >                Node<E> last = t.next;
236                  if (t == tail.get()) {
237                      if (last != null)
238                          tail.compareAndSet(t, last);
# Line 228 | Line 241 | public class LinkedTransferQueue<E> exte
241                  }
242              }
243              else if (h != null) {
244 <                QNode first = h.next;
244 >                Node<E> first = h.next;
245                  if (t == tail.get() &&
246                      first != null &&
247                      advanceHead(h, first)) {
248                      Object x = first.get();
249                      if (x != first && first.compareAndSet(x, e)) {
250                          LockSupport.unpark(first.waiter);
251 <                        return isData? e : x;
251 >                        return isData ? e : (E) x;
252                      }
253                  }
254              }
# Line 253 | Line 266 | public class LinkedTransferQueue<E> exte
266       * @param nanos timeout value
267       * @return matched item, or s if cancelled
268       */
269 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
270 <                                int mode, long nanos) {
269 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
270 >                           int mode, long nanos) {
271          if (mode == NOWAIT)
272              return null;
273  
274 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
274 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
275          Thread w = Thread.currentThread();
276          int spins = -1; // set to desired spin count below
277          for (;;) {
# Line 267 | Line 280 | public class LinkedTransferQueue<E> exte
280              Object x = s.get();
281              if (x != e) {                 // Node was matched or cancelled
282                  advanceHead(pred, s);     // unlink if head
283 <                if (x == s) {              // was cancelled
283 >                if (x == s) {             // was cancelled
284                      clean(pred, s);
285                      return null;
286                  }
287                  else if (x != null) {
288                      s.set(s);             // avoid garbage retention
289 <                    return x;
289 >                    return (E) x;
290                  }
291                  else
292                      return e;
# Line 288 | Line 301 | public class LinkedTransferQueue<E> exte
301                  }
302              }
303              if (spins < 0) {
304 <                QNode h = head.get(); // only spin if at head
304 >                Node<E> h = head.get(); // only spin if at head
305                  spins = ((h != null && h.next == s) ?
306 <                         (mode == TIMEOUT?
306 >                         ((mode == TIMEOUT) ?
307                            maxTimedSpins : maxUntimedSpins) : 0);
308              }
309              if (spins > 0)
# Line 298 | Line 311 | public class LinkedTransferQueue<E> exte
311              else if (s.waiter == null)
312                  s.waiter = w;
313              else if (mode != TIMEOUT) {
314 <                //                LockSupport.park(this);
302 <                LockSupport.park(); // allows run on java5
314 >                LockSupport.park(this);
315                  s.waiter = null;
316                  spins = -1;
317              }
318              else if (nanos > spinForTimeoutThreshold) {
319 <                //                LockSupport.parkNanos(this, nanos);
308 <                LockSupport.parkNanos(nanos);
319 >                LockSupport.parkNanos(this, nanos);
320                  s.waiter = null;
321                  spins = -1;
322              }
# Line 313 | Line 324 | public class LinkedTransferQueue<E> exte
324      }
325  
326      /**
327 <     * Returns validated tail for use in cleaning methods
327 >     * Returns validated tail for use in cleaning methods.
328       */
329 <    private QNode getValidatedTail() {
329 >    private Node<E> getValidatedTail() {
330          for (;;) {
331 <            QNode h = head.get();
332 <            QNode first = h.next;
331 >            Node<E> h = head.get();
332 >            Node<E> first = h.next;
333              if (first != null && first.next == first) { // help advance
334                  advanceHead(h, first);
335                  continue;
336              }
337 <            QNode t = tail.get();
338 <            QNode last = t.next;
337 >            Node<E> t = tail.get();
338 >            Node<E> last = t.next;
339              if (t == tail.get()) {
340                  if (last != null)
341                      tail.compareAndSet(t, last); // help advance
# Line 332 | Line 343 | public class LinkedTransferQueue<E> exte
343                      return t;
344              }
345          }
346 <    }    
346 >    }
347  
348      /**
349       * Gets rid of cancelled node s with original predecessor pred.
350 +     *
351       * @param pred predecessor of cancelled node
352       * @param s the cancelled node
353       */
354 <    private void clean(QNode pred, QNode s) {
354 >    private void clean(Node<E> pred, Node<E> s) {
355          Thread w = s.waiter;
356          if (w != null) {             // Wake up thread
357              s.waiter = null;
358              if (w != Thread.currentThread())
359                  LockSupport.unpark(w);
360          }
361 +
362 +        if (pred == null)
363 +            return;
364 +
365          /*
366           * At any given time, exactly one node on list cannot be
367           * deleted -- the last inserted node. To accommodate this, if
# Line 355 | Line 371 | public class LinkedTransferQueue<E> exte
371           * processed, so this always terminates.
372           */
373          while (pred.next == s) {
374 <            QNode oldpred = reclean();  // First, help get rid of cleanMe
375 <            QNode t = getValidatedTail();
374 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
375 >            Node<E> t = getValidatedTail();
376              if (s != t) {               // If not tail, try to unsplice
377 <                QNode sn = s.next;      // s.next == s means s already off list
378 <                if (sn == s || pred.casNext(s, sn))
377 >                Node<E> sn = s.next;      // s.next == s means s already off list
378 >                if (sn == s || pred.casNext(s, sn))
379                      break;
380              }
381              else if (oldpred == pred || // Already saved
# Line 371 | Line 387 | public class LinkedTransferQueue<E> exte
387      /**
388       * Tries to unsplice the cancelled node held in cleanMe that was
389       * previously uncleanable because it was at tail.
390 +     *
391       * @return current cleanMe node (or null)
392       */
393 <    private QNode reclean() {
394 <        /*
393 >    private Node<E> reclean() {
394 >        /*
395           * cleanMe is, or at one time was, predecessor of cancelled
396           * node s that was the tail so could not be unspliced.  If s
397           * is no longer the tail, try to unsplice if necessary and
# Line 383 | Line 400 | public class LinkedTransferQueue<E> exte
400           * points to a cancelled node that must be unspliced -- if
401           * not, we can (must) clear cleanMe without unsplicing.
402           * This can loop only due to contention on casNext or
403 <         * clearing cleanMe.
403 >         * clearing cleanMe.
404           */
405 <        QNode pred;
405 >        Node<E> pred;
406          while ((pred = cleanMe.get()) != null) {
407 <            QNode t = getValidatedTail();
408 <            QNode s = pred.next;
409 <            if (s != t) {
410 <                QNode sn;
407 >            Node<E> t = getValidatedTail();
408 >            Node<E> s = pred.next;
409 >            if (s != t) {
410 >                Node<E> sn;
411                  if (s == null || s == pred || s.get() != s ||
412                      (sn = s.next) == s || pred.casNext(s, sn))
413                      cleanMe.compareAndSet(pred, null);
# Line 402 | Line 419 | public class LinkedTransferQueue<E> exte
419      }
420  
421      /**
422 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
422 >     * Creates an initially empty {@code LinkedTransferQueue}.
423       */
424      public LinkedTransferQueue() {
425 <        QNode dummy = new QNode(null, false);
426 <        head = new PaddedAtomicReference<QNode>(dummy);
427 <        tail = new PaddedAtomicReference<QNode>(dummy);
428 <        cleanMe = new PaddedAtomicReference<QNode>(null);
425 >        Node<E> dummy = new Node<E>(null, false);
426 >        head = new PaddedAtomicReference<Node<E>>(dummy);
427 >        tail = new PaddedAtomicReference<Node<E>>(dummy);
428 >        cleanMe = new PaddedAtomicReference<Node<E>>(null);
429      }
430  
431      /**
432 <     * Creates a <tt>LinkedTransferQueue</tt>
432 >     * Creates a {@code LinkedTransferQueue}
433       * initially containing the elements of the given collection,
434       * added in traversal order of the collection's iterator.
435 +     *
436       * @param c the collection of elements to initially contain
437       * @throws NullPointerException if the specified collection or any
438       *         of its elements are null
# Line 444 | Line 462 | public class LinkedTransferQueue<E> exte
462          return true;
463      }
464  
465 +    public boolean add(E e) {
466 +        if (e == null) throw new NullPointerException();
467 +        xfer(e, NOWAIT, 0);
468 +        return true;
469 +    }
470 +
471      public void transfer(E e) throws InterruptedException {
472          if (e == null) throw new NullPointerException();
473          if (xfer(e, WAIT, 0) == null) {
# Line 470 | Line 494 | public class LinkedTransferQueue<E> exte
494      public E take() throws InterruptedException {
495          Object e = xfer(null, WAIT, 0);
496          if (e != null)
497 <            return (E)e;
497 >            return (E) e;
498          Thread.interrupted();
499          throw new InterruptedException();
500      }
# Line 478 | Line 502 | public class LinkedTransferQueue<E> exte
502      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
503          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
504          if (e != null || !Thread.interrupted())
505 <            return (E)e;
505 >            return (E) e;
506          throw new InterruptedException();
507      }
508  
509      public E poll() {
510 <        return (E)fulfill(null);
510 >        return fulfill(null);
511      }
512  
513      public int drainTo(Collection<? super E> c) {
# Line 517 | Line 541 | public class LinkedTransferQueue<E> exte
541      // Traversal-based methods
542  
543      /**
544 <     * Return head after performing any outstanding helping steps
544 >     * Returns head after performing any outstanding helping steps.
545       */
546 <    private QNode traversalHead() {
546 >    private Node<E> traversalHead() {
547          for (;;) {
548 <            QNode t = tail.get();
549 <            QNode h = head.get();
548 >            Node<E> t = tail.get();
549 >            Node<E> h = head.get();
550              if (h != null && t != null) {
551 <                QNode last = t.next;
552 <                QNode first = h.next;
551 >                Node<E> last = t.next;
552 >                Node<E> first = h.next;
553                  if (t == tail.get()) {
554                      if (last != null)
555                          tail.compareAndSet(t, last);
# Line 540 | Line 564 | public class LinkedTransferQueue<E> exte
564                          return h;
565                  }
566              }
567 +            reclean();
568          }
569      }
570  
# Line 556 | Line 581 | public class LinkedTransferQueue<E> exte
581       * if subsequently removed.
582       */
583      class Itr implements Iterator<E> {
584 <        QNode nextNode;    // Next node to return next
585 <        QNode currentNode; // last returned node, for remove()
586 <        QNode prevNode;    // predecessor of last returned node
587 <        E nextItem;        // Cache of next item, once commited to in next
584 >        Node<E> next;        // node to return next
585 >        Node<E> pnext;       // predecessor of next
586 >        Node<E> snext;       // successor of next
587 >        Node<E> curr;        // last returned node, for remove()
588 >        Node<E> pcurr;       // predecessor of curr, for remove()
589 >        E nextItem;        // Cache of next item, once committed to in next
590  
591          Itr() {
592 <            nextNode = traversalHead();
566 <            advance();
592 >            findNext();
593          }
594  
595 <        E advance() {
596 <            prevNode = currentNode;
597 <            currentNode = nextNode;
598 <            E x = nextItem;
573 <
574 <            QNode p = nextNode.next;
595 >        /**
596 >         * Ensures next points to next valid node, or null if none.
597 >         */
598 >        void findNext() {
599              for (;;) {
600 <                if (p == null || !p.isData) {
601 <                    nextNode = null;
602 <                    nextItem = null;
603 <                    return x;
604 <                }
605 <                Object item = p.get();
606 <                if (item != p && item != null) {
607 <                    nextNode = p;
608 <                    nextItem = (E)item;
609 <                    return x;
600 >                Node<E> pred = pnext;
601 >                Node<E> q = next;
602 >                if (pred == null || pred == q) {
603 >                    pred = traversalHead();
604 >                    q = pred.next;
605 >                }
606 >                if (q == null || !q.isData) {
607 >                    next = null;
608 >                    return;
609 >                }
610 >                Object x = q.get();
611 >                Node<E> s = q.next;
612 >                if (x != null && q != x && q != s) {
613 >                    nextItem = (E) x;
614 >                    snext = s;
615 >                    pnext = pred;
616 >                    next = q;
617 >                    return;
618                  }
619 <                prevNode = p;
620 <                p = p.next;
619 >                pnext = q;
620 >                next = s;
621              }
622          }
623  
624          public boolean hasNext() {
625 <            return nextNode != null;
625 >            return next != null;
626          }
627  
628          public E next() {
629 <            if (nextNode == null) throw new NoSuchElementException();
630 <            return advance();
629 >            if (next == null) throw new NoSuchElementException();
630 >            pcurr = pnext;
631 >            curr = next;
632 >            pnext = next;
633 >            next = snext;
634 >            E x = nextItem;
635 >            findNext();
636 >            return x;
637          }
638  
639          public void remove() {
640 <            QNode p = currentNode;
641 <            QNode prev = prevNode;
604 <            if (prev == null || p == null)
640 >            Node<E> p = curr;
641 >            if (p == null)
642                  throw new IllegalStateException();
643              Object x = p.get();
644              if (x != null && x != p && p.compareAndSet(x, p))
645 <                clean(prev, p);
645 >                clean(pcurr, p);
646          }
647      }
648  
649      public E peek() {
650          for (;;) {
651 <            QNode h = traversalHead();
652 <            QNode p = h.next;
651 >            Node<E> h = traversalHead();
652 >            Node<E> p = h.next;
653              if (p == null)
654                  return null;
655              Object x = p.get();
# Line 620 | Line 657 | public class LinkedTransferQueue<E> exte
657                  if (!p.isData)
658                      return null;
659                  if (x != null)
660 <                    return (E)x;
660 >                    return (E) x;
661              }
662          }
663      }
664  
665      public boolean isEmpty() {
666          for (;;) {
667 <            QNode h = traversalHead();
668 <            QNode p = h.next;
667 >            Node<E> h = traversalHead();
668 >            Node<E> p = h.next;
669              if (p == null)
670                  return true;
671              Object x = p.get();
# Line 643 | Line 680 | public class LinkedTransferQueue<E> exte
680  
681      public boolean hasWaitingConsumer() {
682          for (;;) {
683 <            QNode h = traversalHead();
684 <            QNode p = h.next;
683 >            Node<E> h = traversalHead();
684 >            Node<E> p = h.next;
685              if (p == null)
686                  return false;
687              Object x = p.get();
# Line 655 | Line 692 | public class LinkedTransferQueue<E> exte
692  
693      /**
694       * Returns the number of elements in this queue.  If this queue
695 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
696 <     * <tt>Integer.MAX_VALUE</tt>.
695 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
696 >     * {@code Integer.MAX_VALUE}.
697       *
698       * <p>Beware that, unlike in most collections, this method is
699       * <em>NOT</em> a constant-time operation. Because of the
# Line 667 | Line 704 | public class LinkedTransferQueue<E> exte
704       */
705      public int size() {
706          int count = 0;
707 <        QNode h = traversalHead();
708 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
707 >        Node<E> h = traversalHead();
708 >        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
709              Object x = p.get();
710              if (x != null && x != p) {
711                  if (++count == Integer.MAX_VALUE) // saturated
# Line 680 | Line 717 | public class LinkedTransferQueue<E> exte
717  
718      public int getWaitingConsumerCount() {
719          int count = 0;
720 <        QNode h = traversalHead();
721 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
720 >        Node<E> h = traversalHead();
721 >        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
722              if (p.get() == null) {
723                  if (++count == Integer.MAX_VALUE)
724                      break;
# Line 694 | Line 731 | public class LinkedTransferQueue<E> exte
731          return Integer.MAX_VALUE;
732      }
733  
734 +    public boolean remove(Object o) {
735 +        if (o == null)
736 +            return false;
737 +        for (;;) {
738 +            Node<E> pred = traversalHead();
739 +            for (;;) {
740 +                Node<E> q = pred.next;
741 +                if (q == null || !q.isData)
742 +                    return false;
743 +                if (q == pred) // restart
744 +                    break;
745 +                Object x = q.get();
746 +                if (x != null && x != q && o.equals(x) &&
747 +                    q.compareAndSet(x, q)) {
748 +                    clean(pred, q);
749 +                    return true;
750 +                }
751 +                pred = q;
752 +            }
753 +        }
754 +    }
755 +
756      /**
757       * Save the state to a stream (that is, serialize it).
758       *
759 <     * @serialData All of the elements (each an <tt>E</tt>) in
759 >     * @serialData All of the elements (each an {@code E}) in
760       * the proper order, followed by a null
761       * @param s the stream
762       */
763      private void writeObject(java.io.ObjectOutputStream s)
764          throws java.io.IOException {
765          s.defaultWriteObject();
766 <        for (Iterator<E> it = iterator(); it.hasNext(); )
767 <            s.writeObject(it.next());
766 >        for (E e : this)
767 >            s.writeObject(e);
768          // Use trailing null as sentinel
769          s.writeObject(null);
770      }
# Line 713 | Line 772 | public class LinkedTransferQueue<E> exte
772      /**
773       * Reconstitute the Queue instance from a stream (that is,
774       * deserialize it).
775 +     *
776       * @param s the stream
777       */
778      private void readObject(java.io.ObjectInputStream s)
# Line 720 | Line 780 | public class LinkedTransferQueue<E> exte
780          s.defaultReadObject();
781          resetHeadAndTail();
782          for (;;) {
783 <            E item = (E)s.readObject();
783 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
784              if (item == null)
785                  break;
786              else
# Line 728 | Line 788 | public class LinkedTransferQueue<E> exte
788          }
789      }
790  
731
791      // Support for resetting head/tail while deserializing
792 +    private void resetHeadAndTail() {
793 +        Node<E> dummy = new Node<E>(null, false);
794 +        UNSAFE.putObjectVolatile(this, headOffset,
795 +                                 new PaddedAtomicReference<Node<E>>(dummy));
796 +        UNSAFE.putObjectVolatile(this, tailOffset,
797 +                                 new PaddedAtomicReference<Node<E>>(dummy));
798 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
799 +                                 new PaddedAtomicReference<Node<E>>(null));
800 +    }
801  
802 <    // Temporary Unsafe mechanics for preliminary release
803 <    private static final Unsafe _unsafe;
736 <    private static final long headOffset;
737 <    private static final long tailOffset;
738 <    private static final long cleanMeOffset;
739 <    static {
802 >    // Unsafe mechanics for jsr166y 3rd party package.
803 >    private static sun.misc.Unsafe getUnsafe() {
804          try {
805 <            if (LinkedTransferQueue.class.getClassLoader() != null) {
806 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
807 <                f.setAccessible(true);
808 <                _unsafe = (Unsafe)f.get(null);
805 >            return sun.misc.Unsafe.getUnsafe();
806 >        } catch (SecurityException se) {
807 >            try {
808 >                return java.security.AccessController.doPrivileged
809 >                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
810 >                        public sun.misc.Unsafe run() throws Exception {
811 >                            return getUnsafeByReflection();
812 >                        }});
813 >            } catch (java.security.PrivilegedActionException e) {
814 >                throw new RuntimeException("Could not initialize intrinsics",
815 >                                           e.getCause());
816              }
746            else
747                _unsafe = Unsafe.getUnsafe();
748            headOffset = _unsafe.objectFieldOffset
749                (LinkedTransferQueue.class.getDeclaredField("head"));
750            tailOffset = _unsafe.objectFieldOffset
751                (LinkedTransferQueue.class.getDeclaredField("tail"));
752            cleanMeOffset = _unsafe.objectFieldOffset
753                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
754        } catch (Exception e) {
755            throw new RuntimeException("Could not initialize intrinsics", e);
817          }
818      }
819  
820 <    private void resetHeadAndTail() {
821 <        QNode dummy = new QNode(null, false);
822 <        _unsafe.putObjectVolatile(this, headOffset,
823 <                                  new PaddedAtomicReference<QNode>(dummy));
824 <        _unsafe.putObjectVolatile(this, tailOffset,
825 <                                  new PaddedAtomicReference<QNode>(dummy));
826 <        _unsafe.putObjectVolatile(this, cleanMeOffset,
766 <                                  new PaddedAtomicReference<QNode>(null));
820 >    private static sun.misc.Unsafe getUnsafeByReflection()
821 >            throws NoSuchFieldException, IllegalAccessException {
822 >        java.lang.reflect.Field f =
823 >            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
824 >        f.setAccessible(true);
825 >        return (sun.misc.Unsafe) f.get(null);
826 >    }
827  
828 +    private static long fieldOffset(String fieldName, Class<?> klazz) {
829 +        try {
830 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
831 +        } catch (NoSuchFieldException e) {
832 +            // Convert Exception to Error
833 +            NoSuchFieldError error = new NoSuchFieldError(fieldName);
834 +            error.initCause(e);
835 +            throw error;
836 +        }
837      }
838  
839 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
840 +    static final long headOffset =
841 +        fieldOffset("head", LinkedTransferQueue.class);
842 +    static final long tailOffset =
843 +        fieldOffset("tail", LinkedTransferQueue.class);
844 +    static final long cleanMeOffset =
845 +        fieldOffset("cleanMe", LinkedTransferQueue.class);
846 +
847   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines