ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.25 by jsr166, Fri Jul 24 23:48:26 2009 UTC vs.
Revision 1.33 by dl, Thu Jul 30 13:30:19 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.Iterator;
14 > import java.util.NoSuchElementException;
15 > import java.util.concurrent.locks.LockSupport;
16 > import java.util.concurrent.atomic.AtomicReference;
17  
18   /**
19   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 110 | Line 114 | public class LinkedTransferQueue<E> exte
114              this.isData = isData;
115          }
116  
117 <        @SuppressWarnings("rawtypes")
118 <        static final AtomicReferenceFieldUpdater<Node, Node>
119 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
120 <            (Node.class, Node.class, "next");
117 >        // Unsafe mechanics
118 >
119 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
120 >        private static final long nextOffset =
121 >            objectFieldOffset(UNSAFE, "next", Node.class);
122  
123          final boolean casNext(Node<E> cmp, Node<E> val) {
124 <            return nextUpdater.compareAndSet(this, cmp, val);
124 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
125          }
126  
127          final void clearNext() {
128 <            nextUpdater.lazySet(this, this);
128 >            UNSAFE.putOrderedObject(this, nextOffset, this);
129 >        }
130 >
131 >        /**
132 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
133 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
134 >         * into a jdk.
135 >         *
136 >         * @return a sun.misc.Unsafe
137 >         */
138 >        private static sun.misc.Unsafe getUnsafe() {
139 >            try {
140 >                return sun.misc.Unsafe.getUnsafe();
141 >            } catch (SecurityException se) {
142 >                try {
143 >                    return java.security.AccessController.doPrivileged
144 >                        (new java.security
145 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
146 >                            public sun.misc.Unsafe run() throws Exception {
147 >                                java.lang.reflect.Field f = sun.misc
148 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
149 >                                f.setAccessible(true);
150 >                                return (sun.misc.Unsafe) f.get(null);
151 >                            }});
152 >                } catch (java.security.PrivilegedActionException e) {
153 >                    throw new RuntimeException("Could not initialize intrinsics",
154 >                                               e.getCause());
155 >                }
156 >            }
157          }
158  
159          private static final long serialVersionUID = -3375979862319811754L;
# Line 437 | Line 470 | public class LinkedTransferQueue<E> exte
470          addAll(c);
471      }
472  
473 +    /**
474 +     * @throws InterruptedException {@inheritDoc}
475 +     * @throws NullPointerException {@inheritDoc}
476 +     */
477      public void put(E e) throws InterruptedException {
478          if (e == null) throw new NullPointerException();
479          if (Thread.interrupted()) throw new InterruptedException();
480          xfer(e, NOWAIT, 0);
481      }
482  
483 +    /**
484 +     * @throws InterruptedException {@inheritDoc}
485 +     * @throws NullPointerException {@inheritDoc}
486 +     */
487      public boolean offer(E e, long timeout, TimeUnit unit)
488          throws InterruptedException {
489          if (e == null) throw new NullPointerException();
# Line 451 | Line 492 | public class LinkedTransferQueue<E> exte
492          return true;
493      }
494  
495 +    /**
496 +     * @throws NullPointerException {@inheritDoc}
497 +     */
498      public boolean offer(E e) {
499          if (e == null) throw new NullPointerException();
500          xfer(e, NOWAIT, 0);
501          return true;
502      }
503  
504 +    /**
505 +     * @throws NullPointerException {@inheritDoc}
506 +     */
507      public boolean add(E e) {
508          if (e == null) throw new NullPointerException();
509          xfer(e, NOWAIT, 0);
510          return true;
511      }
512  
513 +    /**
514 +     * @throws InterruptedException {@inheritDoc}
515 +     * @throws NullPointerException {@inheritDoc}
516 +     */
517      public void transfer(E e) throws InterruptedException {
518          if (e == null) throw new NullPointerException();
519          if (xfer(e, WAIT, 0) == null) {
# Line 471 | Line 522 | public class LinkedTransferQueue<E> exte
522          }
523      }
524  
525 +    /**
526 +     * @throws InterruptedException {@inheritDoc}
527 +     * @throws NullPointerException {@inheritDoc}
528 +     */
529      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
530          throws InterruptedException {
531          if (e == null) throw new NullPointerException();
# Line 481 | Line 536 | public class LinkedTransferQueue<E> exte
536          throw new InterruptedException();
537      }
538  
539 +    /**
540 +     * @throws NullPointerException {@inheritDoc}
541 +     */
542      public boolean tryTransfer(E e) {
543          if (e == null) throw new NullPointerException();
544          return fulfill(e) != null;
545      }
546  
547 +    /**
548 +     * @throws InterruptedException {@inheritDoc}
549 +     */
550      public E take() throws InterruptedException {
551          Object e = xfer(null, WAIT, 0);
552          if (e != null)
# Line 494 | Line 555 | public class LinkedTransferQueue<E> exte
555          throw new InterruptedException();
556      }
557  
558 +    /**
559 +     * @throws InterruptedException {@inheritDoc}
560 +     */
561      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
562          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
563          if (e != null || !Thread.interrupted())
# Line 505 | Line 569 | public class LinkedTransferQueue<E> exte
569          return fulfill(null);
570      }
571  
572 +    /**
573 +     * @throws NullPointerException     {@inheritDoc}
574 +     * @throws IllegalArgumentException {@inheritDoc}
575 +     */
576      public int drainTo(Collection<? super E> c) {
577          if (c == null)
578              throw new NullPointerException();
# Line 519 | Line 587 | public class LinkedTransferQueue<E> exte
587          return n;
588      }
589  
590 +    /**
591 +     * @throws NullPointerException     {@inheritDoc}
592 +     * @throws IllegalArgumentException {@inheritDoc}
593 +     */
594      public int drainTo(Collection<? super E> c, int maxElements) {
595          if (c == null)
596              throw new NullPointerException();
# Line 563 | Line 635 | public class LinkedTransferQueue<E> exte
635          }
636      }
637  
566
638      public Iterator<E> iterator() {
639          return new Itr();
640      }
# Line 578 | Line 649 | public class LinkedTransferQueue<E> exte
649      class Itr implements Iterator<E> {
650          Node<E> next;        // node to return next
651          Node<E> pnext;       // predecessor of next
581        Node<E> snext;       // successor of next
652          Node<E> curr;        // last returned node, for remove()
653          Node<E> pcurr;       // predecessor of curr, for remove()
654 <        E nextItem;        // Cache of next item, once committed to in next
654 >        E nextItem;          // Cache of next item, once committed to in next
655  
656          Itr() {
657 <            findNext();
657 >            advance();
658          }
659  
660          /**
661 <         * Ensures next points to next valid node, or null if none.
661 >         * Moves to next valid node and returns item to return for
662 >         * next(), or null if no such.
663           */
664 <        void findNext() {
664 >        private E advance() {
665 >            pcurr = pnext;
666 >            curr = next;
667 >            E item = nextItem;
668 >
669              for (;;) {
670 <                Node<E> pred = pnext;
671 <                Node<E> q = next;
672 <                if (pred == null || pred == q) {
598 <                    pred = traversalHead();
599 <                    q = pred.next;
600 <                }
601 <                if (q == null || !q.isData) {
670 >                pnext = next == null ? traversalHead() : next;
671 >                next = pnext.next;
672 >                if (next == pnext) {
673                      next = null;
674 <                    return;
674 >                    continue;  // restart
675                  }
676 <                Object x = q.get();
677 <                Node<E> s = q.next;
678 <                if (x != null && q != x && q != s) {
676 >                if (next == null)
677 >                    break;
678 >                Object x = next.get();
679 >                if (x != null && x != next) {
680                      nextItem = (E) x;
681 <                    snext = s;
610 <                    pnext = pred;
611 <                    next = q;
612 <                    return;
681 >                    break;
682                  }
614                pnext = q;
615                next = s;
683              }
684 +            return item;
685          }
686  
687          public boolean hasNext() {
# Line 621 | Line 689 | public class LinkedTransferQueue<E> exte
689          }
690  
691          public E next() {
692 <            if (next == null) throw new NoSuchElementException();
693 <            pcurr = pnext;
694 <            curr = next;
627 <            pnext = next;
628 <            next = snext;
629 <            E x = nextItem;
630 <            findNext();
631 <            return x;
692 >            if (next == null)
693 >                throw new NoSuchElementException();
694 >            return advance();
695          }
696  
697          public void remove() {
# Line 698 | Line 761 | public class LinkedTransferQueue<E> exte
761       * @return the number of elements in this queue
762       */
763      public int size() {
764 <        int count = 0;
765 <        Node<E> h = traversalHead();
766 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
767 <            Object x = p.get();
768 <            if (x != null && x != p) {
769 <                if (++count == Integer.MAX_VALUE) // saturated
764 >        for (;;) {
765 >            int count = 0;
766 >            Node<E> pred = traversalHead();
767 >            for (;;) {
768 >                Node<E> q = pred.next;
769 >                if (q == pred) // restart
770                      break;
771 +                if (q == null || !q.isData)
772 +                    return count;
773 +                Object x = q.get();
774 +                if (x != null && x != q) {
775 +                    if (++count == Integer.MAX_VALUE) // saturated
776 +                        return count;
777 +                }
778 +                pred = q;
779              }
780          }
710        return count;
781      }
782  
783      public int getWaitingConsumerCount() {
784 <        int count = 0;
785 <        Node<E> h = traversalHead();
786 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
787 <            if (p.get() == null) {
788 <                if (++count == Integer.MAX_VALUE)
784 >        // converse of size -- count valid non-data nodes
785 >        for (;;) {
786 >            int count = 0;
787 >            Node<E> pred = traversalHead();
788 >            for (;;) {
789 >                Node<E> q = pred.next;
790 >                if (q == pred) // restart
791                      break;
792 +                if (q == null || q.isData)
793 +                    return count;
794 +                Object x = q.get();
795 +                if (x == null) {
796 +                    if (++count == Integer.MAX_VALUE) // saturated
797 +                        return count;
798 +                }
799 +                pred = q;
800              }
801          }
722        return count;
723    }
724
725    public int remainingCapacity() {
726        return Integer.MAX_VALUE;
802      }
803  
804      public boolean remove(Object o) {
# Line 733 | Line 808 | public class LinkedTransferQueue<E> exte
808              Node<E> pred = traversalHead();
809              for (;;) {
810                  Node<E> q = pred.next;
736                if (q == null || !q.isData)
737                    return false;
811                  if (q == pred) // restart
812                      break;
813 +                if (q == null || !q.isData)
814 +                    return false;
815                  Object x = q.get();
816                  if (x != null && x != q && o.equals(x) &&
817                      q.compareAndSet(x, q)) {
# Line 748 | Line 823 | public class LinkedTransferQueue<E> exte
823          }
824      }
825  
826 +    public int remainingCapacity() {
827 +        return Integer.MAX_VALUE;
828 +    }
829 +
830      /**
831       * Save the state to a stream (that is, serialize it).
832       *
# Line 794 | Line 873 | public class LinkedTransferQueue<E> exte
873                                   new PaddedAtomicReference<Node<E>>(null));
874      }
875  
876 <    // Unsafe mechanics for jsr166y 3rd party package.
876 >    // Unsafe mechanics
877 >
878 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
879 >    private static final long headOffset =
880 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
881 >    private static final long tailOffset =
882 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
883 >    private static final long cleanMeOffset =
884 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
885 >
886 >
887 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
888 >                                  String field, Class<?> klazz) {
889 >        try {
890 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
891 >        } catch (NoSuchFieldException e) {
892 >            // Convert Exception to corresponding Error
893 >            NoSuchFieldError error = new NoSuchFieldError(field);
894 >            error.initCause(e);
895 >            throw error;
896 >        }
897 >    }
898 >
899 >    /**
900 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
901 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
902 >     * into a jdk.
903 >     *
904 >     * @return a sun.misc.Unsafe
905 >     */
906      private static sun.misc.Unsafe getUnsafe() {
907          try {
908              return sun.misc.Unsafe.getUnsafe();
909          } catch (SecurityException se) {
910              try {
911                  return java.security.AccessController.doPrivileged
912 <                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
912 >                    (new java.security
913 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
914                          public sun.misc.Unsafe run() throws Exception {
915 <                            return getUnsafeByReflection();
915 >                            java.lang.reflect.Field f = sun.misc
916 >                                .Unsafe.class.getDeclaredField("theUnsafe");
917 >                            f.setAccessible(true);
918 >                            return (sun.misc.Unsafe) f.get(null);
919                          }});
920              } catch (java.security.PrivilegedActionException e) {
921                  throw new RuntimeException("Could not initialize intrinsics",
# Line 811 | Line 923 | public class LinkedTransferQueue<E> exte
923              }
924          }
925      }
814
815    private static sun.misc.Unsafe getUnsafeByReflection()
816            throws NoSuchFieldException, IllegalAccessException {
817        java.lang.reflect.Field f =
818            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
819        f.setAccessible(true);
820        return (sun.misc.Unsafe) f.get(null);
821    }
822
823    private static long fieldOffset(String fieldName, Class<?> klazz) {
824        try {
825            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
826        } catch (NoSuchFieldException e) {
827            // Convert Exception to Error
828            NoSuchFieldError error = new NoSuchFieldError(fieldName);
829            error.initCause(e);
830            throw error;
831        }
832    }
833
834    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
835    static final long headOffset =
836        fieldOffset("head", LinkedTransferQueue.class);
837    static final long tailOffset =
838        fieldOffset("tail", LinkedTransferQueue.class);
839    static final long cleanMeOffset =
840        fieldOffset("cleanMe", LinkedTransferQueue.class);
841
926   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines