ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.25 by jsr166, Fri Jul 24 23:48:26 2009 UTC vs.
Revision 1.44 by jsr166, Tue Aug 4 20:32:16 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21 < * An unbounded {@linkplain TransferQueue} based on linked nodes.
21 > * An unbounded {@link TransferQueue} based on linked nodes.
22   * This queue orders elements FIFO (first-in-first-out) with respect
23   * to any given producer.  The <em>head</em> of the queue is that
24   * element that has been on the queue the longest time for some
# Line 110 | Line 116 | public class LinkedTransferQueue<E> exte
116              this.isData = isData;
117          }
118  
119 <        @SuppressWarnings("rawtypes")
120 <        static final AtomicReferenceFieldUpdater<Node, Node>
121 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
122 <            (Node.class, Node.class, "next");
119 >        // Unsafe mechanics
120 >
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124  
125          final boolean casNext(Node<E> cmp, Node<E> val) {
126 <            return nextUpdater.compareAndSet(this, cmp, val);
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127          }
128  
129          final void clearNext() {
130 <            nextUpdater.lazySet(this, this);
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160  
161          private static final long serialVersionUID = -3375979862319811754L;
# Line 184 | Line 219 | public class LinkedTransferQueue<E> exte
219              Node<E> t = tail.get();
220              Node<E> h = head.get();
221  
222 <            if (t != null && (t == h || t.isData == isData)) {
222 >            if (t == h || t.isData == isData) {
223                  if (s == null)
224                      s = new Node<E>(e, isData);
225                  Node<E> last = t.next;
# Line 196 | Line 231 | public class LinkedTransferQueue<E> exte
231                      tail.compareAndSet(t, s);
232                      return awaitFulfill(t, s, e, mode, nanos);
233                  }
234 <            }
200 <
201 <            else if (h != null) {
234 >            } else {
235                  Node<E> first = h.next;
236                  if (t == tail.get() && first != null &&
237                      advanceHead(h, first)) {
# Line 226 | Line 259 | public class LinkedTransferQueue<E> exte
259              Node<E> t = tail.get();
260              Node<E> h = head.get();
261  
262 <            if (t != null && (t == h || t.isData == isData)) {
262 >            if (t == h || t.isData == isData) {
263                  Node<E> last = t.next;
264                  if (t == tail.get()) {
265                      if (last != null)
# Line 234 | Line 267 | public class LinkedTransferQueue<E> exte
267                      else
268                          return null;
269                  }
270 <            }
238 <            else if (h != null) {
270 >            } else {
271                  Node<E> first = h.next;
272                  if (t == tail.get() &&
273                      first != null &&
# Line 259 | Line 291 | public class LinkedTransferQueue<E> exte
291       * @param e the comparison value for checking match
292       * @param mode mode
293       * @param nanos timeout value
294 <     * @return matched item, or s if cancelled
294 >     * @return matched item, or null if cancelled
295       */
296      private E awaitFulfill(Node<E> pred, Node<E> s, E e,
297                             int mode, long nanos) {
# Line 297 | Line 329 | public class LinkedTransferQueue<E> exte
329              }
330              if (spins < 0) {
331                  Node<E> h = head.get(); // only spin if at head
332 <                spins = ((h != null && h.next == s) ?
333 <                         ((mode == TIMEOUT) ?
334 <                          maxTimedSpins : maxUntimedSpins) : 0);
332 >                spins = ((h.next != s) ? 0 :
333 >                         (mode == TIMEOUT) ? maxTimedSpins :
334 >                         maxUntimedSpins);
335              }
336              if (spins > 0)
337                  --spins;
# Line 325 | Line 357 | public class LinkedTransferQueue<E> exte
357          for (;;) {
358              Node<E> h = head.get();
359              Node<E> first = h.next;
360 <            if (first != null && first.next == first) { // help advance
360 >            if (first != null && first.get() == first) { // help advance
361                  advanceHead(h, first);
362                  continue;
363              }
# Line 437 | Line 469 | public class LinkedTransferQueue<E> exte
469          addAll(c);
470      }
471  
472 <    public void put(E e) throws InterruptedException {
473 <        if (e == null) throw new NullPointerException();
474 <        if (Thread.interrupted()) throw new InterruptedException();
475 <        xfer(e, NOWAIT, 0);
472 >    /**
473 >     * Inserts the specified element at the tail of this queue.
474 >     * As the queue is unbounded, this method will never block.
475 >     *
476 >     * @throws NullPointerException if the specified element is null
477 >     */
478 >    public void put(E e) {
479 >        offer(e);
480      }
481  
482 <    public boolean offer(E e, long timeout, TimeUnit unit)
483 <        throws InterruptedException {
484 <        if (e == null) throw new NullPointerException();
485 <        if (Thread.interrupted()) throw new InterruptedException();
486 <        xfer(e, NOWAIT, 0);
487 <        return true;
482 >    /**
483 >     * Inserts the specified element at the tail of this queue.
484 >     * As the queue is unbounded, this method will never block or
485 >     * return {@code false}.
486 >     *
487 >     * @return {@code true} (as specified by
488 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
489 >     * @throws NullPointerException if the specified element is null
490 >     */
491 >    public boolean offer(E e, long timeout, TimeUnit unit) {
492 >        return offer(e);
493      }
494  
495 +    /**
496 +     * Inserts the specified element at the tail of this queue.
497 +     * As the queue is unbounded, this method will never return {@code false}.
498 +     *
499 +     * @return {@code true} (as specified by
500 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
501 +     * @throws NullPointerException if the specified element is null
502 +     */
503      public boolean offer(E e) {
504          if (e == null) throw new NullPointerException();
505          xfer(e, NOWAIT, 0);
506          return true;
507      }
508  
509 +    /**
510 +     * Inserts the specified element at the tail of this queue.
511 +     * As the queue is unbounded, this method will never throw
512 +     * {@link IllegalStateException} or return {@code false}.
513 +     *
514 +     * @return {@code true} (as specified by {@link Collection#add})
515 +     * @throws NullPointerException if the specified element is null
516 +     */
517      public boolean add(E e) {
518 +        return offer(e);
519 +    }
520 +
521 +    /**
522 +     * Transfers the element to a waiting consumer immediately, if possible.
523 +     *
524 +     * <p>More precisely, transfers the specified element immediately
525 +     * if there exists a consumer already waiting to receive it (in
526 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
527 +     * otherwise returning {@code false} without enqueuing the element.
528 +     *
529 +     * @throws NullPointerException if the specified element is null
530 +     */
531 +    public boolean tryTransfer(E e) {
532          if (e == null) throw new NullPointerException();
533 <        xfer(e, NOWAIT, 0);
463 <        return true;
533 >        return fulfill(e) != null;
534      }
535  
536 +    /**
537 +     * Transfers the element to a consumer, waiting if necessary to do so.
538 +     *
539 +     * <p>More precisely, transfers the specified element immediately
540 +     * if there exists a consumer already waiting to receive it (in
541 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
542 +     * else inserts the specified element at the tail of this queue
543 +     * and waits until the element is received by a consumer.
544 +     *
545 +     * @throws NullPointerException if the specified element is null
546 +     */
547      public void transfer(E e) throws InterruptedException {
548          if (e == null) throw new NullPointerException();
549          if (xfer(e, WAIT, 0) == null) {
# Line 471 | Line 552 | public class LinkedTransferQueue<E> exte
552          }
553      }
554  
555 +    /**
556 +     * Transfers the element to a consumer if it is possible to do so
557 +     * before the timeout elapses.
558 +     *
559 +     * <p>More precisely, transfers the specified element immediately
560 +     * if there exists a consumer already waiting to receive it (in
561 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
562 +     * else inserts the specified element at the tail of this queue
563 +     * and waits until the element is received by a consumer,
564 +     * returning {@code false} if the specified wait time elapses
565 +     * before the element can be transferred.
566 +     *
567 +     * @throws NullPointerException if the specified element is null
568 +     */
569      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
570          throws InterruptedException {
571          if (e == null) throw new NullPointerException();
# Line 481 | Line 576 | public class LinkedTransferQueue<E> exte
576          throw new InterruptedException();
577      }
578  
484    public boolean tryTransfer(E e) {
485        if (e == null) throw new NullPointerException();
486        return fulfill(e) != null;
487    }
488
579      public E take() throws InterruptedException {
580 <        Object e = xfer(null, WAIT, 0);
580 >        E e = xfer(null, WAIT, 0);
581          if (e != null)
582 <            return (E) e;
582 >            return e;
583          Thread.interrupted();
584          throw new InterruptedException();
585      }
586  
587      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
588 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
588 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
589          if (e != null || !Thread.interrupted())
590 <            return (E) e;
590 >            return e;
591          throw new InterruptedException();
592      }
593  
# Line 505 | Line 595 | public class LinkedTransferQueue<E> exte
595          return fulfill(null);
596      }
597  
598 +    /**
599 +     * @throws NullPointerException     {@inheritDoc}
600 +     * @throws IllegalArgumentException {@inheritDoc}
601 +     */
602      public int drainTo(Collection<? super E> c) {
603          if (c == null)
604              throw new NullPointerException();
# Line 519 | Line 613 | public class LinkedTransferQueue<E> exte
613          return n;
614      }
615  
616 +    /**
617 +     * @throws NullPointerException     {@inheritDoc}
618 +     * @throws IllegalArgumentException {@inheritDoc}
619 +     */
620      public int drainTo(Collection<? super E> c, int maxElements) {
621          if (c == null)
622              throw new NullPointerException();
# Line 542 | Line 640 | public class LinkedTransferQueue<E> exte
640          for (;;) {
641              Node<E> t = tail.get();
642              Node<E> h = head.get();
643 <            if (h != null && t != null) {
644 <                Node<E> last = t.next;
645 <                Node<E> first = h.next;
646 <                if (t == tail.get()) {
647 <                    if (last != null)
648 <                        tail.compareAndSet(t, last);
649 <                    else if (first != null) {
650 <                        Object x = first.get();
651 <                        if (x == first)
554 <                            advanceHead(h, first);
555 <                        else
556 <                            return h;
557 <                    }
643 >            Node<E> last = t.next;
644 >            Node<E> first = h.next;
645 >            if (t == tail.get()) {
646 >                if (last != null)
647 >                    tail.compareAndSet(t, last);
648 >                else if (first != null) {
649 >                    Object x = first.get();
650 >                    if (x == first)
651 >                        advanceHead(h, first);
652                      else
653                          return h;
654                  }
655 +                else
656 +                    return h;
657              }
658              reclean();
659          }
660      }
661  
662 <
662 >    /**
663 >     * Returns an iterator over the elements in this queue in proper
664 >     * sequence, from head to tail.
665 >     *
666 >     * <p>The returned iterator is a "weakly consistent" iterator that
667 >     * will never throw
668 >     * {@link ConcurrentModificationException ConcurrentModificationException},
669 >     * and guarantees to traverse elements as they existed upon
670 >     * construction of the iterator, and may (but is not guaranteed
671 >     * to) reflect any modifications subsequent to construction.
672 >     *
673 >     * @return an iterator over the elements in this queue in proper sequence
674 >     */
675      public Iterator<E> iterator() {
676          return new Itr();
677      }
# Line 578 | Line 686 | public class LinkedTransferQueue<E> exte
686      class Itr implements Iterator<E> {
687          Node<E> next;        // node to return next
688          Node<E> pnext;       // predecessor of next
581        Node<E> snext;       // successor of next
689          Node<E> curr;        // last returned node, for remove()
690          Node<E> pcurr;       // predecessor of curr, for remove()
691 <        E nextItem;        // Cache of next item, once committed to in next
691 >        E nextItem;          // Cache of next item, once committed to in next
692  
693          Itr() {
694 <            findNext();
694 >            advance();
695          }
696  
697          /**
698 <         * Ensures next points to next valid node, or null if none.
698 >         * Moves to next valid node and returns item to return for
699 >         * next(), or null if no such.
700           */
701 <        void findNext() {
701 >        private E advance() {
702 >            pcurr = pnext;
703 >            curr = next;
704 >            E item = nextItem;
705 >
706              for (;;) {
707 <                Node<E> pred = pnext;
708 <                Node<E> q = next;
709 <                if (pred == null || pred == q) {
598 <                    pred = traversalHead();
599 <                    q = pred.next;
600 <                }
601 <                if (q == null || !q.isData) {
707 >                pnext = (next == null) ? traversalHead() : next;
708 >                next = pnext.next;
709 >                if (next == pnext) {
710                      next = null;
711 <                    return;
711 >                    continue;  // restart
712                  }
713 <                Object x = q.get();
714 <                Node<E> s = q.next;
715 <                if (x != null && q != x && q != s) {
713 >                if (next == null)
714 >                    break;
715 >                Object x = next.get();
716 >                if (x != null && x != next) {
717                      nextItem = (E) x;
718 <                    snext = s;
610 <                    pnext = pred;
611 <                    next = q;
612 <                    return;
718 >                    break;
719                  }
614                pnext = q;
615                next = s;
720              }
721 +            return item;
722          }
723  
724          public boolean hasNext() {
# Line 621 | Line 726 | public class LinkedTransferQueue<E> exte
726          }
727  
728          public E next() {
729 <            if (next == null) throw new NoSuchElementException();
730 <            pcurr = pnext;
731 <            curr = next;
627 <            pnext = next;
628 <            next = snext;
629 <            E x = nextItem;
630 <            findNext();
631 <            return x;
729 >            if (next == null)
730 >                throw new NoSuchElementException();
731 >            return advance();
732          }
733  
734          public void remove() {
# Line 657 | Line 757 | public class LinkedTransferQueue<E> exte
757          }
758      }
759  
760 +    /**
761 +     * Returns {@code true} if this queue contains no elements.
762 +     *
763 +     * @return {@code true} if this queue contains no elements
764 +     */
765      public boolean isEmpty() {
766          for (;;) {
767              Node<E> h = traversalHead();
# Line 698 | Line 803 | public class LinkedTransferQueue<E> exte
803       * @return the number of elements in this queue
804       */
805      public int size() {
806 <        int count = 0;
807 <        Node<E> h = traversalHead();
808 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
809 <            Object x = p.get();
810 <            if (x != null && x != p) {
811 <                if (++count == Integer.MAX_VALUE) // saturated
806 >        for (;;) {
807 >            int count = 0;
808 >            Node<E> pred = traversalHead();
809 >            for (;;) {
810 >                Node<E> q = pred.next;
811 >                if (q == pred) // restart
812                      break;
813 +                if (q == null || !q.isData)
814 +                    return count;
815 +                Object x = q.get();
816 +                if (x != null && x != q) {
817 +                    if (++count == Integer.MAX_VALUE) // saturated
818 +                        return count;
819 +                }
820 +                pred = q;
821              }
822          }
710        return count;
823      }
824  
825      public int getWaitingConsumerCount() {
826 <        int count = 0;
827 <        Node<E> h = traversalHead();
828 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
829 <            if (p.get() == null) {
830 <                if (++count == Integer.MAX_VALUE)
826 >        // converse of size -- count valid non-data nodes
827 >        for (;;) {
828 >            int count = 0;
829 >            Node<E> pred = traversalHead();
830 >            for (;;) {
831 >                Node<E> q = pred.next;
832 >                if (q == pred) // restart
833                      break;
834 +                if (q == null || q.isData)
835 +                    return count;
836 +                Object x = q.get();
837 +                if (x == null) {
838 +                    if (++count == Integer.MAX_VALUE) // saturated
839 +                        return count;
840 +                }
841 +                pred = q;
842              }
843          }
722        return count;
723    }
724
725    public int remainingCapacity() {
726        return Integer.MAX_VALUE;
844      }
845  
846 +    /**
847 +     * Removes a single instance of the specified element from this queue,
848 +     * if it is present.  More formally, removes an element {@code e} such
849 +     * that {@code o.equals(e)}, if this queue contains one or more such
850 +     * elements.
851 +     * Returns {@code true} if this queue contained the specified element
852 +     * (or equivalently, if this queue changed as a result of the call).
853 +     *
854 +     * @param o element to be removed from this queue, if present
855 +     * @return {@code true} if this queue changed as a result of the call
856 +     */
857      public boolean remove(Object o) {
858          if (o == null)
859              return false;
# Line 733 | Line 861 | public class LinkedTransferQueue<E> exte
861              Node<E> pred = traversalHead();
862              for (;;) {
863                  Node<E> q = pred.next;
736                if (q == null || !q.isData)
737                    return false;
864                  if (q == pred) // restart
865                      break;
866 +                if (q == null || !q.isData)
867 +                    return false;
868                  Object x = q.get();
869                  if (x != null && x != q && o.equals(x) &&
870                      q.compareAndSet(x, q)) {
# Line 749 | Line 877 | public class LinkedTransferQueue<E> exte
877      }
878  
879      /**
880 +     * Always returns {@code Integer.MAX_VALUE} because a
881 +     * {@code LinkedTransferQueue} is not capacity constrained.
882 +     *
883 +     * @return {@code Integer.MAX_VALUE} (as specified by
884 +     *         {@link BlockingQueue#remainingCapacity()})
885 +     */
886 +    public int remainingCapacity() {
887 +        return Integer.MAX_VALUE;
888 +    }
889 +
890 +    /**
891       * Save the state to a stream (that is, serialize it).
892       *
893       * @serialData All of the elements (each an {@code E}) in
# Line 794 | Line 933 | public class LinkedTransferQueue<E> exte
933                                   new PaddedAtomicReference<Node<E>>(null));
934      }
935  
936 <    // Unsafe mechanics for jsr166y 3rd party package.
936 >    // Unsafe mechanics
937 >
938 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
939 >    private static final long headOffset =
940 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
941 >    private static final long tailOffset =
942 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
943 >    private static final long cleanMeOffset =
944 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
945 >
946 >
947 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
948 >                                  String field, Class<?> klazz) {
949 >        try {
950 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
951 >        } catch (NoSuchFieldException e) {
952 >            // Convert Exception to corresponding Error
953 >            NoSuchFieldError error = new NoSuchFieldError(field);
954 >            error.initCause(e);
955 >            throw error;
956 >        }
957 >    }
958 >
959 >    /**
960 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
961 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
962 >     * into a jdk.
963 >     *
964 >     * @return a sun.misc.Unsafe
965 >     */
966      private static sun.misc.Unsafe getUnsafe() {
967          try {
968              return sun.misc.Unsafe.getUnsafe();
969          } catch (SecurityException se) {
970              try {
971                  return java.security.AccessController.doPrivileged
972 <                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
972 >                    (new java.security
973 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
974                          public sun.misc.Unsafe run() throws Exception {
975 <                            return getUnsafeByReflection();
975 >                            java.lang.reflect.Field f = sun.misc
976 >                                .Unsafe.class.getDeclaredField("theUnsafe");
977 >                            f.setAccessible(true);
978 >                            return (sun.misc.Unsafe) f.get(null);
979                          }});
980              } catch (java.security.PrivilegedActionException e) {
981                  throw new RuntimeException("Could not initialize intrinsics",
# Line 811 | Line 983 | public class LinkedTransferQueue<E> exte
983              }
984          }
985      }
814
815    private static sun.misc.Unsafe getUnsafeByReflection()
816            throws NoSuchFieldException, IllegalAccessException {
817        java.lang.reflect.Field f =
818            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
819        f.setAccessible(true);
820        return (sun.misc.Unsafe) f.get(null);
821    }
822
823    private static long fieldOffset(String fieldName, Class<?> klazz) {
824        try {
825            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
826        } catch (NoSuchFieldException e) {
827            // Convert Exception to Error
828            NoSuchFieldError error = new NoSuchFieldError(fieldName);
829            error.initCause(e);
830            throw error;
831        }
832    }
833
834    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
835    static final long headOffset =
836        fieldOffset("head", LinkedTransferQueue.class);
837    static final long tailOffset =
838        fieldOffset("tail", LinkedTransferQueue.class);
839    static final long cleanMeOffset =
840        fieldOffset("cleanMe", LinkedTransferQueue.class);
841
986   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines