ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.30 by jsr166, Mon Jul 27 03:22:39 2009 UTC vs.
Revision 1.39 by jsr166, Fri Jul 31 20:24:30 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.*;
10  
11   import java.util.AbstractQueue;
12   import java.util.Collection;
13 + import java.util.ConcurrentModificationException;
14   import java.util.Iterator;
15   import java.util.NoSuchElementException;
16 + import java.util.Queue;
17   import java.util.concurrent.locks.LockSupport;
18   import java.util.concurrent.atomic.AtomicReference;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19  
20   /**
21   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 115 | Line 116 | public class LinkedTransferQueue<E> exte
116              this.isData = isData;
117          }
118  
119 <        @SuppressWarnings("rawtypes")
120 <        static final AtomicReferenceFieldUpdater<Node, Node>
121 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
122 <            (Node.class, Node.class, "next");
119 >        // Unsafe mechanics
120 >
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124  
125          final boolean casNext(Node<E> cmp, Node<E> val) {
126 <            return nextUpdater.compareAndSet(this, cmp, val);
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127          }
128  
129          final void clearNext() {
130 <            nextUpdater.lazySet(this, this);
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160  
161          private static final long serialVersionUID = -3375979862319811754L;
# Line 264 | Line 294 | public class LinkedTransferQueue<E> exte
294       * @param e the comparison value for checking match
295       * @param mode mode
296       * @param nanos timeout value
297 <     * @return matched item, or s if cancelled
297 >     * @return matched item, or null if cancelled
298       */
299      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
300                             int mode, long nanos) {
# Line 330 | Line 360 | public class LinkedTransferQueue<E> exte
360          for (;;) {
361              Node<E> h = head.get();
362              Node<E> first = h.next;
363 <            if (first != null && first.next == first) { // help advance
363 >            if (first != null && first.get() == first) { // help advance
364                  advanceHead(h, first);
365                  continue;
366              }
# Line 443 | Line 473 | public class LinkedTransferQueue<E> exte
473      }
474  
475      /**
476 <     * @throws InterruptedException {@inheritDoc}
477 <     * @throws NullPointerException {@inheritDoc}
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480       */
481 <    public void put(E e) throws InterruptedException {
482 <        if (e == null) throw new NullPointerException();
451 <        if (Thread.interrupted()) throw new InterruptedException();
452 <        xfer(e, NOWAIT, 0);
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485      /**
486 <     * @throws InterruptedException {@inheritDoc}
487 <     * @throws NullPointerException {@inheritDoc}
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493       */
494 <    public boolean offer(E e, long timeout, TimeUnit unit)
495 <        throws InterruptedException {
461 <        if (e == null) throw new NullPointerException();
462 <        if (Thread.interrupted()) throw new InterruptedException();
463 <        xfer(e, NOWAIT, 0);
464 <        return true;
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498      /**
499 <     * @throws NullPointerException {@inheritDoc}
499 >     * Inserts the specified element at the tail of this queue.
500 >     * As the queue is unbounded, this method will never return {@code false}.
501 >     *
502 >     * @return {@code true} (as specified by
503 >     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 >     * @throws NullPointerException if the specified element is null
505       */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
# Line 474 | Line 510 | public class LinkedTransferQueue<E> exte
510      }
511  
512      /**
513 <     * @throws NullPointerException {@inheritDoc}
513 >     * Inserts the specified element at the tail of this queue.
514 >     * As the queue is unbounded, this method will never throw
515 >     * {@link IllegalStateException} or return {@code false}.
516 >     *
517 >     * @return {@code true} (as specified by {@link Collection#add})
518 >     * @throws NullPointerException if the specified element is null
519       */
520      public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the specified element immediately if there exists a
526 +     * consumer already waiting to receive it (in {@link #take} or
527 +     * timed {@link #poll(long,TimeUnit) poll}), otherwise
528 +     * returning {@code false} without enqueuing the element.
529 +     *
530 +     * @throws NullPointerException if the specified element is null
531 +     */
532 +    public boolean tryTransfer(E e) {
533          if (e == null) throw new NullPointerException();
534 <        xfer(e, NOWAIT, 0);
482 <        return true;
534 >        return fulfill(e) != null;
535      }
536  
537      /**
538 <     * @throws InterruptedException {@inheritDoc}
539 <     * @throws NullPointerException {@inheritDoc}
538 >     * Inserts the specified element at the tail of this queue,
539 >     * waiting if necessary for the element to be received by a
540 >     * consumer invoking {@code take} or {@code poll}.
541 >     *
542 >     * @throws NullPointerException if the specified element is null
543       */
544      public void transfer(E e) throws InterruptedException {
545          if (e == null) throw new NullPointerException();
# Line 495 | Line 550 | public class LinkedTransferQueue<E> exte
550      }
551  
552      /**
553 <     * @throws InterruptedException {@inheritDoc}
554 <     * @throws NullPointerException {@inheritDoc}
553 >     * Inserts the specified element at the tail of this queue,
554 >     * waiting up to the specified wait time if necessary for the
555 >     * element to be received by a consumer invoking {@code take} or
556 >     * {@code poll}.
557 >     *
558 >     * @throws NullPointerException if the specified element is null
559       */
560      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
561          throws InterruptedException {
# Line 508 | Line 567 | public class LinkedTransferQueue<E> exte
567          throw new InterruptedException();
568      }
569  
511    /**
512     * @throws NullPointerException {@inheritDoc}
513     */
514    public boolean tryTransfer(E e) {
515        if (e == null) throw new NullPointerException();
516        return fulfill(e) != null;
517    }
518
519    /**
520     * @throws InterruptedException {@inheritDoc}
521     */
570      public E take() throws InterruptedException {
571 <        Object e = xfer(null, WAIT, 0);
571 >        E e = xfer(null, WAIT, 0);
572          if (e != null)
573 <            return (E) e;
573 >            return e;
574          Thread.interrupted();
575          throw new InterruptedException();
576      }
577  
530    /**
531     * @throws InterruptedException {@inheritDoc}
532     */
578      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
579 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
579 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
580          if (e != null || !Thread.interrupted())
581 <            return (E) e;
581 >            return e;
582          throw new InterruptedException();
583      }
584  
# Line 607 | Line 652 | public class LinkedTransferQueue<E> exte
652          }
653      }
654  
655 <
655 >    /**
656 >     * Returns an iterator over the elements in this queue in proper
657 >     * sequence, from head to tail.
658 >     *
659 >     * <p>The returned iterator is a "weakly consistent" iterator that
660 >     * will never throw
661 >     * {@link ConcurrentModificationException ConcurrentModificationException},
662 >     * and guarantees to traverse elements as they existed upon
663 >     * construction of the iterator, and may (but is not guaranteed
664 >     * to) reflect any modifications subsequent to construction.
665 >     *
666 >     * @return an iterator over the elements in this queue in proper sequence
667 >     */
668      public Iterator<E> iterator() {
669          return new Itr();
670      }
# Line 622 | Line 679 | public class LinkedTransferQueue<E> exte
679      class Itr implements Iterator<E> {
680          Node<E> next;        // node to return next
681          Node<E> pnext;       // predecessor of next
625        Node<E> snext;       // successor of next
682          Node<E> curr;        // last returned node, for remove()
683          Node<E> pcurr;       // predecessor of curr, for remove()
684 <        E nextItem;        // Cache of next item, once committed to in next
684 >        E nextItem;          // Cache of next item, once committed to in next
685  
686          Itr() {
687 <            findNext();
687 >            advance();
688          }
689  
690          /**
691 <         * Ensures next points to next valid node, or null if none.
691 >         * Moves to next valid node and returns item to return for
692 >         * next(), or null if no such.
693           */
694 <        void findNext() {
694 >        private E advance() {
695 >            pcurr = pnext;
696 >            curr = next;
697 >            E item = nextItem;
698 >
699              for (;;) {
700 <                Node<E> pred = pnext;
701 <                Node<E> q = next;
702 <                if (pred == null || pred == q) {
642 <                    pred = traversalHead();
643 <                    q = pred.next;
644 <                }
645 <                if (q == null || !q.isData) {
700 >                pnext = (next == null) ? traversalHead() : next;
701 >                next = pnext.next;
702 >                if (next == pnext) {
703                      next = null;
704 <                    return;
704 >                    continue;  // restart
705                  }
706 <                Object x = q.get();
707 <                Node<E> s = q.next;
708 <                if (x != null && q != x && q != s) {
706 >                if (next == null)
707 >                    break;
708 >                Object x = next.get();
709 >                if (x != null && x != next) {
710                      nextItem = (E) x;
711 <                    snext = s;
654 <                    pnext = pred;
655 <                    next = q;
656 <                    return;
711 >                    break;
712                  }
658                pnext = q;
659                next = s;
713              }
714 +            return item;
715          }
716  
717          public boolean hasNext() {
# Line 665 | Line 719 | public class LinkedTransferQueue<E> exte
719          }
720  
721          public E next() {
722 <            if (next == null) throw new NoSuchElementException();
723 <            pcurr = pnext;
724 <            curr = next;
671 <            pnext = next;
672 <            next = snext;
673 <            E x = nextItem;
674 <            findNext();
675 <            return x;
722 >            if (next == null)
723 >                throw new NoSuchElementException();
724 >            return advance();
725          }
726  
727          public void remove() {
# Line 742 | Line 791 | public class LinkedTransferQueue<E> exte
791       * @return the number of elements in this queue
792       */
793      public int size() {
794 <        int count = 0;
795 <        Node<E> h = traversalHead();
796 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
797 <            Object x = p.get();
798 <            if (x != null && x != p) {
799 <                if (++count == Integer.MAX_VALUE) // saturated
794 >        for (;;) {
795 >            int count = 0;
796 >            Node<E> pred = traversalHead();
797 >            for (;;) {
798 >                Node<E> q = pred.next;
799 >                if (q == pred) // restart
800                      break;
801 +                if (q == null || !q.isData)
802 +                    return count;
803 +                Object x = q.get();
804 +                if (x != null && x != q) {
805 +                    if (++count == Integer.MAX_VALUE) // saturated
806 +                        return count;
807 +                }
808 +                pred = q;
809              }
810          }
754        return count;
811      }
812  
813      public int getWaitingConsumerCount() {
814 <        int count = 0;
815 <        Node<E> h = traversalHead();
816 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
817 <            if (p.get() == null) {
818 <                if (++count == Integer.MAX_VALUE)
814 >        // converse of size -- count valid non-data nodes
815 >        for (;;) {
816 >            int count = 0;
817 >            Node<E> pred = traversalHead();
818 >            for (;;) {
819 >                Node<E> q = pred.next;
820 >                if (q == pred) // restart
821                      break;
822 +                if (q == null || q.isData)
823 +                    return count;
824 +                Object x = q.get();
825 +                if (x == null) {
826 +                    if (++count == Integer.MAX_VALUE) // saturated
827 +                        return count;
828 +                }
829 +                pred = q;
830              }
831          }
766        return count;
767    }
768
769    public int remainingCapacity() {
770        return Integer.MAX_VALUE;
832      }
833  
834      public boolean remove(Object o) {
# Line 777 | Line 838 | public class LinkedTransferQueue<E> exte
838              Node<E> pred = traversalHead();
839              for (;;) {
840                  Node<E> q = pred.next;
780                if (q == null || !q.isData)
781                    return false;
841                  if (q == pred) // restart
842                      break;
843 +                if (q == null || !q.isData)
844 +                    return false;
845                  Object x = q.get();
846                  if (x != null && x != q && o.equals(x) &&
847                      q.compareAndSet(x, q)) {
# Line 793 | Line 854 | public class LinkedTransferQueue<E> exte
854      }
855  
856      /**
857 +     * Always returns {@code Integer.MAX_VALUE} because a
858 +     * {@code LinkedTransferQueue} is not capacity constrained.
859 +     *
860 +     * @return {@code Integer.MAX_VALUE} (as specified by
861 +     *         {@link BlockingQueue#remainingCapacity()})
862 +     */
863 +    public int remainingCapacity() {
864 +        return Integer.MAX_VALUE;
865 +    }
866 +
867 +    /**
868       * Save the state to a stream (that is, serialize it).
869       *
870       * @serialData All of the elements (each an {@code E}) in
# Line 842 | Line 914 | public class LinkedTransferQueue<E> exte
914  
915      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
916      private static final long headOffset =
917 <        objectFieldOffset("head", LinkedTransferQueue.class);
917 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
918      private static final long tailOffset =
919 <        objectFieldOffset("tail", LinkedTransferQueue.class);
919 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
920      private static final long cleanMeOffset =
921 <        objectFieldOffset("cleanMe", LinkedTransferQueue.class);
921 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
922 >
923  
924 <    private static long objectFieldOffset(String field, Class<?> klazz) {
924 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
925 >                                  String field, Class<?> klazz) {
926          try {
927              return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
928          } catch (NoSuchFieldException e) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines