ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.24 by jsr166, Thu Jul 23 23:23:41 2009 UTC vs.
Revision 1.35 by jsr166, Thu Jul 30 22:45:39 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
14 < import sun.misc.Unsafe;
15 < import java.lang.reflect.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 102 | Line 106 | public class LinkedTransferQueue<E> exte
106       * garbage retention. Similarly, setting the next field to this is
107       * used as sentinel that node is off list.
108       */
109 <    static final class QNode extends AtomicReference<Object> {
110 <        volatile QNode next;
109 >    static final class Node<E> extends AtomicReference<Object> {
110 >        volatile Node<E> next;
111          volatile Thread waiter;       // to control park/unpark
112          final boolean isData;
113 <        QNode(Object item, boolean isData) {
113 >
114 >        Node(E item, boolean isData) {
115              super(item);
116              this.isData = isData;
117          }
118  
119 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
120 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
121 <            (QNode.class, QNode.class, "next");
119 >        // Unsafe mechanics
120 >
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124  
125 <        final boolean casNext(QNode cmp, QNode val) {
126 <            return nextUpdater.compareAndSet(this, cmp, val);
125 >        final boolean casNext(Node<E> cmp, Node<E> val) {
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127          }
128  
129          final void clearNext() {
130 <            nextUpdater.lazySet(this, this);
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160  
161          private static final long serialVersionUID = -3375979862319811754L;
# Line 140 | Line 175 | public class LinkedTransferQueue<E> exte
175  
176  
177      /** head of the queue */
178 <    private transient final PaddedAtomicReference<QNode> head;
178 >    private transient final PaddedAtomicReference<Node<E>> head;
179  
180      /** tail of the queue */
181 <    private transient final PaddedAtomicReference<QNode> tail;
181 >    private transient final PaddedAtomicReference<Node<E>> tail;
182  
183      /**
184       * Reference to a cancelled node that might not yet have been
185       * unlinked from queue because it was the last inserted node
186       * when it cancelled.
187       */
188 <    private transient final PaddedAtomicReference<QNode> cleanMe;
188 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
189  
190      /**
191       * Tries to cas nh as new head; if successful, unlink
192       * old head's next node to avoid garbage retention.
193       */
194 <    private boolean advanceHead(QNode h, QNode nh) {
194 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
195          if (h == head.get() && head.compareAndSet(h, nh)) {
196              h.clearNext(); // forget old next
197              return true;
# Line 174 | Line 209 | public class LinkedTransferQueue<E> exte
209       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
210       * @return an item, or null on failure
211       */
212 <    private Object xfer(Object e, int mode, long nanos) {
212 >    private E xfer(E e, int mode, long nanos) {
213          boolean isData = (e != null);
214 <        QNode s = null;
215 <        final PaddedAtomicReference<QNode> head = this.head;
216 <        final PaddedAtomicReference<QNode> tail = this.tail;
214 >        Node<E> s = null;
215 >        final PaddedAtomicReference<Node<E>> head = this.head;
216 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
217  
218          for (;;) {
219 <            QNode t = tail.get();
220 <            QNode h = head.get();
219 >            Node<E> t = tail.get();
220 >            Node<E> h = head.get();
221  
222              if (t != null && (t == h || t.isData == isData)) {
223                  if (s == null)
224 <                    s = new QNode(e, isData);
225 <                QNode last = t.next;
224 >                    s = new Node<E>(e, isData);
225 >                Node<E> last = t.next;
226                  if (last != null) {
227                      if (t == tail.get())
228                          tail.compareAndSet(t, last);
# Line 199 | Line 234 | public class LinkedTransferQueue<E> exte
234              }
235  
236              else if (h != null) {
237 <                QNode first = h.next;
237 >                Node<E> first = h.next;
238                  if (t == tail.get() && first != null &&
239                      advanceHead(h, first)) {
240                      Object x = first.get();
241                      if (x != first && first.compareAndSet(x, e)) {
242                          LockSupport.unpark(first.waiter);
243 <                        return isData ? e : x;
243 >                        return isData ? e : (E) x;
244                      }
245                  }
246              }
# Line 217 | Line 252 | public class LinkedTransferQueue<E> exte
252       * Version of xfer for poll() and tryTransfer, which
253       * simplifies control paths both here and in xfer.
254       */
255 <    private Object fulfill(Object e) {
255 >    private E fulfill(E e) {
256          boolean isData = (e != null);
257 <        final PaddedAtomicReference<QNode> head = this.head;
258 <        final PaddedAtomicReference<QNode> tail = this.tail;
257 >        final PaddedAtomicReference<Node<E>> head = this.head;
258 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
259  
260          for (;;) {
261 <            QNode t = tail.get();
262 <            QNode h = head.get();
261 >            Node<E> t = tail.get();
262 >            Node<E> h = head.get();
263  
264              if (t != null && (t == h || t.isData == isData)) {
265 <                QNode last = t.next;
265 >                Node<E> last = t.next;
266                  if (t == tail.get()) {
267                      if (last != null)
268                          tail.compareAndSet(t, last);
# Line 236 | Line 271 | public class LinkedTransferQueue<E> exte
271                  }
272              }
273              else if (h != null) {
274 <                QNode first = h.next;
274 >                Node<E> first = h.next;
275                  if (t == tail.get() &&
276                      first != null &&
277                      advanceHead(h, first)) {
278                      Object x = first.get();
279                      if (x != first && first.compareAndSet(x, e)) {
280                          LockSupport.unpark(first.waiter);
281 <                        return isData ? e : x;
281 >                        return isData ? e : (E) x;
282                      }
283                  }
284              }
# Line 261 | Line 296 | public class LinkedTransferQueue<E> exte
296       * @param nanos timeout value
297       * @return matched item, or s if cancelled
298       */
299 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
300 <                                int mode, long nanos) {
299 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
300 >                           int mode, long nanos) {
301          if (mode == NOWAIT)
302              return null;
303  
# Line 281 | Line 316 | public class LinkedTransferQueue<E> exte
316                  }
317                  else if (x != null) {
318                      s.set(s);             // avoid garbage retention
319 <                    return x;
319 >                    return (E) x;
320                  }
321                  else
322                      return e;
# Line 296 | Line 331 | public class LinkedTransferQueue<E> exte
331                  }
332              }
333              if (spins < 0) {
334 <                QNode h = head.get(); // only spin if at head
334 >                Node<E> h = head.get(); // only spin if at head
335                  spins = ((h != null && h.next == s) ?
336                           ((mode == TIMEOUT) ?
337                            maxTimedSpins : maxUntimedSpins) : 0);
# Line 321 | Line 356 | public class LinkedTransferQueue<E> exte
356      /**
357       * Returns validated tail for use in cleaning methods.
358       */
359 <    private QNode getValidatedTail() {
359 >    private Node<E> getValidatedTail() {
360          for (;;) {
361 <            QNode h = head.get();
362 <            QNode first = h.next;
361 >            Node<E> h = head.get();
362 >            Node<E> first = h.next;
363              if (first != null && first.next == first) { // help advance
364                  advanceHead(h, first);
365                  continue;
366              }
367 <            QNode t = tail.get();
368 <            QNode last = t.next;
367 >            Node<E> t = tail.get();
368 >            Node<E> last = t.next;
369              if (t == tail.get()) {
370                  if (last != null)
371                      tail.compareAndSet(t, last); // help advance
# Line 346 | Line 381 | public class LinkedTransferQueue<E> exte
381       * @param pred predecessor of cancelled node
382       * @param s the cancelled node
383       */
384 <    private void clean(QNode pred, QNode s) {
384 >    private void clean(Node<E> pred, Node<E> s) {
385          Thread w = s.waiter;
386          if (w != null) {             // Wake up thread
387              s.waiter = null;
# Line 366 | Line 401 | public class LinkedTransferQueue<E> exte
401           * processed, so this always terminates.
402           */
403          while (pred.next == s) {
404 <            QNode oldpred = reclean();  // First, help get rid of cleanMe
405 <            QNode t = getValidatedTail();
404 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
405 >            Node<E> t = getValidatedTail();
406              if (s != t) {               // If not tail, try to unsplice
407 <                QNode sn = s.next;      // s.next == s means s already off list
407 >                Node<E> sn = s.next;      // s.next == s means s already off list
408                  if (sn == s || pred.casNext(s, sn))
409                      break;
410              }
# Line 385 | Line 420 | public class LinkedTransferQueue<E> exte
420       *
421       * @return current cleanMe node (or null)
422       */
423 <    private QNode reclean() {
423 >    private Node<E> reclean() {
424          /*
425           * cleanMe is, or at one time was, predecessor of cancelled
426           * node s that was the tail so could not be unspliced.  If s
# Line 397 | Line 432 | public class LinkedTransferQueue<E> exte
432           * This can loop only due to contention on casNext or
433           * clearing cleanMe.
434           */
435 <        QNode pred;
435 >        Node<E> pred;
436          while ((pred = cleanMe.get()) != null) {
437 <            QNode t = getValidatedTail();
438 <            QNode s = pred.next;
437 >            Node<E> t = getValidatedTail();
438 >            Node<E> s = pred.next;
439              if (s != t) {
440 <                QNode sn;
440 >                Node<E> sn;
441                  if (s == null || s == pred || s.get() != s ||
442                      (sn = s.next) == s || pred.casNext(s, sn))
443                      cleanMe.compareAndSet(pred, null);
# Line 417 | Line 452 | public class LinkedTransferQueue<E> exte
452       * Creates an initially empty {@code LinkedTransferQueue}.
453       */
454      public LinkedTransferQueue() {
455 <        QNode dummy = new QNode(null, false);
456 <        head = new PaddedAtomicReference<QNode>(dummy);
457 <        tail = new PaddedAtomicReference<QNode>(dummy);
458 <        cleanMe = new PaddedAtomicReference<QNode>(null);
455 >        Node<E> dummy = new Node<E>(null, false);
456 >        head = new PaddedAtomicReference<Node<E>>(dummy);
457 >        tail = new PaddedAtomicReference<Node<E>>(dummy);
458 >        cleanMe = new PaddedAtomicReference<Node<E>>(null);
459      }
460  
461      /**
# Line 437 | Line 472 | public class LinkedTransferQueue<E> exte
472          addAll(c);
473      }
474  
475 <    public void put(E e) throws InterruptedException {
476 <        if (e == null) throw new NullPointerException();
477 <        if (Thread.interrupted()) throw new InterruptedException();
478 <        xfer(e, NOWAIT, 0);
475 >    /**
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480 >     */
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485 <    public boolean offer(E e, long timeout, TimeUnit unit)
486 <        throws InterruptedException {
487 <        if (e == null) throw new NullPointerException();
488 <        if (Thread.interrupted()) throw new InterruptedException();
489 <        xfer(e, NOWAIT, 0);
490 <        return true;
485 >    /**
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493 >     */
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498 +    /**
499 +     * Inserts the specified element at the tail of this queue.
500 +     * As the queue is unbounded, this method will never return {@code false}.
501 +     *
502 +     * @return {@code true} (as specified by
503 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 +     * @throws NullPointerException if the specified element is null
505 +     */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
508          xfer(e, NOWAIT, 0);
509          return true;
510      }
511  
512 +    /**
513 +     * Inserts the specified element at the tail of this queue.
514 +     * As the queue is unbounded this method will never throw
515 +     * {@link IllegalStateException} or return {@code false}.
516 +     *
517 +     * @return {@code true} (as specified by {@link Collection#add})
518 +     * @throws NullPointerException if the specified element is null
519 +     */
520      public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the specified element immediately if there exists a
526 +     * consumer already waiting to receive it (in {@link #take} or
527 +     * timed {@link #poll(Object,long,TimeUnit) poll}), otherwise
528 +     * returning {@code false} without enqueuing the element.
529 +     *
530 +     * @throws NullPointerException if the specified element is null
531 +     */
532 +    public boolean tryTransfer(E e) {
533          if (e == null) throw new NullPointerException();
534 <        xfer(e, NOWAIT, 0);
463 <        return true;
534 >        return fulfill(e) != null;
535      }
536  
537 +    /**
538 +     * Inserts the specified element at the tail of this queue,
539 +     * waiting if necessary for the element to be received by a
540 +     * consumer invoking {@code take} or {@code poll}.
541 +     *
542 +     * @throws InterruptedException {@inheritDoc}
543 +     * @throws NullPointerException if the specified element is null
544 +     */
545      public void transfer(E e) throws InterruptedException {
546          if (e == null) throw new NullPointerException();
547          if (xfer(e, WAIT, 0) == null) {
# Line 471 | Line 550 | public class LinkedTransferQueue<E> exte
550          }
551      }
552  
553 +    /**
554 +     * Inserts the specified element at the tail of this queue,
555 +     * waiting up to the specified wait time for the element to be
556 +     * received by a consumer invoking {@code take} or {@code poll}.
557 +     *
558 +     * @throws InterruptedException {@inheritDoc}
559 +     * @throws NullPointerException if the specified element is null
560 +     */
561      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
562          throws InterruptedException {
563          if (e == null) throw new NullPointerException();
# Line 481 | Line 568 | public class LinkedTransferQueue<E> exte
568          throw new InterruptedException();
569      }
570  
484    public boolean tryTransfer(E e) {
485        if (e == null) throw new NullPointerException();
486        return fulfill(e) != null;
487    }
488
571      public E take() throws InterruptedException {
572 <        Object e = xfer(null, WAIT, 0);
572 >        E e = xfer(null, WAIT, 0);
573          if (e != null)
574 <            return (E) e;
574 >            return e;
575          Thread.interrupted();
576          throw new InterruptedException();
577      }
578  
579 +    /**
580 +     * @throws InterruptedException {@inheritDoc}
581 +     */
582      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
583 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
583 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
584          if (e != null || !Thread.interrupted())
585 <            return (E) e;
585 >            return e;
586          throw new InterruptedException();
587      }
588  
589      public E poll() {
590 <        return (E) fulfill(null);
590 >        return fulfill(null);
591      }
592  
593 +    /**
594 +     * @throws NullPointerException     {@inheritDoc}
595 +     * @throws IllegalArgumentException {@inheritDoc}
596 +     */
597      public int drainTo(Collection<? super E> c) {
598          if (c == null)
599              throw new NullPointerException();
# Line 519 | Line 608 | public class LinkedTransferQueue<E> exte
608          return n;
609      }
610  
611 +    /**
612 +     * @throws NullPointerException     {@inheritDoc}
613 +     * @throws IllegalArgumentException {@inheritDoc}
614 +     */
615      public int drainTo(Collection<? super E> c, int maxElements) {
616          if (c == null)
617              throw new NullPointerException();
# Line 538 | Line 631 | public class LinkedTransferQueue<E> exte
631      /**
632       * Returns head after performing any outstanding helping steps.
633       */
634 <    private QNode traversalHead() {
634 >    private Node<E> traversalHead() {
635          for (;;) {
636 <            QNode t = tail.get();
637 <            QNode h = head.get();
636 >            Node<E> t = tail.get();
637 >            Node<E> h = head.get();
638              if (h != null && t != null) {
639 <                QNode last = t.next;
640 <                QNode first = h.next;
639 >                Node<E> last = t.next;
640 >                Node<E> first = h.next;
641                  if (t == tail.get()) {
642                      if (last != null)
643                          tail.compareAndSet(t, last);
# Line 563 | Line 656 | public class LinkedTransferQueue<E> exte
656          }
657      }
658  
659 <
659 >    /**
660 >     * Returns an iterator over the elements in this queue in proper
661 >     * sequence, from head to tail.
662 >     *
663 >     * <p>The returned iterator is a "weakly consistent" iterator that
664 >     * will never throw
665 >     * {@link ConcurrentModificationException ConcurrentModificationException},
666 >     * and guarantees to traverse elements as they existed upon
667 >     * construction of the iterator, and may (but is not guaranteed
668 >     * to) reflect any modifications subsequent to construction.
669 >     *
670 >     * @return an iterator over the elements in this queue in proper sequence
671 >     */
672      public Iterator<E> iterator() {
673          return new Itr();
674      }
# Line 576 | Line 681 | public class LinkedTransferQueue<E> exte
681       * if subsequently removed.
682       */
683      class Itr implements Iterator<E> {
684 <        QNode next;        // node to return next
685 <        QNode pnext;       // predecessor of next
686 <        QNode snext;       // successor of next
687 <        QNode curr;        // last returned node, for remove()
688 <        QNode pcurr;       // predecessor of curr, for remove()
584 <        E nextItem;        // Cache of next item, once committed to in next
684 >        Node<E> next;        // node to return next
685 >        Node<E> pnext;       // predecessor of next
686 >        Node<E> curr;        // last returned node, for remove()
687 >        Node<E> pcurr;       // predecessor of curr, for remove()
688 >        E nextItem;          // Cache of next item, once committed to in next
689  
690          Itr() {
691 <            findNext();
691 >            advance();
692          }
693  
694          /**
695 <         * Ensures next points to next valid node, or null if none.
695 >         * Moves to next valid node and returns item to return for
696 >         * next(), or null if no such.
697           */
698 <        void findNext() {
698 >        private E advance() {
699 >            pcurr = pnext;
700 >            curr = next;
701 >            E item = nextItem;
702 >
703              for (;;) {
704 <                QNode pred = pnext;
705 <                QNode q = next;
706 <                if (pred == null || pred == q) {
598 <                    pred = traversalHead();
599 <                    q = pred.next;
600 <                }
601 <                if (q == null || !q.isData) {
704 >                pnext = (next == null) ? traversalHead() : next;
705 >                next = pnext.next;
706 >                if (next == pnext) {
707                      next = null;
708 <                    return;
708 >                    continue;  // restart
709                  }
710 <                Object x = q.get();
711 <                QNode s = q.next;
712 <                if (x != null && q != x && q != s) {
710 >                if (next == null)
711 >                    break;
712 >                Object x = next.get();
713 >                if (x != null && x != next) {
714                      nextItem = (E) x;
715 <                    snext = s;
610 <                    pnext = pred;
611 <                    next = q;
612 <                    return;
715 >                    break;
716                  }
614                pnext = q;
615                next = s;
717              }
718 +            return item;
719          }
720  
721          public boolean hasNext() {
# Line 621 | Line 723 | public class LinkedTransferQueue<E> exte
723          }
724  
725          public E next() {
726 <            if (next == null) throw new NoSuchElementException();
727 <            pcurr = pnext;
728 <            curr = next;
627 <            pnext = next;
628 <            next = snext;
629 <            E x = nextItem;
630 <            findNext();
631 <            return x;
726 >            if (next == null)
727 >                throw new NoSuchElementException();
728 >            return advance();
729          }
730  
731          public void remove() {
732 <            QNode p = curr;
732 >            Node<E> p = curr;
733              if (p == null)
734                  throw new IllegalStateException();
735              Object x = p.get();
# Line 643 | Line 740 | public class LinkedTransferQueue<E> exte
740  
741      public E peek() {
742          for (;;) {
743 <            QNode h = traversalHead();
744 <            QNode p = h.next;
743 >            Node<E> h = traversalHead();
744 >            Node<E> p = h.next;
745              if (p == null)
746                  return null;
747              Object x = p.get();
# Line 659 | Line 756 | public class LinkedTransferQueue<E> exte
756  
757      public boolean isEmpty() {
758          for (;;) {
759 <            QNode h = traversalHead();
760 <            QNode p = h.next;
759 >            Node<E> h = traversalHead();
760 >            Node<E> p = h.next;
761              if (p == null)
762                  return true;
763              Object x = p.get();
# Line 675 | Line 772 | public class LinkedTransferQueue<E> exte
772  
773      public boolean hasWaitingConsumer() {
774          for (;;) {
775 <            QNode h = traversalHead();
776 <            QNode p = h.next;
775 >            Node<E> h = traversalHead();
776 >            Node<E> p = h.next;
777              if (p == null)
778                  return false;
779              Object x = p.get();
# Line 698 | Line 795 | public class LinkedTransferQueue<E> exte
795       * @return the number of elements in this queue
796       */
797      public int size() {
798 <        int count = 0;
799 <        QNode h = traversalHead();
800 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
801 <            Object x = p.get();
802 <            if (x != null && x != p) {
803 <                if (++count == Integer.MAX_VALUE) // saturated
798 >        for (;;) {
799 >            int count = 0;
800 >            Node<E> pred = traversalHead();
801 >            for (;;) {
802 >                Node<E> q = pred.next;
803 >                if (q == pred) // restart
804                      break;
805 +                if (q == null || !q.isData)
806 +                    return count;
807 +                Object x = q.get();
808 +                if (x != null && x != q) {
809 +                    if (++count == Integer.MAX_VALUE) // saturated
810 +                        return count;
811 +                }
812 +                pred = q;
813              }
814          }
710        return count;
815      }
816  
817      public int getWaitingConsumerCount() {
818 <        int count = 0;
819 <        QNode h = traversalHead();
820 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
821 <            if (p.get() == null) {
822 <                if (++count == Integer.MAX_VALUE)
818 >        // converse of size -- count valid non-data nodes
819 >        for (;;) {
820 >            int count = 0;
821 >            Node<E> pred = traversalHead();
822 >            for (;;) {
823 >                Node<E> q = pred.next;
824 >                if (q == pred) // restart
825                      break;
826 +                if (q == null || q.isData)
827 +                    return count;
828 +                Object x = q.get();
829 +                if (x == null) {
830 +                    if (++count == Integer.MAX_VALUE) // saturated
831 +                        return count;
832 +                }
833 +                pred = q;
834              }
835          }
722        return count;
723    }
724
725    public int remainingCapacity() {
726        return Integer.MAX_VALUE;
836      }
837  
838      public boolean remove(Object o) {
839          if (o == null)
840              return false;
841          for (;;) {
842 <            QNode pred = traversalHead();
842 >            Node<E> pred = traversalHead();
843              for (;;) {
844 <                QNode q = pred.next;
736 <                if (q == null || !q.isData)
737 <                    return false;
844 >                Node<E> q = pred.next;
845                  if (q == pred) // restart
846                      break;
847 +                if (q == null || !q.isData)
848 +                    return false;
849                  Object x = q.get();
850                  if (x != null && x != q && o.equals(x) &&
851                      q.compareAndSet(x, q)) {
# Line 749 | Line 858 | public class LinkedTransferQueue<E> exte
858      }
859  
860      /**
861 +     * Always returns {@code Integer.MAX_VALUE} because a
862 +     * {@code LinkedTransferQueue} is not capacity constrained.
863 +     *
864 +     * @return {@code Integer.MAX_VALUE} (as specified by
865 +     *         {@link BlockingQueue#remainingCapacity()})
866 +     */
867 +    public int remainingCapacity() {
868 +        return Integer.MAX_VALUE;
869 +    }
870 +
871 +    /**
872       * Save the state to a stream (that is, serialize it).
873       *
874       * @serialData All of the elements (each an {@code E}) in
# Line 775 | Line 895 | public class LinkedTransferQueue<E> exte
895          s.defaultReadObject();
896          resetHeadAndTail();
897          for (;;) {
898 <            E item = (E) s.readObject();
898 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
899              if (item == null)
900                  break;
901              else
# Line 783 | Line 903 | public class LinkedTransferQueue<E> exte
903          }
904      }
905  
786
906      // Support for resetting head/tail while deserializing
907      private void resetHeadAndTail() {
908 <        QNode dummy = new QNode(null, false);
908 >        Node<E> dummy = new Node<E>(null, false);
909          UNSAFE.putObjectVolatile(this, headOffset,
910 <                                  new PaddedAtomicReference<QNode>(dummy));
910 >                                 new PaddedAtomicReference<Node<E>>(dummy));
911          UNSAFE.putObjectVolatile(this, tailOffset,
912 <                                  new PaddedAtomicReference<QNode>(dummy));
912 >                                 new PaddedAtomicReference<Node<E>>(dummy));
913          UNSAFE.putObjectVolatile(this, cleanMeOffset,
914 <                                  new PaddedAtomicReference<QNode>(null));
914 >                                 new PaddedAtomicReference<Node<E>>(null));
915      }
916  
917 <    // Temporary Unsafe mechanics for preliminary release
918 <    private static Unsafe getUnsafe() throws Throwable {
917 >    // Unsafe mechanics
918 >
919 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
920 >    private static final long headOffset =
921 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
922 >    private static final long tailOffset =
923 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
924 >    private static final long cleanMeOffset =
925 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
926 >
927 >
928 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
929 >                                  String field, Class<?> klazz) {
930          try {
931 <            return Unsafe.getUnsafe();
931 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
932 >        } catch (NoSuchFieldException e) {
933 >            // Convert Exception to corresponding Error
934 >            NoSuchFieldError error = new NoSuchFieldError(field);
935 >            error.initCause(e);
936 >            throw error;
937 >        }
938 >    }
939 >
940 >    /**
941 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
942 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
943 >     * into a jdk.
944 >     *
945 >     * @return a sun.misc.Unsafe
946 >     */
947 >    private static sun.misc.Unsafe getUnsafe() {
948 >        try {
949 >            return sun.misc.Unsafe.getUnsafe();
950          } catch (SecurityException se) {
951              try {
952                  return java.security.AccessController.doPrivileged
953 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
954 <                        public Unsafe run() throws Exception {
955 <                            return getUnsafePrivileged();
953 >                    (new java.security
954 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
955 >                        public sun.misc.Unsafe run() throws Exception {
956 >                            java.lang.reflect.Field f = sun.misc
957 >                                .Unsafe.class.getDeclaredField("theUnsafe");
958 >                            f.setAccessible(true);
959 >                            return (sun.misc.Unsafe) f.get(null);
960                          }});
961              } catch (java.security.PrivilegedActionException e) {
962 <                throw e.getCause();
962 >                throw new RuntimeException("Could not initialize intrinsics",
963 >                                           e.getCause());
964              }
965          }
966      }
814
815    private static Unsafe getUnsafePrivileged()
816            throws NoSuchFieldException, IllegalAccessException {
817        Field f = Unsafe.class.getDeclaredField("theUnsafe");
818        f.setAccessible(true);
819        return (Unsafe) f.get(null);
820    }
821
822    private static long fieldOffset(String fieldName)
823            throws NoSuchFieldException {
824        return UNSAFE.objectFieldOffset
825            (LinkedTransferQueue.class.getDeclaredField(fieldName));
826    }
827
828    private static final Unsafe UNSAFE;
829    private static final long headOffset;
830    private static final long tailOffset;
831    private static final long cleanMeOffset;
832    static {
833        try {
834            UNSAFE = getUnsafe();
835            headOffset = fieldOffset("head");
836            tailOffset = fieldOffset("tail");
837            cleanMeOffset = fieldOffset("cleanMe");
838        } catch (Throwable e) {
839            throw new RuntimeException("Could not initialize intrinsics", e);
840        }
841    }
842
967   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines