ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.24 by jsr166, Thu Jul 23 23:23:41 2009 UTC vs.
Revision 1.25 by jsr166, Fri Jul 24 23:48:26 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.locks.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.*;
12   import java.io.*;
13 import sun.misc.Unsafe;
14 import java.lang.reflect.*;
13  
14   /**
15   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 102 | Line 100 | public class LinkedTransferQueue<E> exte
100       * garbage retention. Similarly, setting the next field to this is
101       * used as sentinel that node is off list.
102       */
103 <    static final class QNode extends AtomicReference<Object> {
104 <        volatile QNode next;
103 >    static final class Node<E> extends AtomicReference<Object> {
104 >        volatile Node<E> next;
105          volatile Thread waiter;       // to control park/unpark
106          final boolean isData;
107 <        QNode(Object item, boolean isData) {
107 >
108 >        Node(E item, boolean isData) {
109              super(item);
110              this.isData = isData;
111          }
112  
113 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
113 >        @SuppressWarnings("rawtypes")
114 >        static final AtomicReferenceFieldUpdater<Node, Node>
115              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
116 <            (QNode.class, QNode.class, "next");
116 >            (Node.class, Node.class, "next");
117  
118 <        final boolean casNext(QNode cmp, QNode val) {
118 >        final boolean casNext(Node<E> cmp, Node<E> val) {
119              return nextUpdater.compareAndSet(this, cmp, val);
120          }
121  
# Line 140 | Line 140 | public class LinkedTransferQueue<E> exte
140  
141  
142      /** head of the queue */
143 <    private transient final PaddedAtomicReference<QNode> head;
143 >    private transient final PaddedAtomicReference<Node<E>> head;
144  
145      /** tail of the queue */
146 <    private transient final PaddedAtomicReference<QNode> tail;
146 >    private transient final PaddedAtomicReference<Node<E>> tail;
147  
148      /**
149       * Reference to a cancelled node that might not yet have been
150       * unlinked from queue because it was the last inserted node
151       * when it cancelled.
152       */
153 <    private transient final PaddedAtomicReference<QNode> cleanMe;
153 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
154  
155      /**
156       * Tries to cas nh as new head; if successful, unlink
157       * old head's next node to avoid garbage retention.
158       */
159 <    private boolean advanceHead(QNode h, QNode nh) {
159 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
160          if (h == head.get() && head.compareAndSet(h, nh)) {
161              h.clearNext(); // forget old next
162              return true;
# Line 174 | Line 174 | public class LinkedTransferQueue<E> exte
174       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
175       * @return an item, or null on failure
176       */
177 <    private Object xfer(Object e, int mode, long nanos) {
177 >    private E xfer(E e, int mode, long nanos) {
178          boolean isData = (e != null);
179 <        QNode s = null;
180 <        final PaddedAtomicReference<QNode> head = this.head;
181 <        final PaddedAtomicReference<QNode> tail = this.tail;
179 >        Node<E> s = null;
180 >        final PaddedAtomicReference<Node<E>> head = this.head;
181 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
182  
183          for (;;) {
184 <            QNode t = tail.get();
185 <            QNode h = head.get();
184 >            Node<E> t = tail.get();
185 >            Node<E> h = head.get();
186  
187              if (t != null && (t == h || t.isData == isData)) {
188                  if (s == null)
189 <                    s = new QNode(e, isData);
190 <                QNode last = t.next;
189 >                    s = new Node<E>(e, isData);
190 >                Node<E> last = t.next;
191                  if (last != null) {
192                      if (t == tail.get())
193                          tail.compareAndSet(t, last);
# Line 199 | Line 199 | public class LinkedTransferQueue<E> exte
199              }
200  
201              else if (h != null) {
202 <                QNode first = h.next;
202 >                Node<E> first = h.next;
203                  if (t == tail.get() && first != null &&
204                      advanceHead(h, first)) {
205                      Object x = first.get();
206                      if (x != first && first.compareAndSet(x, e)) {
207                          LockSupport.unpark(first.waiter);
208 <                        return isData ? e : x;
208 >                        return isData ? e : (E) x;
209                      }
210                  }
211              }
# Line 217 | Line 217 | public class LinkedTransferQueue<E> exte
217       * Version of xfer for poll() and tryTransfer, which
218       * simplifies control paths both here and in xfer.
219       */
220 <    private Object fulfill(Object e) {
220 >    private E fulfill(E e) {
221          boolean isData = (e != null);
222 <        final PaddedAtomicReference<QNode> head = this.head;
223 <        final PaddedAtomicReference<QNode> tail = this.tail;
222 >        final PaddedAtomicReference<Node<E>> head = this.head;
223 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
224  
225          for (;;) {
226 <            QNode t = tail.get();
227 <            QNode h = head.get();
226 >            Node<E> t = tail.get();
227 >            Node<E> h = head.get();
228  
229              if (t != null && (t == h || t.isData == isData)) {
230 <                QNode last = t.next;
230 >                Node<E> last = t.next;
231                  if (t == tail.get()) {
232                      if (last != null)
233                          tail.compareAndSet(t, last);
# Line 236 | Line 236 | public class LinkedTransferQueue<E> exte
236                  }
237              }
238              else if (h != null) {
239 <                QNode first = h.next;
239 >                Node<E> first = h.next;
240                  if (t == tail.get() &&
241                      first != null &&
242                      advanceHead(h, first)) {
243                      Object x = first.get();
244                      if (x != first && first.compareAndSet(x, e)) {
245                          LockSupport.unpark(first.waiter);
246 <                        return isData ? e : x;
246 >                        return isData ? e : (E) x;
247                      }
248                  }
249              }
# Line 261 | Line 261 | public class LinkedTransferQueue<E> exte
261       * @param nanos timeout value
262       * @return matched item, or s if cancelled
263       */
264 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
265 <                                int mode, long nanos) {
264 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
265 >                           int mode, long nanos) {
266          if (mode == NOWAIT)
267              return null;
268  
# Line 281 | Line 281 | public class LinkedTransferQueue<E> exte
281                  }
282                  else if (x != null) {
283                      s.set(s);             // avoid garbage retention
284 <                    return x;
284 >                    return (E) x;
285                  }
286                  else
287                      return e;
# Line 296 | Line 296 | public class LinkedTransferQueue<E> exte
296                  }
297              }
298              if (spins < 0) {
299 <                QNode h = head.get(); // only spin if at head
299 >                Node<E> h = head.get(); // only spin if at head
300                  spins = ((h != null && h.next == s) ?
301                           ((mode == TIMEOUT) ?
302                            maxTimedSpins : maxUntimedSpins) : 0);
# Line 321 | Line 321 | public class LinkedTransferQueue<E> exte
321      /**
322       * Returns validated tail for use in cleaning methods.
323       */
324 <    private QNode getValidatedTail() {
324 >    private Node<E> getValidatedTail() {
325          for (;;) {
326 <            QNode h = head.get();
327 <            QNode first = h.next;
326 >            Node<E> h = head.get();
327 >            Node<E> first = h.next;
328              if (first != null && first.next == first) { // help advance
329                  advanceHead(h, first);
330                  continue;
331              }
332 <            QNode t = tail.get();
333 <            QNode last = t.next;
332 >            Node<E> t = tail.get();
333 >            Node<E> last = t.next;
334              if (t == tail.get()) {
335                  if (last != null)
336                      tail.compareAndSet(t, last); // help advance
# Line 346 | Line 346 | public class LinkedTransferQueue<E> exte
346       * @param pred predecessor of cancelled node
347       * @param s the cancelled node
348       */
349 <    private void clean(QNode pred, QNode s) {
349 >    private void clean(Node<E> pred, Node<E> s) {
350          Thread w = s.waiter;
351          if (w != null) {             // Wake up thread
352              s.waiter = null;
# Line 366 | Line 366 | public class LinkedTransferQueue<E> exte
366           * processed, so this always terminates.
367           */
368          while (pred.next == s) {
369 <            QNode oldpred = reclean();  // First, help get rid of cleanMe
370 <            QNode t = getValidatedTail();
369 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
370 >            Node<E> t = getValidatedTail();
371              if (s != t) {               // If not tail, try to unsplice
372 <                QNode sn = s.next;      // s.next == s means s already off list
372 >                Node<E> sn = s.next;      // s.next == s means s already off list
373                  if (sn == s || pred.casNext(s, sn))
374                      break;
375              }
# Line 385 | Line 385 | public class LinkedTransferQueue<E> exte
385       *
386       * @return current cleanMe node (or null)
387       */
388 <    private QNode reclean() {
388 >    private Node<E> reclean() {
389          /*
390           * cleanMe is, or at one time was, predecessor of cancelled
391           * node s that was the tail so could not be unspliced.  If s
# Line 397 | Line 397 | public class LinkedTransferQueue<E> exte
397           * This can loop only due to contention on casNext or
398           * clearing cleanMe.
399           */
400 <        QNode pred;
400 >        Node<E> pred;
401          while ((pred = cleanMe.get()) != null) {
402 <            QNode t = getValidatedTail();
403 <            QNode s = pred.next;
402 >            Node<E> t = getValidatedTail();
403 >            Node<E> s = pred.next;
404              if (s != t) {
405 <                QNode sn;
405 >                Node<E> sn;
406                  if (s == null || s == pred || s.get() != s ||
407                      (sn = s.next) == s || pred.casNext(s, sn))
408                      cleanMe.compareAndSet(pred, null);
# Line 417 | Line 417 | public class LinkedTransferQueue<E> exte
417       * Creates an initially empty {@code LinkedTransferQueue}.
418       */
419      public LinkedTransferQueue() {
420 <        QNode dummy = new QNode(null, false);
421 <        head = new PaddedAtomicReference<QNode>(dummy);
422 <        tail = new PaddedAtomicReference<QNode>(dummy);
423 <        cleanMe = new PaddedAtomicReference<QNode>(null);
420 >        Node<E> dummy = new Node<E>(null, false);
421 >        head = new PaddedAtomicReference<Node<E>>(dummy);
422 >        tail = new PaddedAtomicReference<Node<E>>(dummy);
423 >        cleanMe = new PaddedAtomicReference<Node<E>>(null);
424      }
425  
426      /**
# Line 502 | Line 502 | public class LinkedTransferQueue<E> exte
502      }
503  
504      public E poll() {
505 <        return (E) fulfill(null);
505 >        return fulfill(null);
506      }
507  
508      public int drainTo(Collection<? super E> c) {
# Line 538 | Line 538 | public class LinkedTransferQueue<E> exte
538      /**
539       * Returns head after performing any outstanding helping steps.
540       */
541 <    private QNode traversalHead() {
541 >    private Node<E> traversalHead() {
542          for (;;) {
543 <            QNode t = tail.get();
544 <            QNode h = head.get();
543 >            Node<E> t = tail.get();
544 >            Node<E> h = head.get();
545              if (h != null && t != null) {
546 <                QNode last = t.next;
547 <                QNode first = h.next;
546 >                Node<E> last = t.next;
547 >                Node<E> first = h.next;
548                  if (t == tail.get()) {
549                      if (last != null)
550                          tail.compareAndSet(t, last);
# Line 576 | Line 576 | public class LinkedTransferQueue<E> exte
576       * if subsequently removed.
577       */
578      class Itr implements Iterator<E> {
579 <        QNode next;        // node to return next
580 <        QNode pnext;       // predecessor of next
581 <        QNode snext;       // successor of next
582 <        QNode curr;        // last returned node, for remove()
583 <        QNode pcurr;       // predecessor of curr, for remove()
579 >        Node<E> next;        // node to return next
580 >        Node<E> pnext;       // predecessor of next
581 >        Node<E> snext;       // successor of next
582 >        Node<E> curr;        // last returned node, for remove()
583 >        Node<E> pcurr;       // predecessor of curr, for remove()
584          E nextItem;        // Cache of next item, once committed to in next
585  
586          Itr() {
# Line 592 | Line 592 | public class LinkedTransferQueue<E> exte
592           */
593          void findNext() {
594              for (;;) {
595 <                QNode pred = pnext;
596 <                QNode q = next;
595 >                Node<E> pred = pnext;
596 >                Node<E> q = next;
597                  if (pred == null || pred == q) {
598                      pred = traversalHead();
599                      q = pred.next;
# Line 603 | Line 603 | public class LinkedTransferQueue<E> exte
603                      return;
604                  }
605                  Object x = q.get();
606 <                QNode s = q.next;
606 >                Node<E> s = q.next;
607                  if (x != null && q != x && q != s) {
608                      nextItem = (E) x;
609                      snext = s;
# Line 632 | Line 632 | public class LinkedTransferQueue<E> exte
632          }
633  
634          public void remove() {
635 <            QNode p = curr;
635 >            Node<E> p = curr;
636              if (p == null)
637                  throw new IllegalStateException();
638              Object x = p.get();
# Line 643 | Line 643 | public class LinkedTransferQueue<E> exte
643  
644      public E peek() {
645          for (;;) {
646 <            QNode h = traversalHead();
647 <            QNode p = h.next;
646 >            Node<E> h = traversalHead();
647 >            Node<E> p = h.next;
648              if (p == null)
649                  return null;
650              Object x = p.get();
# Line 659 | Line 659 | public class LinkedTransferQueue<E> exte
659  
660      public boolean isEmpty() {
661          for (;;) {
662 <            QNode h = traversalHead();
663 <            QNode p = h.next;
662 >            Node<E> h = traversalHead();
663 >            Node<E> p = h.next;
664              if (p == null)
665                  return true;
666              Object x = p.get();
# Line 675 | Line 675 | public class LinkedTransferQueue<E> exte
675  
676      public boolean hasWaitingConsumer() {
677          for (;;) {
678 <            QNode h = traversalHead();
679 <            QNode p = h.next;
678 >            Node<E> h = traversalHead();
679 >            Node<E> p = h.next;
680              if (p == null)
681                  return false;
682              Object x = p.get();
# Line 699 | Line 699 | public class LinkedTransferQueue<E> exte
699       */
700      public int size() {
701          int count = 0;
702 <        QNode h = traversalHead();
703 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
702 >        Node<E> h = traversalHead();
703 >        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
704              Object x = p.get();
705              if (x != null && x != p) {
706                  if (++count == Integer.MAX_VALUE) // saturated
# Line 712 | Line 712 | public class LinkedTransferQueue<E> exte
712  
713      public int getWaitingConsumerCount() {
714          int count = 0;
715 <        QNode h = traversalHead();
716 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
715 >        Node<E> h = traversalHead();
716 >        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
717              if (p.get() == null) {
718                  if (++count == Integer.MAX_VALUE)
719                      break;
# Line 730 | Line 730 | public class LinkedTransferQueue<E> exte
730          if (o == null)
731              return false;
732          for (;;) {
733 <            QNode pred = traversalHead();
733 >            Node<E> pred = traversalHead();
734              for (;;) {
735 <                QNode q = pred.next;
735 >                Node<E> q = pred.next;
736                  if (q == null || !q.isData)
737                      return false;
738                  if (q == pred) // restart
# Line 775 | Line 775 | public class LinkedTransferQueue<E> exte
775          s.defaultReadObject();
776          resetHeadAndTail();
777          for (;;) {
778 <            E item = (E) s.readObject();
778 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
779              if (item == null)
780                  break;
781              else
# Line 783 | Line 783 | public class LinkedTransferQueue<E> exte
783          }
784      }
785  
786
786      // Support for resetting head/tail while deserializing
787      private void resetHeadAndTail() {
788 <        QNode dummy = new QNode(null, false);
788 >        Node<E> dummy = new Node<E>(null, false);
789          UNSAFE.putObjectVolatile(this, headOffset,
790 <                                  new PaddedAtomicReference<QNode>(dummy));
790 >                                 new PaddedAtomicReference<Node<E>>(dummy));
791          UNSAFE.putObjectVolatile(this, tailOffset,
792 <                                  new PaddedAtomicReference<QNode>(dummy));
792 >                                 new PaddedAtomicReference<Node<E>>(dummy));
793          UNSAFE.putObjectVolatile(this, cleanMeOffset,
794 <                                  new PaddedAtomicReference<QNode>(null));
794 >                                 new PaddedAtomicReference<Node<E>>(null));
795      }
796  
797 <    // Temporary Unsafe mechanics for preliminary release
798 <    private static Unsafe getUnsafe() throws Throwable {
797 >    // Unsafe mechanics for jsr166y 3rd party package.
798 >    private static sun.misc.Unsafe getUnsafe() {
799          try {
800 <            return Unsafe.getUnsafe();
800 >            return sun.misc.Unsafe.getUnsafe();
801          } catch (SecurityException se) {
802              try {
803                  return java.security.AccessController.doPrivileged
804 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
805 <                        public Unsafe run() throws Exception {
806 <                            return getUnsafePrivileged();
804 >                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
805 >                        public sun.misc.Unsafe run() throws Exception {
806 >                            return getUnsafeByReflection();
807                          }});
808              } catch (java.security.PrivilegedActionException e) {
809 <                throw e.getCause();
809 >                throw new RuntimeException("Could not initialize intrinsics",
810 >                                           e.getCause());
811              }
812          }
813      }
814  
815 <    private static Unsafe getUnsafePrivileged()
815 >    private static sun.misc.Unsafe getUnsafeByReflection()
816              throws NoSuchFieldException, IllegalAccessException {
817 <        Field f = Unsafe.class.getDeclaredField("theUnsafe");
817 >        java.lang.reflect.Field f =
818 >            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
819          f.setAccessible(true);
820 <        return (Unsafe) f.get(null);
820 >        return (sun.misc.Unsafe) f.get(null);
821      }
822  
823 <    private static long fieldOffset(String fieldName)
823 <            throws NoSuchFieldException {
824 <        return UNSAFE.objectFieldOffset
825 <            (LinkedTransferQueue.class.getDeclaredField(fieldName));
826 <    }
827 <
828 <    private static final Unsafe UNSAFE;
829 <    private static final long headOffset;
830 <    private static final long tailOffset;
831 <    private static final long cleanMeOffset;
832 <    static {
823 >    private static long fieldOffset(String fieldName, Class<?> klazz) {
824          try {
825 <            UNSAFE = getUnsafe();
826 <            headOffset = fieldOffset("head");
827 <            tailOffset = fieldOffset("tail");
828 <            cleanMeOffset = fieldOffset("cleanMe");
829 <        } catch (Throwable e) {
830 <            throw new RuntimeException("Could not initialize intrinsics", e);
825 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
826 >        } catch (NoSuchFieldException e) {
827 >            // Convert Exception to Error
828 >            NoSuchFieldError error = new NoSuchFieldError(fieldName);
829 >            error.initCause(e);
830 >            throw error;
831          }
832      }
833  
834 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
835 +    static final long headOffset =
836 +        fieldOffset("head", LinkedTransferQueue.class);
837 +    static final long tailOffset =
838 +        fieldOffset("tail", LinkedTransferQueue.class);
839 +    static final long cleanMeOffset =
840 +        fieldOffset("cleanMe", LinkedTransferQueue.class);
841 +
842   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines