ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.74 by jsr166, Wed Sep 1 21:43:08 2010 UTC vs.
Revision 1.84 by jsr166, Wed May 25 16:08:03 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
13 import java.util.ConcurrentModificationException;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13   import java.util.Queue;
14 + import java.util.concurrent.TimeUnit;
15   import java.util.concurrent.locks.LockSupport;
16  
17   /**
# Line 24 | Line 22 | import java.util.concurrent.locks.LockSu
22   * producer.  The <em>tail</em> of the queue is that element that has
23   * been on the queue the shortest time for some producer.
24   *
25 < * <p>Beware that, unlike in most collections, the {@code size}
26 < * method is <em>NOT</em> a constant-time operation. Because of the
25 > * <p>Beware that, unlike in most collections, the {@code size} method
26 > * is <em>NOT</em> a constant-time operation. Because of the
27   * asynchronous nature of these queues, determining the current number
28 < * of elements requires a traversal of the elements.
28 > * of elements requires a traversal of the elements, and so may report
29 > * inaccurate results if this collection is modified during traversal.
30 > * Additionally, the bulk operations {@code addAll},
31 > * {@code removeAll}, {@code retainAll}, {@code containsAll},
32 > * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
33 > * to be performed atomically. For example, an iterator operating
34 > * concurrently with an {@code addAll} operation might view only some
35 > * of the added elements.
36   *
37   * <p>This class and its iterator implement all of the
38   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 344 | Line 349 | public class LinkedTransferQueue<E> exte
349       * When these cases arise, rather than always retraversing the
350       * entire list to find an actual predecessor to unlink (which
351       * won't help for case (1) anyway), we record a conservative
352 <     * estimate of possible unsplice failures (in "sweepVotes").  We
353 <     * trigger a full sweep when the estimate exceeds a threshold
354 <     * indicating the maximum number of estimated removal failures to
355 <     * tolerate before sweeping through, unlinking cancelled nodes
356 <     * that were not unlinked upon initial removal. We perform sweeps
357 <     * by the thread hitting threshold (rather than background threads
358 <     * or by spreading work to other threads) because in the main
359 <     * contexts in which removal occurs, the caller is already
360 <     * timed-out, cancelled, or performing a potentially O(n)
361 <     * operation (i.e., remove(x)), none of which are time-critical
362 <     * enough to warrant the overhead that alternatives would impose
363 <     * on other threads.
352 >     * estimate of possible unsplice failures (in "sweepVotes").
353 >     * We trigger a full sweep when the estimate exceeds a threshold
354 >     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
355 >     * removal failures to tolerate before sweeping through, unlinking
356 >     * cancelled nodes that were not unlinked upon initial removal.
357 >     * We perform sweeps by the thread hitting threshold (rather than
358 >     * background threads or by spreading work to other threads)
359 >     * because in the main contexts in which removal occurs, the
360 >     * caller is already timed-out, cancelled, or performing a
361 >     * potentially O(n) operation (e.g. remove(x)), none of which are
362 >     * time-critical enough to warrant the overhead that alternatives
363 >     * would impose on other threads.
364       *
365       * Because the sweepVotes estimate is conservative, and because
366       * nodes become unlinked "naturally" as they fall off the head of
# Line 423 | Line 428 | public class LinkedTransferQueue<E> exte
428          }
429  
430          final boolean casItem(Object cmp, Object val) {
431 <            //            assert cmp == null || cmp.getClass() != Node.class;
431 >            // assert cmp == null || cmp.getClass() != Node.class;
432              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
433          }
434  
435          /**
436 <         * Creates a new node. Uses relaxed write because item can only
437 <         * be seen if followed by CAS.
436 >         * Constructs a new node.  Uses relaxed write because item can
437 >         * only be seen after publication via casNext.
438           */
439          Node(Object item, boolean isData) {
440              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 489 | Line 494 | public class LinkedTransferQueue<E> exte
494           * Tries to artificially match a data node -- used by remove.
495           */
496          final boolean tryMatchData() {
497 <            //            assert isData;
497 >            // assert isData;
498              Object x = item;
499              if (x != null && x != this && casItem(x, null)) {
500                  LockSupport.unpark(waiter);
# Line 498 | Line 503 | public class LinkedTransferQueue<E> exte
503              return false;
504          }
505  
501        // Unsafe mechanics
502        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
503        private static final long nextOffset =
504            objectFieldOffset(UNSAFE, "next", Node.class);
505        private static final long itemOffset =
506            objectFieldOffset(UNSAFE, "item", Node.class);
507        private static final long waiterOffset =
508            objectFieldOffset(UNSAFE, "waiter", Node.class);
509
506          private static final long serialVersionUID = -3375979862319811754L;
507 +
508 +        // Unsafe mechanics
509 +        private static final sun.misc.Unsafe UNSAFE;
510 +        private static final long itemOffset;
511 +        private static final long nextOffset;
512 +        private static final long waiterOffset;
513 +        static {
514 +            try {
515 +                UNSAFE = getUnsafe();
516 +                Class k = Node.class;
517 +                itemOffset = UNSAFE.objectFieldOffset
518 +                    (k.getDeclaredField("item"));
519 +                nextOffset = UNSAFE.objectFieldOffset
520 +                    (k.getDeclaredField("next"));
521 +                waiterOffset = UNSAFE.objectFieldOffset
522 +                    (k.getDeclaredField("waiter"));
523 +            } catch (Exception e) {
524 +                throw new Error(e);
525 +            }
526 +        }
527      }
528  
529      /** head of the queue; null until first enqueue */
# Line 542 | Line 558 | public class LinkedTransferQueue<E> exte
558  
559      @SuppressWarnings("unchecked")
560      static <E> E cast(Object item) {
561 <        //        assert item == null || item.getClass() != Node.class;
561 >        // assert item == null || item.getClass() != Node.class;
562          return (E) item;
563      }
564  
# Line 561 | Line 577 | public class LinkedTransferQueue<E> exte
577              throw new NullPointerException();
578          Node s = null;                        // the node to append, if needed
579  
580 <        retry: for (;;) {                     // restart on append race
580 >        retry:
581 >        for (;;) {                            // restart on append race
582  
583              for (Node h = head, p = h; p != null;) { // find & match first node
584                  boolean isData = p.isData;
# Line 572 | Line 589 | public class LinkedTransferQueue<E> exte
589                      if (p.casItem(item, e)) { // match
590                          for (Node q = p; q != h;) {
591                              Node n = q.next;  // update by 2 unless singleton
592 <                            if (head == h && casHead(h, n == null? q : n)) {
592 >                            if (head == h && casHead(h, n == null ? q : n)) {
593                                  h.forgetNext();
594                                  break;
595                              }                 // advance and retry
# Line 657 | Line 674 | public class LinkedTransferQueue<E> exte
674          for (;;) {
675              Object item = s.item;
676              if (item != e) {                  // matched
677 <                //                assert item != s;
677 >                // assert item != s;
678                  s.forgetContents();           // avoid garbage
679                  return this.<E>cast(item);
680              }
# Line 782 | Line 799 | public class LinkedTransferQueue<E> exte
799           * Moves to next node after prev, or first node if prev null.
800           */
801          private void advance(Node prev) {
802 <            lastPred = lastRet;
803 <            lastRet = prev;
804 <            for (Node p = (prev == null) ? head : succ(prev);
805 <                 p != null; p = succ(p)) {
806 <                Object item = p.item;
807 <                if (p.isData) {
808 <                    if (item != null && item != p) {
809 <                        nextItem = LinkedTransferQueue.this.<E>cast(item);
810 <                        nextNode = p;
802 >            /*
803 >             * To track and avoid buildup of deleted nodes in the face
804 >             * of calls to both Queue.remove and Itr.remove, we must
805 >             * include variants of unsplice and sweep upon each
806 >             * advance: Upon Itr.remove, we may need to catch up links
807 >             * from lastPred, and upon other removes, we might need to
808 >             * skip ahead from stale nodes and unsplice deleted ones
809 >             * found while advancing.
810 >             */
811 >
812 >            Node r, b; // reset lastPred upon possible deletion of lastRet
813 >            if ((r = lastRet) != null && !r.isMatched())
814 >                lastPred = r;    // next lastPred is old lastRet
815 >            else if ((b = lastPred) == null || b.isMatched())
816 >                lastPred = null; // at start of list
817 >            else {
818 >                Node s, n;       // help with removal of lastPred.next
819 >                while ((s = b.next) != null &&
820 >                       s != b && s.isMatched() &&
821 >                       (n = s.next) != null && n != s)
822 >                    b.casNext(s, n);
823 >            }
824 >
825 >            this.lastRet = prev;
826 >
827 >            for (Node p = prev, s, n;;) {
828 >                s = (p == null) ? head : p.next;
829 >                if (s == null)
830 >                    break;
831 >                else if (s == p) {
832 >                    p = null;
833 >                    continue;
834 >                }
835 >                Object item = s.item;
836 >                if (s.isData) {
837 >                    if (item != null && item != s) {
838 >                        nextItem = LinkedTransferQueue.<E>cast(item);
839 >                        nextNode = s;
840                          return;
841                      }
842                  }
843                  else if (item == null)
844                      break;
845 +                // assert s.isMatched();
846 +                if (p == null)
847 +                    p = s;
848 +                else if ((n = s.next) == null)
849 +                    break;
850 +                else if (s == n)
851 +                    p = null;
852 +                else
853 +                    p.casNext(s, n);
854              }
855              nextNode = null;
856 +            nextItem = null;
857          }
858  
859          Itr() {
# Line 817 | Line 873 | public class LinkedTransferQueue<E> exte
873          }
874  
875          public final void remove() {
876 <            Node p = lastRet;
877 <            if (p == null) throw new IllegalStateException();
878 <            if (p.tryMatchData())
879 <                unsplice(lastPred, p);
876 >            final Node lastRet = this.lastRet;
877 >            if (lastRet == null)
878 >                throw new IllegalStateException();
879 >            this.lastRet = null;
880 >            if (lastRet.tryMatchData())
881 >                unsplice(lastPred, lastRet);
882          }
883      }
884  
# Line 877 | Line 935 | public class LinkedTransferQueue<E> exte
935      }
936  
937      /**
938 <     * Unlinks matched nodes encountered in a traversal from head.
938 >     * Unlinks matched (typically cancelled) nodes encountered in a
939 >     * traversal from head.
940       */
941      private void sweep() {
942          for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
943 <            if (p == s)                    // stale
944 <                p = head;
886 <            else if (!s.isMatched())
943 >            if (!s.isMatched())
944 >                // Unmatched nodes are never self-linked
945                  p = s;
946              else if ((n = s.next) == null) // trailing node is pinned
947                  break;
948 +            else if (s == n)    // stale
949 +                // No need to also check for p == s, since that implies s == n
950 +                p = head;
951              else
952                  p.casNext(s, n);
953          }
# Line 967 | Line 1028 | public class LinkedTransferQueue<E> exte
1028       * Inserts the specified element at the tail of this queue.
1029       * As the queue is unbounded, this method will never return {@code false}.
1030       *
1031 <     * @return {@code true} (as specified by
971 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1031 >     * @return {@code true} (as specified by {@link Queue#offer})
1032       * @throws NullPointerException if the specified element is null
1033       */
1034      public boolean offer(E e) {
# Line 1100 | Line 1160 | public class LinkedTransferQueue<E> exte
1160      }
1161  
1162      /**
1163 <     * Returns an iterator over the elements in this queue in proper
1164 <     * sequence, from head to tail.
1163 >     * Returns an iterator over the elements in this queue in proper sequence.
1164 >     * The elements will be returned in order from first (head) to last (tail).
1165       *
1166       * <p>The returned iterator is a "weakly consistent" iterator that
1167 <     * will never throw
1168 <     * {@link ConcurrentModificationException ConcurrentModificationException},
1169 <     * and guarantees to traverse elements as they existed upon
1170 <     * construction of the iterator, and may (but is not guaranteed
1171 <     * to) reflect any modifications subsequent to construction.
1167 >     * will never throw {@link java.util.ConcurrentModificationException
1168 >     * ConcurrentModificationException}, and guarantees to traverse
1169 >     * elements as they existed upon construction of the iterator, and
1170 >     * may (but is not guaranteed to) reflect any modifications
1171 >     * subsequent to construction.
1172       *
1173       * @return an iterator over the elements in this queue in proper sequence
1174       */
# Line 1173 | Line 1233 | public class LinkedTransferQueue<E> exte
1233      }
1234  
1235      /**
1236 +     * Returns {@code true} if this queue contains the specified element.
1237 +     * More formally, returns {@code true} if and only if this queue contains
1238 +     * at least one element {@code e} such that {@code o.equals(e)}.
1239 +     *
1240 +     * @param o object to be checked for containment in this queue
1241 +     * @return {@code true} if this queue contains the specified element
1242 +     */
1243 +    public boolean contains(Object o) {
1244 +        if (o == null) return false;
1245 +        for (Node p = head; p != null; p = succ(p)) {
1246 +            Object item = p.item;
1247 +            if (p.isData) {
1248 +                if (item != null && item != p && o.equals(item))
1249 +                    return true;
1250 +            }
1251 +            else if (item == null)
1252 +                break;
1253 +        }
1254 +        return false;
1255 +    }
1256 +
1257 +    /**
1258       * Always returns {@code Integer.MAX_VALUE} because a
1259       * {@code LinkedTransferQueue} is not capacity constrained.
1260       *
# Line 1219 | Line 1301 | public class LinkedTransferQueue<E> exte
1301  
1302      // Unsafe mechanics
1303  
1304 <    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1305 <    private static final long headOffset =
1306 <        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1307 <    private static final long tailOffset =
1308 <        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1227 <    private static final long sweepVotesOffset =
1228 <        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1229 <
1230 <    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1231 <                                  String field, Class<?> klazz) {
1304 >    private static final sun.misc.Unsafe UNSAFE;
1305 >    private static final long headOffset;
1306 >    private static final long tailOffset;
1307 >    private static final long sweepVotesOffset;
1308 >    static {
1309          try {
1310 <            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1311 <        } catch (NoSuchFieldException e) {
1312 <            // Convert Exception to corresponding Error
1313 <            NoSuchFieldError error = new NoSuchFieldError(field);
1314 <            error.initCause(e);
1315 <            throw error;
1310 >            UNSAFE = getUnsafe();
1311 >            Class k = LinkedTransferQueue.class;
1312 >            headOffset = UNSAFE.objectFieldOffset
1313 >                (k.getDeclaredField("head"));
1314 >            tailOffset = UNSAFE.objectFieldOffset
1315 >                (k.getDeclaredField("tail"));
1316 >            sweepVotesOffset = UNSAFE.objectFieldOffset
1317 >                (k.getDeclaredField("sweepVotes"));
1318 >        } catch (Exception e) {
1319 >            throw new Error(e);
1320          }
1321      }
1322  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines