ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.26 by jsr166, Sat Jul 25 00:34:00 2009 UTC vs.
Revision 1.43 by jsr166, Sat Aug 1 21:24:01 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.*;
10  
11   import java.util.AbstractQueue;
12   import java.util.Collection;
13 + import java.util.ConcurrentModificationException;
14   import java.util.Iterator;
15   import java.util.NoSuchElementException;
16 + import java.util.Queue;
17   import java.util.concurrent.locks.LockSupport;
18   import java.util.concurrent.atomic.AtomicReference;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19  
20   /**
21 < * An unbounded {@linkplain TransferQueue} based on linked nodes.
21 > * An unbounded {@link TransferQueue} based on linked nodes.
22   * This queue orders elements FIFO (first-in-first-out) with respect
23   * to any given producer.  The <em>head</em> of the queue is that
24   * element that has been on the queue the longest time for some
# Line 115 | Line 116 | public class LinkedTransferQueue<E> exte
116              this.isData = isData;
117          }
118  
119 <        @SuppressWarnings("rawtypes")
120 <        static final AtomicReferenceFieldUpdater<Node, Node>
121 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
122 <            (Node.class, Node.class, "next");
119 >        // Unsafe mechanics
120 >
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124  
125          final boolean casNext(Node<E> cmp, Node<E> val) {
126 <            return nextUpdater.compareAndSet(this, cmp, val);
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127          }
128  
129          final void clearNext() {
130 <            nextUpdater.lazySet(this, this);
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160  
161          private static final long serialVersionUID = -3375979862319811754L;
# Line 264 | Line 294 | public class LinkedTransferQueue<E> exte
294       * @param e the comparison value for checking match
295       * @param mode mode
296       * @param nanos timeout value
297 <     * @return matched item, or s if cancelled
297 >     * @return matched item, or null if cancelled
298       */
299      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
300                             int mode, long nanos) {
# Line 330 | Line 360 | public class LinkedTransferQueue<E> exte
360          for (;;) {
361              Node<E> h = head.get();
362              Node<E> first = h.next;
363 <            if (first != null && first.next == first) { // help advance
363 >            if (first != null && first.get() == first) { // help advance
364                  advanceHead(h, first);
365                  continue;
366              }
# Line 442 | Line 472 | public class LinkedTransferQueue<E> exte
472          addAll(c);
473      }
474  
475 <    public void put(E e) throws InterruptedException {
476 <        if (e == null) throw new NullPointerException();
477 <        if (Thread.interrupted()) throw new InterruptedException();
478 <        xfer(e, NOWAIT, 0);
475 >    /**
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480 >     */
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485 <    public boolean offer(E e, long timeout, TimeUnit unit)
486 <        throws InterruptedException {
487 <        if (e == null) throw new NullPointerException();
488 <        if (Thread.interrupted()) throw new InterruptedException();
489 <        xfer(e, NOWAIT, 0);
490 <        return true;
485 >    /**
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493 >     */
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498 +    /**
499 +     * Inserts the specified element at the tail of this queue.
500 +     * As the queue is unbounded, this method will never return {@code false}.
501 +     *
502 +     * @return {@code true} (as specified by
503 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 +     * @throws NullPointerException if the specified element is null
505 +     */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
508          xfer(e, NOWAIT, 0);
509          return true;
510      }
511  
512 +    /**
513 +     * Inserts the specified element at the tail of this queue.
514 +     * As the queue is unbounded, this method will never throw
515 +     * {@link IllegalStateException} or return {@code false}.
516 +     *
517 +     * @return {@code true} (as specified by {@link Collection#add})
518 +     * @throws NullPointerException if the specified element is null
519 +     */
520      public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the element to a waiting consumer immediately, if possible.
526 +     *
527 +     * <p>More precisely, transfers the specified element immediately
528 +     * if there exists a consumer already waiting to receive it (in
529 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
530 +     * otherwise returning {@code false} without enqueuing the element.
531 +     *
532 +     * @throws NullPointerException if the specified element is null
533 +     */
534 +    public boolean tryTransfer(E e) {
535          if (e == null) throw new NullPointerException();
536 <        xfer(e, NOWAIT, 0);
468 <        return true;
536 >        return fulfill(e) != null;
537      }
538  
539 +    /**
540 +     * Transfers the element to a consumer, waiting if necessary to do so.
541 +     *
542 +     * <p>More precisely, transfers the specified element immediately
543 +     * if there exists a consumer already waiting to receive it (in
544 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
545 +     * else inserts the specified element at the tail of this queue
546 +     * and waits until the element is received by a consumer.
547 +     *
548 +     * @throws NullPointerException if the specified element is null
549 +     */
550      public void transfer(E e) throws InterruptedException {
551          if (e == null) throw new NullPointerException();
552          if (xfer(e, WAIT, 0) == null) {
# Line 476 | Line 555 | public class LinkedTransferQueue<E> exte
555          }
556      }
557  
558 +    /**
559 +     * Transfers the element to a consumer if it is possible to do so
560 +     * before the timeout elapses.
561 +     *
562 +     * <p>More precisely, transfers the specified element immediately
563 +     * if there exists a consumer already waiting to receive it (in
564 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
565 +     * else inserts the specified element at the tail of this queue
566 +     * and waits until the element is received by a consumer,
567 +     * returning {@code false} if the specified wait time elapses
568 +     * before the element can be transferred.
569 +     *
570 +     * @throws NullPointerException if the specified element is null
571 +     */
572      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
573          throws InterruptedException {
574          if (e == null) throw new NullPointerException();
# Line 486 | Line 579 | public class LinkedTransferQueue<E> exte
579          throw new InterruptedException();
580      }
581  
489    public boolean tryTransfer(E e) {
490        if (e == null) throw new NullPointerException();
491        return fulfill(e) != null;
492    }
493
582      public E take() throws InterruptedException {
583 <        Object e = xfer(null, WAIT, 0);
583 >        E e = xfer(null, WAIT, 0);
584          if (e != null)
585 <            return (E) e;
585 >            return e;
586          Thread.interrupted();
587          throw new InterruptedException();
588      }
589  
590      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
591 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
591 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
592          if (e != null || !Thread.interrupted())
593 <            return (E) e;
593 >            return e;
594          throw new InterruptedException();
595      }
596  
# Line 510 | Line 598 | public class LinkedTransferQueue<E> exte
598          return fulfill(null);
599      }
600  
601 +    /**
602 +     * @throws NullPointerException     {@inheritDoc}
603 +     * @throws IllegalArgumentException {@inheritDoc}
604 +     */
605      public int drainTo(Collection<? super E> c) {
606          if (c == null)
607              throw new NullPointerException();
# Line 524 | Line 616 | public class LinkedTransferQueue<E> exte
616          return n;
617      }
618  
619 +    /**
620 +     * @throws NullPointerException     {@inheritDoc}
621 +     * @throws IllegalArgumentException {@inheritDoc}
622 +     */
623      public int drainTo(Collection<? super E> c, int maxElements) {
624          if (c == null)
625              throw new NullPointerException();
# Line 568 | Line 664 | public class LinkedTransferQueue<E> exte
664          }
665      }
666  
667 <
667 >    /**
668 >     * Returns an iterator over the elements in this queue in proper
669 >     * sequence, from head to tail.
670 >     *
671 >     * <p>The returned iterator is a "weakly consistent" iterator that
672 >     * will never throw
673 >     * {@link ConcurrentModificationException ConcurrentModificationException},
674 >     * and guarantees to traverse elements as they existed upon
675 >     * construction of the iterator, and may (but is not guaranteed
676 >     * to) reflect any modifications subsequent to construction.
677 >     *
678 >     * @return an iterator over the elements in this queue in proper sequence
679 >     */
680      public Iterator<E> iterator() {
681          return new Itr();
682      }
# Line 583 | Line 691 | public class LinkedTransferQueue<E> exte
691      class Itr implements Iterator<E> {
692          Node<E> next;        // node to return next
693          Node<E> pnext;       // predecessor of next
586        Node<E> snext;       // successor of next
694          Node<E> curr;        // last returned node, for remove()
695          Node<E> pcurr;       // predecessor of curr, for remove()
696 <        E nextItem;        // Cache of next item, once committed to in next
696 >        E nextItem;          // Cache of next item, once committed to in next
697  
698          Itr() {
699 <            findNext();
699 >            advance();
700          }
701  
702          /**
703 <         * Ensures next points to next valid node, or null if none.
703 >         * Moves to next valid node and returns item to return for
704 >         * next(), or null if no such.
705           */
706 <        void findNext() {
706 >        private E advance() {
707 >            pcurr = pnext;
708 >            curr = next;
709 >            E item = nextItem;
710 >
711              for (;;) {
712 <                Node<E> pred = pnext;
713 <                Node<E> q = next;
714 <                if (pred == null || pred == q) {
603 <                    pred = traversalHead();
604 <                    q = pred.next;
605 <                }
606 <                if (q == null || !q.isData) {
712 >                pnext = (next == null) ? traversalHead() : next;
713 >                next = pnext.next;
714 >                if (next == pnext) {
715                      next = null;
716 <                    return;
716 >                    continue;  // restart
717                  }
718 <                Object x = q.get();
719 <                Node<E> s = q.next;
720 <                if (x != null && q != x && q != s) {
718 >                if (next == null)
719 >                    break;
720 >                Object x = next.get();
721 >                if (x != null && x != next) {
722                      nextItem = (E) x;
723 <                    snext = s;
615 <                    pnext = pred;
616 <                    next = q;
617 <                    return;
723 >                    break;
724                  }
619                pnext = q;
620                next = s;
725              }
726 +            return item;
727          }
728  
729          public boolean hasNext() {
# Line 626 | Line 731 | public class LinkedTransferQueue<E> exte
731          }
732  
733          public E next() {
734 <            if (next == null) throw new NoSuchElementException();
735 <            pcurr = pnext;
736 <            curr = next;
632 <            pnext = next;
633 <            next = snext;
634 <            E x = nextItem;
635 <            findNext();
636 <            return x;
734 >            if (next == null)
735 >                throw new NoSuchElementException();
736 >            return advance();
737          }
738  
739          public void remove() {
# Line 662 | Line 762 | public class LinkedTransferQueue<E> exte
762          }
763      }
764  
765 +    /**
766 +     * Returns {@code true} if this queue contains no elements.
767 +     *
768 +     * @return {@code true} if this queue contains no elements
769 +     */
770      public boolean isEmpty() {
771          for (;;) {
772              Node<E> h = traversalHead();
# Line 703 | Line 808 | public class LinkedTransferQueue<E> exte
808       * @return the number of elements in this queue
809       */
810      public int size() {
811 <        int count = 0;
812 <        Node<E> h = traversalHead();
813 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
814 <            Object x = p.get();
815 <            if (x != null && x != p) {
816 <                if (++count == Integer.MAX_VALUE) // saturated
811 >        for (;;) {
812 >            int count = 0;
813 >            Node<E> pred = traversalHead();
814 >            for (;;) {
815 >                Node<E> q = pred.next;
816 >                if (q == pred) // restart
817                      break;
818 +                if (q == null || !q.isData)
819 +                    return count;
820 +                Object x = q.get();
821 +                if (x != null && x != q) {
822 +                    if (++count == Integer.MAX_VALUE) // saturated
823 +                        return count;
824 +                }
825 +                pred = q;
826              }
827          }
715        return count;
828      }
829  
830      public int getWaitingConsumerCount() {
831 <        int count = 0;
832 <        Node<E> h = traversalHead();
833 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
834 <            if (p.get() == null) {
835 <                if (++count == Integer.MAX_VALUE)
831 >        // converse of size -- count valid non-data nodes
832 >        for (;;) {
833 >            int count = 0;
834 >            Node<E> pred = traversalHead();
835 >            for (;;) {
836 >                Node<E> q = pred.next;
837 >                if (q == pred) // restart
838                      break;
839 +                if (q == null || q.isData)
840 +                    return count;
841 +                Object x = q.get();
842 +                if (x == null) {
843 +                    if (++count == Integer.MAX_VALUE) // saturated
844 +                        return count;
845 +                }
846 +                pred = q;
847              }
848          }
727        return count;
728    }
729
730    public int remainingCapacity() {
731        return Integer.MAX_VALUE;
849      }
850  
851 +    /**
852 +     * Removes a single instance of the specified element from this queue,
853 +     * if it is present.  More formally, removes an element {@code e} such
854 +     * that {@code o.equals(e)}, if this queue contains one or more such
855 +     * elements.
856 +     * Returns {@code true} if this queue contained the specified element
857 +     * (or equivalently, if this queue changed as a result of the call).
858 +     *
859 +     * @param o element to be removed from this queue, if present
860 +     * @return {@code true} if this queue changed as a result of the call
861 +     */
862      public boolean remove(Object o) {
863          if (o == null)
864              return false;
# Line 738 | Line 866 | public class LinkedTransferQueue<E> exte
866              Node<E> pred = traversalHead();
867              for (;;) {
868                  Node<E> q = pred.next;
741                if (q == null || !q.isData)
742                    return false;
869                  if (q == pred) // restart
870                      break;
871 +                if (q == null || !q.isData)
872 +                    return false;
873                  Object x = q.get();
874                  if (x != null && x != q && o.equals(x) &&
875                      q.compareAndSet(x, q)) {
# Line 754 | Line 882 | public class LinkedTransferQueue<E> exte
882      }
883  
884      /**
885 +     * Always returns {@code Integer.MAX_VALUE} because a
886 +     * {@code LinkedTransferQueue} is not capacity constrained.
887 +     *
888 +     * @return {@code Integer.MAX_VALUE} (as specified by
889 +     *         {@link BlockingQueue#remainingCapacity()})
890 +     */
891 +    public int remainingCapacity() {
892 +        return Integer.MAX_VALUE;
893 +    }
894 +
895 +    /**
896       * Save the state to a stream (that is, serialize it).
897       *
898       * @serialData All of the elements (each an {@code E}) in
# Line 799 | Line 938 | public class LinkedTransferQueue<E> exte
938                                   new PaddedAtomicReference<Node<E>>(null));
939      }
940  
941 <    // Unsafe mechanics for jsr166y 3rd party package.
941 >    // Unsafe mechanics
942 >
943 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
944 >    private static final long headOffset =
945 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
946 >    private static final long tailOffset =
947 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
948 >    private static final long cleanMeOffset =
949 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
950 >
951 >
952 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
953 >                                  String field, Class<?> klazz) {
954 >        try {
955 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
956 >        } catch (NoSuchFieldException e) {
957 >            // Convert Exception to corresponding Error
958 >            NoSuchFieldError error = new NoSuchFieldError(field);
959 >            error.initCause(e);
960 >            throw error;
961 >        }
962 >    }
963 >
964 >    /**
965 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
966 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
967 >     * into a jdk.
968 >     *
969 >     * @return a sun.misc.Unsafe
970 >     */
971      private static sun.misc.Unsafe getUnsafe() {
972          try {
973              return sun.misc.Unsafe.getUnsafe();
974          } catch (SecurityException se) {
975              try {
976                  return java.security.AccessController.doPrivileged
977 <                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
977 >                    (new java.security
978 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
979                          public sun.misc.Unsafe run() throws Exception {
980 <                            return getUnsafeByReflection();
980 >                            java.lang.reflect.Field f = sun.misc
981 >                                .Unsafe.class.getDeclaredField("theUnsafe");
982 >                            f.setAccessible(true);
983 >                            return (sun.misc.Unsafe) f.get(null);
984                          }});
985              } catch (java.security.PrivilegedActionException e) {
986                  throw new RuntimeException("Could not initialize intrinsics",
# Line 816 | Line 988 | public class LinkedTransferQueue<E> exte
988              }
989          }
990      }
819
820    private static sun.misc.Unsafe getUnsafeByReflection()
821            throws NoSuchFieldException, IllegalAccessException {
822        java.lang.reflect.Field f =
823            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
824        f.setAccessible(true);
825        return (sun.misc.Unsafe) f.get(null);
826    }
827
828    private static long fieldOffset(String fieldName, Class<?> klazz) {
829        try {
830            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
831        } catch (NoSuchFieldException e) {
832            // Convert Exception to Error
833            NoSuchFieldError error = new NoSuchFieldError(fieldName);
834            error.initCause(e);
835            throw error;
836        }
837    }
838
839    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
840    static final long headOffset =
841        fieldOffset("head", LinkedTransferQueue.class);
842    static final long tailOffset =
843        fieldOffset("tail", LinkedTransferQueue.class);
844    static final long cleanMeOffset =
845        fieldOffset("cleanMe", LinkedTransferQueue.class);
846
991   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines