ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.25 by jsr166, Fri Jul 24 23:48:26 2009 UTC vs.
Revision 1.36 by jsr166, Fri Jul 31 07:30:29 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 110 | Line 116 | public class LinkedTransferQueue<E> exte
116              this.isData = isData;
117          }
118  
119 <        @SuppressWarnings("rawtypes")
120 <        static final AtomicReferenceFieldUpdater<Node, Node>
121 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
122 <            (Node.class, Node.class, "next");
119 >        // Unsafe mechanics
120 >
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124  
125          final boolean casNext(Node<E> cmp, Node<E> val) {
126 <            return nextUpdater.compareAndSet(this, cmp, val);
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127          }
128  
129          final void clearNext() {
130 <            nextUpdater.lazySet(this, this);
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160  
161          private static final long serialVersionUID = -3375979862319811754L;
# Line 437 | Line 472 | public class LinkedTransferQueue<E> exte
472          addAll(c);
473      }
474  
475 <    public void put(E e) throws InterruptedException {
476 <        if (e == null) throw new NullPointerException();
477 <        if (Thread.interrupted()) throw new InterruptedException();
478 <        xfer(e, NOWAIT, 0);
475 >    /**
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480 >     */
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485 <    public boolean offer(E e, long timeout, TimeUnit unit)
486 <        throws InterruptedException {
487 <        if (e == null) throw new NullPointerException();
488 <        if (Thread.interrupted()) throw new InterruptedException();
489 <        xfer(e, NOWAIT, 0);
490 <        return true;
485 >    /**
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493 >     */
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498 +    /**
499 +     * Inserts the specified element at the tail of this queue.
500 +     * As the queue is unbounded, this method will never return {@code false}.
501 +     *
502 +     * @return {@code true} (as specified by
503 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 +     * @throws NullPointerException if the specified element is null
505 +     */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
508          xfer(e, NOWAIT, 0);
509          return true;
510      }
511  
512 +    /**
513 +     * Inserts the specified element at the tail of this queue.
514 +     * As the queue is unbounded this method will never throw
515 +     * {@link IllegalStateException} or return {@code false}.
516 +     *
517 +     * @return {@code true} (as specified by {@link Collection#add})
518 +     * @throws NullPointerException if the specified element is null
519 +     */
520      public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the specified element immediately if there exists a
526 +     * consumer already waiting to receive it (in {@link #take} or
527 +     * timed {@link #poll(long,TimeUnit) poll}), otherwise
528 +     * returning {@code false} without enqueuing the element.
529 +     *
530 +     * @throws NullPointerException if the specified element is null
531 +     */
532 +    public boolean tryTransfer(E e) {
533          if (e == null) throw new NullPointerException();
534 <        xfer(e, NOWAIT, 0);
463 <        return true;
534 >        return fulfill(e) != null;
535      }
536  
537 +    /**
538 +     * Inserts the specified element at the tail of this queue,
539 +     * waiting if necessary for the element to be received by a
540 +     * consumer invoking {@code take} or {@code poll}.
541 +     *
542 +     * @throws NullPointerException if the specified element is null
543 +     */
544      public void transfer(E e) throws InterruptedException {
545          if (e == null) throw new NullPointerException();
546          if (xfer(e, WAIT, 0) == null) {
# Line 471 | Line 549 | public class LinkedTransferQueue<E> exte
549          }
550      }
551  
552 +    /**
553 +     * Inserts the specified element at the tail of this queue,
554 +     * waiting up to the specified wait time for the element to be
555 +     * received by a consumer invoking {@code take} or {@code poll}.
556 +     *
557 +     * @throws NullPointerException if the specified element is null
558 +     */
559      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
560          throws InterruptedException {
561          if (e == null) throw new NullPointerException();
# Line 481 | Line 566 | public class LinkedTransferQueue<E> exte
566          throw new InterruptedException();
567      }
568  
484    public boolean tryTransfer(E e) {
485        if (e == null) throw new NullPointerException();
486        return fulfill(e) != null;
487    }
488
569      public E take() throws InterruptedException {
570 <        Object e = xfer(null, WAIT, 0);
570 >        E e = xfer(null, WAIT, 0);
571          if (e != null)
572 <            return (E) e;
572 >            return e;
573          Thread.interrupted();
574          throw new InterruptedException();
575      }
576  
577      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
578 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
578 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
579          if (e != null || !Thread.interrupted())
580 <            return (E) e;
580 >            return e;
581          throw new InterruptedException();
582      }
583  
# Line 505 | Line 585 | public class LinkedTransferQueue<E> exte
585          return fulfill(null);
586      }
587  
588 +    /**
589 +     * @throws NullPointerException     {@inheritDoc}
590 +     * @throws IllegalArgumentException {@inheritDoc}
591 +     */
592      public int drainTo(Collection<? super E> c) {
593          if (c == null)
594              throw new NullPointerException();
# Line 519 | Line 603 | public class LinkedTransferQueue<E> exte
603          return n;
604      }
605  
606 +    /**
607 +     * @throws NullPointerException     {@inheritDoc}
608 +     * @throws IllegalArgumentException {@inheritDoc}
609 +     */
610      public int drainTo(Collection<? super E> c, int maxElements) {
611          if (c == null)
612              throw new NullPointerException();
# Line 563 | Line 651 | public class LinkedTransferQueue<E> exte
651          }
652      }
653  
654 <
654 >    /**
655 >     * Returns an iterator over the elements in this queue in proper
656 >     * sequence, from head to tail.
657 >     *
658 >     * <p>The returned iterator is a "weakly consistent" iterator that
659 >     * will never throw
660 >     * {@link ConcurrentModificationException ConcurrentModificationException},
661 >     * and guarantees to traverse elements as they existed upon
662 >     * construction of the iterator, and may (but is not guaranteed
663 >     * to) reflect any modifications subsequent to construction.
664 >     *
665 >     * @return an iterator over the elements in this queue in proper sequence
666 >     */
667      public Iterator<E> iterator() {
668          return new Itr();
669      }
# Line 578 | Line 678 | public class LinkedTransferQueue<E> exte
678      class Itr implements Iterator<E> {
679          Node<E> next;        // node to return next
680          Node<E> pnext;       // predecessor of next
581        Node<E> snext;       // successor of next
681          Node<E> curr;        // last returned node, for remove()
682          Node<E> pcurr;       // predecessor of curr, for remove()
683 <        E nextItem;        // Cache of next item, once committed to in next
683 >        E nextItem;          // Cache of next item, once committed to in next
684  
685          Itr() {
686 <            findNext();
686 >            advance();
687          }
688  
689          /**
690 <         * Ensures next points to next valid node, or null if none.
690 >         * Moves to next valid node and returns item to return for
691 >         * next(), or null if no such.
692           */
693 <        void findNext() {
693 >        private E advance() {
694 >            pcurr = pnext;
695 >            curr = next;
696 >            E item = nextItem;
697 >
698              for (;;) {
699 <                Node<E> pred = pnext;
700 <                Node<E> q = next;
701 <                if (pred == null || pred == q) {
598 <                    pred = traversalHead();
599 <                    q = pred.next;
600 <                }
601 <                if (q == null || !q.isData) {
699 >                pnext = (next == null) ? traversalHead() : next;
700 >                next = pnext.next;
701 >                if (next == pnext) {
702                      next = null;
703 <                    return;
703 >                    continue;  // restart
704                  }
705 <                Object x = q.get();
706 <                Node<E> s = q.next;
707 <                if (x != null && q != x && q != s) {
705 >                if (next == null)
706 >                    break;
707 >                Object x = next.get();
708 >                if (x != null && x != next) {
709                      nextItem = (E) x;
710 <                    snext = s;
610 <                    pnext = pred;
611 <                    next = q;
612 <                    return;
710 >                    break;
711                  }
614                pnext = q;
615                next = s;
712              }
713 +            return item;
714          }
715  
716          public boolean hasNext() {
# Line 621 | Line 718 | public class LinkedTransferQueue<E> exte
718          }
719  
720          public E next() {
721 <            if (next == null) throw new NoSuchElementException();
722 <            pcurr = pnext;
723 <            curr = next;
627 <            pnext = next;
628 <            next = snext;
629 <            E x = nextItem;
630 <            findNext();
631 <            return x;
721 >            if (next == null)
722 >                throw new NoSuchElementException();
723 >            return advance();
724          }
725  
726          public void remove() {
# Line 698 | Line 790 | public class LinkedTransferQueue<E> exte
790       * @return the number of elements in this queue
791       */
792      public int size() {
793 <        int count = 0;
794 <        Node<E> h = traversalHead();
795 <        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
796 <            Object x = p.get();
797 <            if (x != null && x != p) {
798 <                if (++count == Integer.MAX_VALUE) // saturated
793 >        for (;;) {
794 >            int count = 0;
795 >            Node<E> pred = traversalHead();
796 >            for (;;) {
797 >                Node<E> q = pred.next;
798 >                if (q == pred) // restart
799                      break;
800 +                if (q == null || !q.isData)
801 +                    return count;
802 +                Object x = q.get();
803 +                if (x != null && x != q) {
804 +                    if (++count == Integer.MAX_VALUE) // saturated
805 +                        return count;
806 +                }
807 +                pred = q;
808              }
809          }
710        return count;
810      }
811  
812      public int getWaitingConsumerCount() {
813 <        int count = 0;
814 <        Node<E> h = traversalHead();
815 <        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
816 <            if (p.get() == null) {
817 <                if (++count == Integer.MAX_VALUE)
813 >        // converse of size -- count valid non-data nodes
814 >        for (;;) {
815 >            int count = 0;
816 >            Node<E> pred = traversalHead();
817 >            for (;;) {
818 >                Node<E> q = pred.next;
819 >                if (q == pred) // restart
820                      break;
821 +                if (q == null || q.isData)
822 +                    return count;
823 +                Object x = q.get();
824 +                if (x == null) {
825 +                    if (++count == Integer.MAX_VALUE) // saturated
826 +                        return count;
827 +                }
828 +                pred = q;
829              }
830          }
722        return count;
723    }
724
725    public int remainingCapacity() {
726        return Integer.MAX_VALUE;
831      }
832  
833      public boolean remove(Object o) {
# Line 733 | Line 837 | public class LinkedTransferQueue<E> exte
837              Node<E> pred = traversalHead();
838              for (;;) {
839                  Node<E> q = pred.next;
736                if (q == null || !q.isData)
737                    return false;
840                  if (q == pred) // restart
841                      break;
842 +                if (q == null || !q.isData)
843 +                    return false;
844                  Object x = q.get();
845                  if (x != null && x != q && o.equals(x) &&
846                      q.compareAndSet(x, q)) {
# Line 749 | Line 853 | public class LinkedTransferQueue<E> exte
853      }
854  
855      /**
856 +     * Always returns {@code Integer.MAX_VALUE} because a
857 +     * {@code LinkedTransferQueue} is not capacity constrained.
858 +     *
859 +     * @return {@code Integer.MAX_VALUE} (as specified by
860 +     *         {@link BlockingQueue#remainingCapacity()})
861 +     */
862 +    public int remainingCapacity() {
863 +        return Integer.MAX_VALUE;
864 +    }
865 +
866 +    /**
867       * Save the state to a stream (that is, serialize it).
868       *
869       * @serialData All of the elements (each an {@code E}) in
# Line 794 | Line 909 | public class LinkedTransferQueue<E> exte
909                                   new PaddedAtomicReference<Node<E>>(null));
910      }
911  
912 <    // Unsafe mechanics for jsr166y 3rd party package.
912 >    // Unsafe mechanics
913 >
914 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
915 >    private static final long headOffset =
916 >        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
917 >    private static final long tailOffset =
918 >        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
919 >    private static final long cleanMeOffset =
920 >        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
921 >
922 >
923 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
924 >                                  String field, Class<?> klazz) {
925 >        try {
926 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
927 >        } catch (NoSuchFieldException e) {
928 >            // Convert Exception to corresponding Error
929 >            NoSuchFieldError error = new NoSuchFieldError(field);
930 >            error.initCause(e);
931 >            throw error;
932 >        }
933 >    }
934 >
935 >    /**
936 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
937 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
938 >     * into a jdk.
939 >     *
940 >     * @return a sun.misc.Unsafe
941 >     */
942      private static sun.misc.Unsafe getUnsafe() {
943          try {
944              return sun.misc.Unsafe.getUnsafe();
945          } catch (SecurityException se) {
946              try {
947                  return java.security.AccessController.doPrivileged
948 <                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
948 >                    (new java.security
949 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
950                          public sun.misc.Unsafe run() throws Exception {
951 <                            return getUnsafeByReflection();
951 >                            java.lang.reflect.Field f = sun.misc
952 >                                .Unsafe.class.getDeclaredField("theUnsafe");
953 >                            f.setAccessible(true);
954 >                            return (sun.misc.Unsafe) f.get(null);
955                          }});
956              } catch (java.security.PrivilegedActionException e) {
957                  throw new RuntimeException("Could not initialize intrinsics",
# Line 811 | Line 959 | public class LinkedTransferQueue<E> exte
959              }
960          }
961      }
814
815    private static sun.misc.Unsafe getUnsafeByReflection()
816            throws NoSuchFieldException, IllegalAccessException {
817        java.lang.reflect.Field f =
818            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
819        f.setAccessible(true);
820        return (sun.misc.Unsafe) f.get(null);
821    }
822
823    private static long fieldOffset(String fieldName, Class<?> klazz) {
824        try {
825            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
826        } catch (NoSuchFieldException e) {
827            // Convert Exception to Error
828            NoSuchFieldError error = new NoSuchFieldError(fieldName);
829            error.initCause(e);
830            throw error;
831        }
832    }
833
834    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
835    static final long headOffset =
836        fieldOffset("head", LinkedTransferQueue.class);
837    static final long tailOffset =
838        fieldOffset("tail", LinkedTransferQueue.class);
839    static final long cleanMeOffset =
840        fieldOffset("cleanMe", LinkedTransferQueue.class);
841
962   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines