ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.70 by jsr166, Sun Nov 15 02:29:12 2009 UTC vs.
Revision 1.87 by jsr166, Fri Jun 3 14:17:10 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
13 import java.util.ConcurrentModificationException;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13   import java.util.Queue;
14 + import java.util.concurrent.TimeUnit;
15   import java.util.concurrent.locks.LockSupport;
16 +
17   /**
18   * An unbounded {@link TransferQueue} based on linked nodes.
19   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 23 | Line 22 | import java.util.concurrent.locks.LockSu
22   * producer.  The <em>tail</em> of the queue is that element that has
23   * been on the queue the shortest time for some producer.
24   *
25 < * <p>Beware that, unlike in most collections, the {@code size}
26 < * method is <em>NOT</em> a constant-time operation. Because of the
25 > * <p>Beware that, unlike in most collections, the {@code size} method
26 > * is <em>NOT</em> a constant-time operation. Because of the
27   * asynchronous nature of these queues, determining the current number
28 < * of elements requires a traversal of the elements.
28 > * of elements requires a traversal of the elements, and so may report
29 > * inaccurate results if this collection is modified during traversal.
30 > * Additionally, the bulk operations {@code addAll},
31 > * {@code removeAll}, {@code retainAll}, {@code containsAll},
32 > * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
33 > * to be performed atomically. For example, an iterator operating
34 > * concurrently with an {@code addAll} operation might view only some
35 > * of the added elements.
36   *
37   * <p>This class and its iterator implement all of the
38   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 295 | Line 301 | public class LinkedTransferQueue<E> exte
301       *    of less-contended queues.  During spins threads check their
302       *    interrupt status and generate a thread-local random number
303       *    to decide to occasionally perform a Thread.yield. While
304 <     *    yield has underdefined specs, we assume that might it help,
304 >     *    yield has underdefined specs, we assume that it might help,
305       *    and will not hurt in limiting impact of spinning on busy
306       *    systems.  We also use smaller (1/2) spins for nodes that are
307       *    not known to be front but whose predecessors have not
# Line 321 | Line 327 | public class LinkedTransferQueue<E> exte
327       * situations in which we cannot guarantee to make node s
328       * unreachable in this way: (1) If s is the trailing node of list
329       * (i.e., with null next), then it is pinned as the target node
330 <     * for appends, so can only be removed later when other nodes are
330 >     * for appends, so can only be removed later after other nodes are
331       * appended. (2) We cannot necessarily unlink s given a
332       * predecessor node that is matched (including the case of being
333       * cancelled): the predecessor may already be unspliced, in which
# Line 343 | Line 349 | public class LinkedTransferQueue<E> exte
349       * When these cases arise, rather than always retraversing the
350       * entire list to find an actual predecessor to unlink (which
351       * won't help for case (1) anyway), we record a conservative
352 <     * estimate of possible unsplice failures (in "sweepVotes").  We
353 <     * trigger a full sweep when the estimate exceeds a threshold
354 <     * indicating the maximum number of estimated removal failures to
355 <     * tolerate before sweeping through, unlinking cancelled nodes
356 <     * that were not unlinked upon initial removal. We perform sweeps
357 <     * by the thread hitting threshold (rather than background threads
358 <     * or by spreading work to other threads) because in the main
359 <     * contexts in which removal occurs, the caller is already
360 <     * timed-out, cancelled, or performing a potentially O(n)
361 <     * operation (i.e., remove(x)), none of which are time-critical
362 <     * enough to warrant the overhead that alternatives would impose
363 <     * on other threads.
352 >     * estimate of possible unsplice failures (in "sweepVotes").
353 >     * We trigger a full sweep when the estimate exceeds a threshold
354 >     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
355 >     * removal failures to tolerate before sweeping through, unlinking
356 >     * cancelled nodes that were not unlinked upon initial removal.
357 >     * We perform sweeps by the thread hitting threshold (rather than
358 >     * background threads or by spreading work to other threads)
359 >     * because in the main contexts in which removal occurs, the
360 >     * caller is already timed-out, cancelled, or performing a
361 >     * potentially O(n) operation (e.g. remove(x)), none of which are
362 >     * time-critical enough to warrant the overhead that alternatives
363 >     * would impose on other threads.
364       *
365       * Because the sweepVotes estimate is conservative, and because
366       * nodes become unlinked "naturally" as they fall off the head of
# Line 422 | Line 428 | public class LinkedTransferQueue<E> exte
428          }
429  
430          final boolean casItem(Object cmp, Object val) {
431 <            assert cmp == null || cmp.getClass() != Node.class;
431 >            // assert cmp == null || cmp.getClass() != Node.class;
432              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
433          }
434  
435          /**
436 <         * Creates a new node. Uses relaxed write because item can only
437 <         * be seen if followed by CAS.
436 >         * Constructs a new node.  Uses relaxed write because item can
437 >         * only be seen after publication via casNext.
438           */
439          Node(Object item, boolean isData) {
440              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 446 | Line 452 | public class LinkedTransferQueue<E> exte
452          /**
453           * Sets item to self and waiter to null, to avoid garbage
454           * retention after matching or cancelling. Uses relaxed writes
455 <         * bacause order is already constrained in the only calling
455 >         * because order is already constrained in the only calling
456           * contexts: item is forgotten only after volatile/atomic
457           * mechanics that extract items.  Similarly, clearing waiter
458           * follows either CAS or return from park (if ever parked;
# Line 488 | Line 494 | public class LinkedTransferQueue<E> exte
494           * Tries to artificially match a data node -- used by remove.
495           */
496          final boolean tryMatchData() {
497 <            assert isData;
497 >            // assert isData;
498              Object x = item;
499              if (x != null && x != this && casItem(x, null)) {
500                  LockSupport.unpark(waiter);
# Line 497 | Line 503 | public class LinkedTransferQueue<E> exte
503              return false;
504          }
505  
500        // Unsafe mechanics
501        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
502        private static final long nextOffset =
503            objectFieldOffset(UNSAFE, "next", Node.class);
504        private static final long itemOffset =
505            objectFieldOffset(UNSAFE, "item", Node.class);
506        private static final long waiterOffset =
507            objectFieldOffset(UNSAFE, "waiter", Node.class);
508
506          private static final long serialVersionUID = -3375979862319811754L;
507 +
508 +        // Unsafe mechanics
509 +        private static final sun.misc.Unsafe UNSAFE;
510 +        private static final long itemOffset;
511 +        private static final long nextOffset;
512 +        private static final long waiterOffset;
513 +        static {
514 +            try {
515 +                UNSAFE = getUnsafe();
516 +                Class<?> k = Node.class;
517 +                itemOffset = UNSAFE.objectFieldOffset
518 +                    (k.getDeclaredField("item"));
519 +                nextOffset = UNSAFE.objectFieldOffset
520 +                    (k.getDeclaredField("next"));
521 +                waiterOffset = UNSAFE.objectFieldOffset
522 +                    (k.getDeclaredField("waiter"));
523 +            } catch (Exception e) {
524 +                throw new Error(e);
525 +            }
526 +        }
527      }
528  
529      /** head of the queue; null until first enqueue */
# Line 541 | Line 558 | public class LinkedTransferQueue<E> exte
558  
559      @SuppressWarnings("unchecked")
560      static <E> E cast(Object item) {
561 <        assert item == null || item.getClass() != Node.class;
561 >        // assert item == null || item.getClass() != Node.class;
562          return (E) item;
563      }
564  
# Line 560 | Line 577 | public class LinkedTransferQueue<E> exte
577              throw new NullPointerException();
578          Node s = null;                        // the node to append, if needed
579  
580 <        retry: for (;;) {                     // restart on append race
580 >        retry:
581 >        for (;;) {                            // restart on append race
582  
583              for (Node h = head, p = h; p != null;) { // find & match first node
584                  boolean isData = p.isData;
# Line 571 | Line 589 | public class LinkedTransferQueue<E> exte
589                      if (p.casItem(item, e)) { // match
590                          for (Node q = p; q != h;) {
591                              Node n = q.next;  // update by 2 unless singleton
592 <                            if (head == h && casHead(h, n == null? q : n)) {
592 >                            if (head == h && casHead(h, n == null ? q : n)) {
593                                  h.forgetNext();
594                                  break;
595                              }                 // advance and retry
# Line 656 | Line 674 | public class LinkedTransferQueue<E> exte
674          for (;;) {
675              Object item = s.item;
676              if (item != e) {                  // matched
677 <                assert item != s;
677 >                // assert item != s;
678                  s.forgetContents();           // avoid garbage
679                  return this.<E>cast(item);
680              }
# Line 781 | Line 799 | public class LinkedTransferQueue<E> exte
799           * Moves to next node after prev, or first node if prev null.
800           */
801          private void advance(Node prev) {
802 <            lastPred = lastRet;
803 <            lastRet = prev;
804 <            for (Node p = (prev == null) ? head : succ(prev);
805 <                 p != null; p = succ(p)) {
806 <                Object item = p.item;
807 <                if (p.isData) {
808 <                    if (item != null && item != p) {
809 <                        nextItem = LinkedTransferQueue.this.<E>cast(item);
810 <                        nextNode = p;
802 >            /*
803 >             * To track and avoid buildup of deleted nodes in the face
804 >             * of calls to both Queue.remove and Itr.remove, we must
805 >             * include variants of unsplice and sweep upon each
806 >             * advance: Upon Itr.remove, we may need to catch up links
807 >             * from lastPred, and upon other removes, we might need to
808 >             * skip ahead from stale nodes and unsplice deleted ones
809 >             * found while advancing.
810 >             */
811 >
812 >            Node r, b; // reset lastPred upon possible deletion of lastRet
813 >            if ((r = lastRet) != null && !r.isMatched())
814 >                lastPred = r;    // next lastPred is old lastRet
815 >            else if ((b = lastPred) == null || b.isMatched())
816 >                lastPred = null; // at start of list
817 >            else {
818 >                Node s, n;       // help with removal of lastPred.next
819 >                while ((s = b.next) != null &&
820 >                       s != b && s.isMatched() &&
821 >                       (n = s.next) != null && n != s)
822 >                    b.casNext(s, n);
823 >            }
824 >
825 >            this.lastRet = prev;
826 >
827 >            for (Node p = prev, s, n;;) {
828 >                s = (p == null) ? head : p.next;
829 >                if (s == null)
830 >                    break;
831 >                else if (s == p) {
832 >                    p = null;
833 >                    continue;
834 >                }
835 >                Object item = s.item;
836 >                if (s.isData) {
837 >                    if (item != null && item != s) {
838 >                        nextItem = LinkedTransferQueue.<E>cast(item);
839 >                        nextNode = s;
840                          return;
841                      }
842                  }
843                  else if (item == null)
844                      break;
845 +                // assert s.isMatched();
846 +                if (p == null)
847 +                    p = s;
848 +                else if ((n = s.next) == null)
849 +                    break;
850 +                else if (s == n)
851 +                    p = null;
852 +                else
853 +                    p.casNext(s, n);
854              }
855              nextNode = null;
856 +            nextItem = null;
857          }
858  
859          Itr() {
# Line 816 | Line 873 | public class LinkedTransferQueue<E> exte
873          }
874  
875          public final void remove() {
876 <            Node p = lastRet;
877 <            if (p == null) throw new IllegalStateException();
878 <            if (p.tryMatchData())
879 <                unsplice(lastPred, p);
876 >            final Node lastRet = this.lastRet;
877 >            if (lastRet == null)
878 >                throw new IllegalStateException();
879 >            this.lastRet = null;
880 >            if (lastRet.tryMatchData())
881 >                unsplice(lastPred, lastRet);
882          }
883      }
884  
# Line 876 | Line 935 | public class LinkedTransferQueue<E> exte
935      }
936  
937      /**
938 <     * Unlinks matched nodes encountered in a traversal from head.
938 >     * Unlinks matched (typically cancelled) nodes encountered in a
939 >     * traversal from head.
940       */
941      private void sweep() {
942 <        Node p = head, s, n;
943 <        while (p != null && (s = p.next) != null && (n = s.next) != null) {
944 <            if (p == s || s == n)
885 <                p = head; // stale
886 <            else if (s.isMatched())
887 <                p.casNext(s, n);
888 <            else
942 >        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
943 >            if (!s.isMatched())
944 >                // Unmatched nodes are never self-linked
945                  p = s;
946 +            else if ((n = s.next) == null) // trailing node is pinned
947 +                break;
948 +            else if (s == n)    // stale
949 +                // No need to also check for p == s, since that implies s == n
950 +                p = head;
951 +            else
952 +                p.casNext(s, n);
953          }
954      }
955  
# Line 953 | Line 1016 | public class LinkedTransferQueue<E> exte
1016       * return {@code false}.
1017       *
1018       * @return {@code true} (as specified by
1019 <     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1019 >     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1020 >     *  BlockingQueue.offer})
1021       * @throws NullPointerException if the specified element is null
1022       */
1023      public boolean offer(E e, long timeout, TimeUnit unit) {
# Line 965 | Line 1029 | public class LinkedTransferQueue<E> exte
1029       * Inserts the specified element at the tail of this queue.
1030       * As the queue is unbounded, this method will never return {@code false}.
1031       *
1032 <     * @return {@code true} (as specified by
969 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1032 >     * @return {@code true} (as specified by {@link Queue#offer})
1033       * @throws NullPointerException if the specified element is null
1034       */
1035      public boolean offer(E e) {
# Line 1098 | Line 1161 | public class LinkedTransferQueue<E> exte
1161      }
1162  
1163      /**
1164 <     * Returns an iterator over the elements in this queue in proper
1165 <     * sequence, from head to tail.
1164 >     * Returns an iterator over the elements in this queue in proper sequence.
1165 >     * The elements will be returned in order from first (head) to last (tail).
1166       *
1167       * <p>The returned iterator is a "weakly consistent" iterator that
1168 <     * will never throw
1169 <     * {@link ConcurrentModificationException ConcurrentModificationException},
1170 <     * and guarantees to traverse elements as they existed upon
1171 <     * construction of the iterator, and may (but is not guaranteed
1172 <     * to) reflect any modifications subsequent to construction.
1168 >     * will never throw {@link java.util.ConcurrentModificationException
1169 >     * ConcurrentModificationException}, and guarantees to traverse
1170 >     * elements as they existed upon construction of the iterator, and
1171 >     * may (but is not guaranteed to) reflect any modifications
1172 >     * subsequent to construction.
1173       *
1174       * @return an iterator over the elements in this queue in proper sequence
1175       */
# Line 1124 | Line 1187 | public class LinkedTransferQueue<E> exte
1187       * @return {@code true} if this queue contains no elements
1188       */
1189      public boolean isEmpty() {
1190 <        return firstOfMode(true) == null;
1190 >        for (Node p = head; p != null; p = succ(p)) {
1191 >            if (!p.isMatched())
1192 >                return !p.isData;
1193 >        }
1194 >        return true;
1195      }
1196  
1197      public boolean hasWaitingConsumer() {
# Line 1167 | Line 1234 | public class LinkedTransferQueue<E> exte
1234      }
1235  
1236      /**
1237 +     * Returns {@code true} if this queue contains the specified element.
1238 +     * More formally, returns {@code true} if and only if this queue contains
1239 +     * at least one element {@code e} such that {@code o.equals(e)}.
1240 +     *
1241 +     * @param o object to be checked for containment in this queue
1242 +     * @return {@code true} if this queue contains the specified element
1243 +     */
1244 +    public boolean contains(Object o) {
1245 +        if (o == null) return false;
1246 +        for (Node p = head; p != null; p = succ(p)) {
1247 +            Object item = p.item;
1248 +            if (p.isData) {
1249 +                if (item != null && item != p && o.equals(item))
1250 +                    return true;
1251 +            }
1252 +            else if (item == null)
1253 +                break;
1254 +        }
1255 +        return false;
1256 +    }
1257 +
1258 +    /**
1259       * Always returns {@code Integer.MAX_VALUE} because a
1260       * {@code LinkedTransferQueue} is not capacity constrained.
1261       *
1262       * @return {@code Integer.MAX_VALUE} (as specified by
1263 <     *         {@link BlockingQueue#remainingCapacity()})
1263 >     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1264 >     *         BlockingQueue.remainingCapacity})
1265       */
1266      public int remainingCapacity() {
1267          return Integer.MAX_VALUE;
# Line 1213 | Line 1303 | public class LinkedTransferQueue<E> exte
1303  
1304      // Unsafe mechanics
1305  
1306 <    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1307 <    private static final long headOffset =
1308 <        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1309 <    private static final long tailOffset =
1310 <        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1221 <    private static final long sweepVotesOffset =
1222 <        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1223 <
1224 <    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1225 <                                  String field, Class<?> klazz) {
1306 >    private static final sun.misc.Unsafe UNSAFE;
1307 >    private static final long headOffset;
1308 >    private static final long tailOffset;
1309 >    private static final long sweepVotesOffset;
1310 >    static {
1311          try {
1312 <            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1313 <        } catch (NoSuchFieldException e) {
1314 <            // Convert Exception to corresponding Error
1315 <            NoSuchFieldError error = new NoSuchFieldError(field);
1316 <            error.initCause(e);
1317 <            throw error;
1312 >            UNSAFE = getUnsafe();
1313 >            Class<?> k = LinkedTransferQueue.class;
1314 >            headOffset = UNSAFE.objectFieldOffset
1315 >                (k.getDeclaredField("head"));
1316 >            tailOffset = UNSAFE.objectFieldOffset
1317 >                (k.getDeclaredField("tail"));
1318 >            sweepVotesOffset = UNSAFE.objectFieldOffset
1319 >                (k.getDeclaredField("sweepVotes"));
1320 >        } catch (Exception e) {
1321 >            throw new Error(e);
1322          }
1323      }
1324  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines