ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.14 by jsr166, Thu Mar 19 05:10:42 2009 UTC vs.
Revision 1.41 by jsr166, Sat Aug 1 20:44:05 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
14 < import sun.misc.Unsafe;
15 < import java.lang.reflect.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 44 | Line 48 | import java.lang.reflect.*;
48   * @since 1.7
49   * @author Doug Lea
50   * @param <E> the type of elements held in this collection
47 *
51   */
52   public class LinkedTransferQueue<E> extends AbstractQueue<E>
53      implements TransferQueue<E>, java.io.Serializable {
# Line 81 | Line 84 | public class LinkedTransferQueue<E> exte
84       * seems not to vary with number of CPUs (beyond 2) so is just
85       * a constant.
86       */
87 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
87 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
88  
89      /**
90       * The number of times to spin before blocking in untimed waits.
# Line 103 | Line 106 | public class LinkedTransferQueue<E> exte
106       * garbage retention. Similarly, setting the next field to this is
107       * used as sentinel that node is off list.
108       */
109 <    static final class QNode extends AtomicReference<Object> {
110 <        volatile QNode next;
109 >    static final class Node<E> extends AtomicReference<Object> {
110 >        volatile Node<E> next;
111          volatile Thread waiter;       // to control park/unpark
112          final boolean isData;
113 <        QNode(Object item, boolean isData) {
113 >
114 >        Node(E item, boolean isData) {
115              super(item);
116              this.isData = isData;
117          }
118  
119 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
116 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
117 <            (QNode.class, QNode.class, "next");
119 >        // Unsafe mechanics
120  
121 <        boolean casNext(QNode cmp, QNode val) {
122 <            return nextUpdater.compareAndSet(this, cmp, val);
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124 >
125 >        final boolean casNext(Node<E> cmp, Node<E> val) {
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127 >        }
128 >
129 >        final void clearNext() {
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131          }
132 +
133 +        /**
134 +         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 +         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 +         * into a jdk.
137 +         *
138 +         * @return a sun.misc.Unsafe
139 +         */
140 +        private static sun.misc.Unsafe getUnsafe() {
141 +            try {
142 +                return sun.misc.Unsafe.getUnsafe();
143 +            } catch (SecurityException se) {
144 +                try {
145 +                    return java.security.AccessController.doPrivileged
146 +                        (new java.security
147 +                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 +                            public sun.misc.Unsafe run() throws Exception {
149 +                                java.lang.reflect.Field f = sun.misc
150 +                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 +                                f.setAccessible(true);
152 +                                return (sun.misc.Unsafe) f.get(null);
153 +                            }});
154 +                } catch (java.security.PrivilegedActionException e) {
155 +                    throw new RuntimeException("Could not initialize intrinsics",
156 +                                               e.getCause());
157 +                }
158 +            }
159 +        }
160 +
161 +        private static final long serialVersionUID = -3375979862319811754L;
162      }
163  
164      /**
# Line 130 | Line 170 | public class LinkedTransferQueue<E> exte
170          // enough padding for 64bytes with 4byte refs
171          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
172          PaddedAtomicReference(T r) { super(r); }
173 +        private static final long serialVersionUID = 8170090609809740854L;
174      }
175  
176  
177      /** head of the queue */
178 <    private transient final PaddedAtomicReference<QNode> head;
178 >    private transient final PaddedAtomicReference<Node<E>> head;
179 >
180      /** tail of the queue */
181 <    private transient final PaddedAtomicReference<QNode> tail;
181 >    private transient final PaddedAtomicReference<Node<E>> tail;
182  
183      /**
184       * Reference to a cancelled node that might not yet have been
185       * unlinked from queue because it was the last inserted node
186       * when it cancelled.
187       */
188 <    private transient final PaddedAtomicReference<QNode> cleanMe;
188 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
189  
190      /**
191       * Tries to cas nh as new head; if successful, unlink
192       * old head's next node to avoid garbage retention.
193       */
194 <    private boolean advanceHead(QNode h, QNode nh) {
194 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
195          if (h == head.get() && head.compareAndSet(h, nh)) {
196 <            h.next = h; // forget old next
196 >            h.clearNext(); // forget old next
197              return true;
198          }
199          return false;
# Line 161 | Line 203 | public class LinkedTransferQueue<E> exte
203       * Puts or takes an item. Used for most queue operations (except
204       * poll() and tryTransfer()). See the similar code in
205       * SynchronousQueue for detailed explanation.
206 +     *
207       * @param e the item or if null, signifies that this is a take
208       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
209       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
210       * @return an item, or null on failure
211       */
212 <    private Object xfer(Object e, int mode, long nanos) {
212 >    private E xfer(E e, int mode, long nanos) {
213          boolean isData = (e != null);
214 <        QNode s = null;
215 <        final PaddedAtomicReference<QNode> head = this.head;
216 <        final PaddedAtomicReference<QNode> tail = this.tail;
214 >        Node<E> s = null;
215 >        final PaddedAtomicReference<Node<E>> head = this.head;
216 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
217  
218          for (;;) {
219 <            QNode t = tail.get();
220 <            QNode h = head.get();
219 >            Node<E> t = tail.get();
220 >            Node<E> h = head.get();
221  
222              if (t != null && (t == h || t.isData == isData)) {
223                  if (s == null)
224 <                    s = new QNode(e, isData);
225 <                QNode last = t.next;
224 >                    s = new Node<E>(e, isData);
225 >                Node<E> last = t.next;
226                  if (last != null) {
227                      if (t == tail.get())
228                          tail.compareAndSet(t, last);
# Line 191 | Line 234 | public class LinkedTransferQueue<E> exte
234              }
235  
236              else if (h != null) {
237 <                QNode first = h.next;
237 >                Node<E> first = h.next;
238                  if (t == tail.get() && first != null &&
239                      advanceHead(h, first)) {
240                      Object x = first.get();
241                      if (x != first && first.compareAndSet(x, e)) {
242                          LockSupport.unpark(first.waiter);
243 <                        return isData? e : x;
243 >                        return isData ? e : (E) x;
244                      }
245                  }
246              }
# Line 207 | Line 250 | public class LinkedTransferQueue<E> exte
250  
251      /**
252       * Version of xfer for poll() and tryTransfer, which
253 <     * simplifies control paths both here and in xfer
253 >     * simplifies control paths both here and in xfer.
254       */
255 <    private Object fulfill(Object e) {
255 >    private E fulfill(E e) {
256          boolean isData = (e != null);
257 <        final PaddedAtomicReference<QNode> head = this.head;
258 <        final PaddedAtomicReference<QNode> tail = this.tail;
257 >        final PaddedAtomicReference<Node<E>> head = this.head;
258 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
259  
260          for (;;) {
261 <            QNode t = tail.get();
262 <            QNode h = head.get();
261 >            Node<E> t = tail.get();
262 >            Node<E> h = head.get();
263  
264              if (t != null && (t == h || t.isData == isData)) {
265 <                QNode last = t.next;
265 >                Node<E> last = t.next;
266                  if (t == tail.get()) {
267                      if (last != null)
268                          tail.compareAndSet(t, last);
# Line 228 | Line 271 | public class LinkedTransferQueue<E> exte
271                  }
272              }
273              else if (h != null) {
274 <                QNode first = h.next;
274 >                Node<E> first = h.next;
275                  if (t == tail.get() &&
276                      first != null &&
277                      advanceHead(h, first)) {
278                      Object x = first.get();
279                      if (x != first && first.compareAndSet(x, e)) {
280                          LockSupport.unpark(first.waiter);
281 <                        return isData? e : x;
281 >                        return isData ? e : (E) x;
282                      }
283                  }
284              }
# Line 251 | Line 294 | public class LinkedTransferQueue<E> exte
294       * @param e the comparison value for checking match
295       * @param mode mode
296       * @param nanos timeout value
297 <     * @return matched item, or s if cancelled
297 >     * @return matched item, or null if cancelled
298       */
299 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
300 <                                int mode, long nanos) {
299 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
300 >                           int mode, long nanos) {
301          if (mode == NOWAIT)
302              return null;
303  
304 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
304 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
305          Thread w = Thread.currentThread();
306          int spins = -1; // set to desired spin count below
307          for (;;) {
# Line 267 | Line 310 | public class LinkedTransferQueue<E> exte
310              Object x = s.get();
311              if (x != e) {                 // Node was matched or cancelled
312                  advanceHead(pred, s);     // unlink if head
313 <                if (x == s) {              // was cancelled
313 >                if (x == s) {             // was cancelled
314                      clean(pred, s);
315                      return null;
316                  }
317                  else if (x != null) {
318                      s.set(s);             // avoid garbage retention
319 <                    return x;
319 >                    return (E) x;
320                  }
321                  else
322                      return e;
# Line 288 | Line 331 | public class LinkedTransferQueue<E> exte
331                  }
332              }
333              if (spins < 0) {
334 <                QNode h = head.get(); // only spin if at head
334 >                Node<E> h = head.get(); // only spin if at head
335                  spins = ((h != null && h.next == s) ?
336 <                         (mode == TIMEOUT?
336 >                         ((mode == TIMEOUT) ?
337                            maxTimedSpins : maxUntimedSpins) : 0);
338              }
339              if (spins > 0)
# Line 311 | Line 354 | public class LinkedTransferQueue<E> exte
354      }
355  
356      /**
357 <     * Returns validated tail for use in cleaning methods
357 >     * Returns validated tail for use in cleaning methods.
358       */
359 <    private QNode getValidatedTail() {
359 >    private Node<E> getValidatedTail() {
360          for (;;) {
361 <            QNode h = head.get();
362 <            QNode first = h.next;
363 <            if (first != null && first.next == first) { // help advance
361 >            Node<E> h = head.get();
362 >            Node<E> first = h.next;
363 >            if (first != null && first.get() == first) { // help advance
364                  advanceHead(h, first);
365                  continue;
366              }
367 <            QNode t = tail.get();
368 <            QNode last = t.next;
367 >            Node<E> t = tail.get();
368 >            Node<E> last = t.next;
369              if (t == tail.get()) {
370                  if (last != null)
371                      tail.compareAndSet(t, last); // help advance
# Line 334 | Line 377 | public class LinkedTransferQueue<E> exte
377  
378      /**
379       * Gets rid of cancelled node s with original predecessor pred.
380 +     *
381       * @param pred predecessor of cancelled node
382       * @param s the cancelled node
383       */
384 <    private void clean(QNode pred, QNode s) {
384 >    private void clean(Node<E> pred, Node<E> s) {
385          Thread w = s.waiter;
386          if (w != null) {             // Wake up thread
387              s.waiter = null;
388              if (w != Thread.currentThread())
389                  LockSupport.unpark(w);
390          }
391 +
392 +        if (pred == null)
393 +            return;
394 +
395          /*
396           * At any given time, exactly one node on list cannot be
397           * deleted -- the last inserted node. To accommodate this, if
# Line 353 | Line 401 | public class LinkedTransferQueue<E> exte
401           * processed, so this always terminates.
402           */
403          while (pred.next == s) {
404 <            QNode oldpred = reclean();  // First, help get rid of cleanMe
405 <            QNode t = getValidatedTail();
404 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
405 >            Node<E> t = getValidatedTail();
406              if (s != t) {               // If not tail, try to unsplice
407 <                QNode sn = s.next;      // s.next == s means s already off list
407 >                Node<E> sn = s.next;      // s.next == s means s already off list
408                  if (sn == s || pred.casNext(s, sn))
409                      break;
410              }
# Line 369 | Line 417 | public class LinkedTransferQueue<E> exte
417      /**
418       * Tries to unsplice the cancelled node held in cleanMe that was
419       * previously uncleanable because it was at tail.
420 +     *
421       * @return current cleanMe node (or null)
422       */
423 <    private QNode reclean() {
423 >    private Node<E> reclean() {
424          /*
425           * cleanMe is, or at one time was, predecessor of cancelled
426           * node s that was the tail so could not be unspliced.  If s
# Line 383 | Line 432 | public class LinkedTransferQueue<E> exte
432           * This can loop only due to contention on casNext or
433           * clearing cleanMe.
434           */
435 <        QNode pred;
435 >        Node<E> pred;
436          while ((pred = cleanMe.get()) != null) {
437 <            QNode t = getValidatedTail();
438 <            QNode s = pred.next;
437 >            Node<E> t = getValidatedTail();
438 >            Node<E> s = pred.next;
439              if (s != t) {
440 <                QNode sn;
440 >                Node<E> sn;
441                  if (s == null || s == pred || s.get() != s ||
442                      (sn = s.next) == s || pred.casNext(s, sn))
443                      cleanMe.compareAndSet(pred, null);
# Line 403 | Line 452 | public class LinkedTransferQueue<E> exte
452       * Creates an initially empty {@code LinkedTransferQueue}.
453       */
454      public LinkedTransferQueue() {
455 <        QNode dummy = new QNode(null, false);
456 <        head = new PaddedAtomicReference<QNode>(dummy);
457 <        tail = new PaddedAtomicReference<QNode>(dummy);
458 <        cleanMe = new PaddedAtomicReference<QNode>(null);
455 >        Node<E> dummy = new Node<E>(null, false);
456 >        head = new PaddedAtomicReference<Node<E>>(dummy);
457 >        tail = new PaddedAtomicReference<Node<E>>(dummy);
458 >        cleanMe = new PaddedAtomicReference<Node<E>>(null);
459      }
460  
461      /**
462       * Creates a {@code LinkedTransferQueue}
463       * initially containing the elements of the given collection,
464       * added in traversal order of the collection's iterator.
465 +     *
466       * @param c the collection of elements to initially contain
467       * @throws NullPointerException if the specified collection or any
468       *         of its elements are null
# Line 422 | Line 472 | public class LinkedTransferQueue<E> exte
472          addAll(c);
473      }
474  
475 <    public void put(E e) throws InterruptedException {
476 <        if (e == null) throw new NullPointerException();
477 <        if (Thread.interrupted()) throw new InterruptedException();
478 <        xfer(e, NOWAIT, 0);
475 >    /**
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480 >     */
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485 <    public boolean offer(E e, long timeout, TimeUnit unit)
486 <        throws InterruptedException {
487 <        if (e == null) throw new NullPointerException();
488 <        if (Thread.interrupted()) throw new InterruptedException();
489 <        xfer(e, NOWAIT, 0);
490 <        return true;
485 >    /**
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493 >     */
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498 +    /**
499 +     * Inserts the specified element at the tail of this queue.
500 +     * As the queue is unbounded, this method will never return {@code false}.
501 +     *
502 +     * @return {@code true} (as specified by
503 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 +     * @throws NullPointerException if the specified element is null
505 +     */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
508          xfer(e, NOWAIT, 0);
509          return true;
510      }
511  
512 +    /**
513 +     * Inserts the specified element at the tail of this queue.
514 +     * As the queue is unbounded, this method will never throw
515 +     * {@link IllegalStateException} or return {@code false}.
516 +     *
517 +     * @return {@code true} (as specified by {@link Collection#add})
518 +     * @throws NullPointerException if the specified element is null
519 +     */
520 +    public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the element to a waiting consumer immediately, if possible.
526 +     *
527 +     * <p>More precisely, transfers the specified element immediately
528 +     * if there exists a consumer already waiting to receive it (in
529 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
530 +     * otherwise returning {@code false} without enqueuing the element.
531 +     *
532 +     * @throws NullPointerException if the specified element is null
533 +     */
534 +    public boolean tryTransfer(E e) {
535 +        if (e == null) throw new NullPointerException();
536 +        return fulfill(e) != null;
537 +    }
538 +
539 +    /**
540 +     * Transfers the element to a consumer, waiting if necessary to do so.
541 +     *
542 +     * <p>More precisely, transfers the specified element immediately
543 +     * if there exists a consumer already waiting to receive it (in
544 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
545 +     * else inserts the specified element at the tail of this queue
546 +     * and waits until the element is received by a consumer.
547 +     *
548 +     * @throws NullPointerException if the specified element is null
549 +     */
550      public void transfer(E e) throws InterruptedException {
551          if (e == null) throw new NullPointerException();
552          if (xfer(e, WAIT, 0) == null) {
# Line 450 | Line 555 | public class LinkedTransferQueue<E> exte
555          }
556      }
557  
558 +    /**
559 +     * Transfers the element to a consumer if it is possible to do so
560 +     * before the timeout elapses.
561 +     *
562 +     * <p>More precisely, transfers the specified element immediately
563 +     * if there exists a consumer already waiting to receive it (in
564 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
565 +     * else inserts the specified element at the tail of this queue
566 +     * and waits until the element is received by a consumer,
567 +     * returning {@code false} if the specified wait time elapses
568 +     * before the element can be transferred.
569 +     *
570 +     * @throws NullPointerException if the specified element is null
571 +     */
572      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
573          throws InterruptedException {
574          if (e == null) throw new NullPointerException();
# Line 460 | Line 579 | public class LinkedTransferQueue<E> exte
579          throw new InterruptedException();
580      }
581  
463    public boolean tryTransfer(E e) {
464        if (e == null) throw new NullPointerException();
465        return fulfill(e) != null;
466    }
467
582      public E take() throws InterruptedException {
583 <        Object e = xfer(null, WAIT, 0);
583 >        E e = xfer(null, WAIT, 0);
584          if (e != null)
585 <            return (E)e;
585 >            return e;
586          Thread.interrupted();
587          throw new InterruptedException();
588      }
589  
590      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
591 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
591 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
592          if (e != null || !Thread.interrupted())
593 <            return (E)e;
593 >            return e;
594          throw new InterruptedException();
595      }
596  
597      public E poll() {
598 <        return (E)fulfill(null);
598 >        return fulfill(null);
599      }
600  
601 +    /**
602 +     * @throws NullPointerException     {@inheritDoc}
603 +     * @throws IllegalArgumentException {@inheritDoc}
604 +     */
605      public int drainTo(Collection<? super E> c) {
606          if (c == null)
607              throw new NullPointerException();
# Line 498 | Line 616 | public class LinkedTransferQueue<E> exte
616          return n;
617      }
618  
619 +    /**
620 +     * @throws NullPointerException     {@inheritDoc}
621 +     * @throws IllegalArgumentException {@inheritDoc}
622 +     */
623      public int drainTo(Collection<? super E> c, int maxElements) {
624          if (c == null)
625              throw new NullPointerException();
# Line 515 | Line 637 | public class LinkedTransferQueue<E> exte
637      // Traversal-based methods
638  
639      /**
640 <     * Return head after performing any outstanding helping steps
640 >     * Returns head after performing any outstanding helping steps.
641       */
642 <    private QNode traversalHead() {
642 >    private Node<E> traversalHead() {
643          for (;;) {
644 <            QNode t = tail.get();
645 <            QNode h = head.get();
644 >            Node<E> t = tail.get();
645 >            Node<E> h = head.get();
646              if (h != null && t != null) {
647 <                QNode last = t.next;
648 <                QNode first = h.next;
647 >                Node<E> last = t.next;
648 >                Node<E> first = h.next;
649                  if (t == tail.get()) {
650                      if (last != null)
651                          tail.compareAndSet(t, last);
# Line 538 | Line 660 | public class LinkedTransferQueue<E> exte
660                          return h;
661                  }
662              }
663 +            reclean();
664          }
665      }
666  
667 <
667 >    /**
668 >     * Returns an iterator over the elements in this queue in proper
669 >     * sequence, from head to tail.
670 >     *
671 >     * <p>The returned iterator is a "weakly consistent" iterator that
672 >     * will never throw
673 >     * {@link ConcurrentModificationException ConcurrentModificationException},
674 >     * and guarantees to traverse elements as they existed upon
675 >     * construction of the iterator, and may (but is not guaranteed
676 >     * to) reflect any modifications subsequent to construction.
677 >     *
678 >     * @return an iterator over the elements in this queue in proper sequence
679 >     */
680      public Iterator<E> iterator() {
681          return new Itr();
682      }
# Line 554 | Line 689 | public class LinkedTransferQueue<E> exte
689       * if subsequently removed.
690       */
691      class Itr implements Iterator<E> {
692 <        QNode nextNode;    // Next node to return next
693 <        QNode currentNode; // last returned node, for remove()
694 <        QNode prevNode;    // predecessor of last returned node
695 <        E nextItem;        // Cache of next item, once commited to in next
692 >        Node<E> next;        // node to return next
693 >        Node<E> pnext;       // predecessor of next
694 >        Node<E> curr;        // last returned node, for remove()
695 >        Node<E> pcurr;       // predecessor of curr, for remove()
696 >        E nextItem;          // Cache of next item, once committed to in next
697  
698          Itr() {
563            nextNode = traversalHead();
699              advance();
700          }
701  
702 <        E advance() {
703 <            prevNode = currentNode;
704 <            currentNode = nextNode;
705 <            E x = nextItem;
702 >        /**
703 >         * Moves to next valid node and returns item to return for
704 >         * next(), or null if no such.
705 >         */
706 >        private E advance() {
707 >            pcurr = pnext;
708 >            curr = next;
709 >            E item = nextItem;
710  
572            QNode p = nextNode.next;
711              for (;;) {
712 <                if (p == null || !p.isData) {
713 <                    nextNode = null;
714 <                    nextItem = null;
715 <                    return x;
716 <                }
717 <                Object item = p.get();
718 <                if (item != p && item != null) {
719 <                    nextNode = p;
720 <                    nextItem = (E)item;
721 <                    return x;
712 >                pnext = (next == null) ? traversalHead() : next;
713 >                next = pnext.next;
714 >                if (next == pnext) {
715 >                    next = null;
716 >                    continue;  // restart
717 >                }
718 >                if (next == null)
719 >                    break;
720 >                Object x = next.get();
721 >                if (x != null && x != next) {
722 >                    nextItem = (E) x;
723 >                    break;
724                  }
585                prevNode = p;
586                p = p.next;
725              }
726 +            return item;
727          }
728  
729          public boolean hasNext() {
730 <            return nextNode != null;
730 >            return next != null;
731          }
732  
733          public E next() {
734 <            if (nextNode == null) throw new NoSuchElementException();
734 >            if (next == null)
735 >                throw new NoSuchElementException();
736              return advance();
737          }
738  
739          public void remove() {
740 <            QNode p = currentNode;
741 <            QNode prev = prevNode;
602 <            if (prev == null || p == null)
740 >            Node<E> p = curr;
741 >            if (p == null)
742                  throw new IllegalStateException();
743              Object x = p.get();
744              if (x != null && x != p && p.compareAndSet(x, p))
745 <                clean(prev, p);
745 >                clean(pcurr, p);
746          }
747      }
748  
749      public E peek() {
750          for (;;) {
751 <            QNode h = traversalHead();
752 <            QNode p = h.next;
751 >            Node<E> h = traversalHead();
752 >            Node<E> p = h.next;
753              if (p == null)
754                  return null;
755              Object x = p.get();
# Line 618 | Line 757 | public class LinkedTransferQueue<E> exte
757                  if (!p.isData)
758                      return null;
759                  if (x != null)
760 <                    return (E)x;
760 >                    return (E) x;
761              }
762          }
763      }
764  
765 +    /**
766 +     * Returns {@code true} if this queue contains no elements.
767 +     *
768 +     * @return {@code true} if this queue contains no elements
769 +     */
770      public boolean isEmpty() {
771          for (;;) {
772 <            QNode h = traversalHead();
773 <            QNode p = h.next;
772 >            Node<E> h = traversalHead();
773 >            Node<E> p = h.next;
774              if (p == null)
775                  return true;
776              Object x = p.get();
# Line 641 | Line 785 | public class LinkedTransferQueue<E> exte
785  
786      public boolean hasWaitingConsumer() {
787          for (;;) {
788 <            QNode h = traversalHead();
789 <            QNode p = h.next;
788 >            Node<E> h = traversalHead();
789 >            Node<E> p = h.next;
790              if (p == null)
791                  return false;
792              Object x = p.get();
# Line 664 | Line 808 | public class LinkedTransferQueue<E> exte
808       * @return the number of elements in this queue
809       */
810      public int size() {
811 <        int count = 0;
812 <        QNode h = traversalHead();
813 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
814 <            Object x = p.get();
815 <            if (x != null && x != p) {
816 <                if (++count == Integer.MAX_VALUE) // saturated
811 >        for (;;) {
812 >            int count = 0;
813 >            Node<E> pred = traversalHead();
814 >            for (;;) {
815 >                Node<E> q = pred.next;
816 >                if (q == pred) // restart
817                      break;
818 +                if (q == null || !q.isData)
819 +                    return count;
820 +                Object x = q.get();
821 +                if (x != null && x != q) {
822 +                    if (++count == Integer.MAX_VALUE) // saturated
823 +                        return count;
824 +                }
825 +                pred = q;
826              }
827          }
676        return count;
828      }
829  
830      public int getWaitingConsumerCount() {
831 <        int count = 0;
832 <        QNode h = traversalHead();
833 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
834 <            if (p.get() == null) {
835 <                if (++count == Integer.MAX_VALUE)
831 >        // converse of size -- count valid non-data nodes
832 >        for (;;) {
833 >            int count = 0;
834 >            Node<E> pred = traversalHead();
835 >            for (;;) {
836 >                Node<E> q = pred.next;
837 >                if (q == pred) // restart
838                      break;
839 +                if (q == null || q.isData)
840 +                    return count;
841 +                Object x = q.get();
842 +                if (x == null) {
843 +                    if (++count == Integer.MAX_VALUE) // saturated
844 +                        return count;
845 +                }
846 +                pred = q;
847              }
848          }
688        return count;
849      }
850  
851 +    public boolean remove(Object o) {
852 +        if (o == null)
853 +            return false;
854 +        for (;;) {
855 +            Node<E> pred = traversalHead();
856 +            for (;;) {
857 +                Node<E> q = pred.next;
858 +                if (q == pred) // restart
859 +                    break;
860 +                if (q == null || !q.isData)
861 +                    return false;
862 +                Object x = q.get();
863 +                if (x != null && x != q && o.equals(x) &&
864 +                    q.compareAndSet(x, q)) {
865 +                    clean(pred, q);
866 +                    return true;
867 +                }
868 +                pred = q;
869 +            }
870 +        }
871 +    }
872 +
873 +    /**
874 +     * Always returns {@code Integer.MAX_VALUE} because a
875 +     * {@code LinkedTransferQueue} is not capacity constrained.
876 +     *
877 +     * @return {@code Integer.MAX_VALUE} (as specified by
878 +     *         {@link BlockingQueue#remainingCapacity()})
879 +     */
880      public int remainingCapacity() {
881          return Integer.MAX_VALUE;
882      }
# Line 702 | Line 891 | public class LinkedTransferQueue<E> exte
891      private void writeObject(java.io.ObjectOutputStream s)
892          throws java.io.IOException {
893          s.defaultWriteObject();
894 <        for (Iterator<E> it = iterator(); it.hasNext(); )
895 <            s.writeObject(it.next());
894 >        for (E e : this)
895 >            s.writeObject(e);
896          // Use trailing null as sentinel
897          s.writeObject(null);
898      }
# Line 711 | Line 900 | public class LinkedTransferQueue<E> exte
900      /**
901       * Reconstitute the Queue instance from a stream (that is,
902       * deserialize it).
903 +     *
904       * @param s the stream
905       */
906      private void readObject(java.io.ObjectInputStream s)
# Line 718 | Line 908 | public class LinkedTransferQueue<E> exte
908          s.defaultReadObject();
909          resetHeadAndTail();
910          for (;;) {
911 <            E item = (E)s.readObject();
911 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
912              if (item == null)
913                  break;
914              else
# Line 726 | Line 916 | public class LinkedTransferQueue<E> exte
916          }
917      }
918  
729
919      // Support for resetting head/tail while deserializing
920      private void resetHeadAndTail() {
921 <        QNode dummy = new QNode(null, false);
922 <        _unsafe.putObjectVolatile(this, headOffset,
923 <                                  new PaddedAtomicReference<QNode>(dummy));
924 <        _unsafe.putObjectVolatile(this, tailOffset,
925 <                                  new PaddedAtomicReference<QNode>(dummy));
926 <        _unsafe.putObjectVolatile(this, cleanMeOffset,
927 <                                  new PaddedAtomicReference<QNode>(null));
921 >        Node<E> dummy = new Node<E>(null, false);
922 >        UNSAFE.putObjectVolatile(this, headOffset,
923 >                                 new PaddedAtomicReference<Node<E>>(dummy));
924 >        UNSAFE.putObjectVolatile(this, tailOffset,
925 >                                 new PaddedAtomicReference<Node<E>>(dummy));
926 >        UNSAFE.putObjectVolatile(this, cleanMeOffset,
927 >                                 new PaddedAtomicReference<Node<E>>(null));
928 >    }
929 >
930 >    // Unsafe mechanics
931 >
932 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
933 >    private static final long headOffset =
934 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
935 >    private static final long tailOffset =
936 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
937 >    private static final long cleanMeOffset =
938 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
939 >
940 >
941 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
942 >                                  String field, Class<?> klazz) {
943 >        try {
944 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
945 >        } catch (NoSuchFieldException e) {
946 >            // Convert Exception to corresponding Error
947 >            NoSuchFieldError error = new NoSuchFieldError(field);
948 >            error.initCause(e);
949 >            throw error;
950 >        }
951      }
952  
953 <    // Temporary Unsafe mechanics for preliminary release
954 <    private static Unsafe getUnsafe() throws Throwable {
953 >    /**
954 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
955 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
956 >     * into a jdk.
957 >     *
958 >     * @return a sun.misc.Unsafe
959 >     */
960 >    private static sun.misc.Unsafe getUnsafe() {
961          try {
962 <            return Unsafe.getUnsafe();
962 >            return sun.misc.Unsafe.getUnsafe();
963          } catch (SecurityException se) {
964              try {
965                  return java.security.AccessController.doPrivileged
966 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
967 <                        public Unsafe run() throws Exception {
968 <                            return getUnsafePrivileged();
966 >                    (new java.security
967 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
968 >                        public sun.misc.Unsafe run() throws Exception {
969 >                            java.lang.reflect.Field f = sun.misc
970 >                                .Unsafe.class.getDeclaredField("theUnsafe");
971 >                            f.setAccessible(true);
972 >                            return (sun.misc.Unsafe) f.get(null);
973                          }});
974              } catch (java.security.PrivilegedActionException e) {
975 <                throw e.getCause();
975 >                throw new RuntimeException("Could not initialize intrinsics",
976 >                                           e.getCause());
977              }
978          }
979      }
757
758    private static Unsafe getUnsafePrivileged()
759            throws NoSuchFieldException, IllegalAccessException {
760        Field f = Unsafe.class.getDeclaredField("theUnsafe");
761        f.setAccessible(true);
762        return (Unsafe) f.get(null);
763    }
764
765    private static long fieldOffset(String fieldName)
766            throws NoSuchFieldException {
767        return _unsafe.objectFieldOffset
768            (LinkedTransferQueue.class.getDeclaredField(fieldName));
769    }
770
771    private static final Unsafe _unsafe;
772    private static final long headOffset;
773    private static final long tailOffset;
774    private static final long cleanMeOffset;
775    static {
776        try {
777            _unsafe = getUnsafe();
778            headOffset = fieldOffset("head");
779            tailOffset = fieldOffset("tail");
780            cleanMeOffset = fieldOffset("cleanMe");
781        } catch (Throwable e) {
782            throw new RuntimeException("Could not initialize intrinsics", e);
783        }
784    }
785
980   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines