ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.72 by dl, Mon Apr 5 15:50:51 2010 UTC vs.
Revision 1.90 by jsr166, Tue Jun 21 20:07:32 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
13 import java.util.ConcurrentModificationException;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13   import java.util.Queue;
14 + import java.util.concurrent.TimeUnit;
15   import java.util.concurrent.locks.LockSupport;
16 +
17   /**
18   * An unbounded {@link TransferQueue} based on linked nodes.
19   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 23 | Line 22 | import java.util.concurrent.locks.LockSu
22   * producer.  The <em>tail</em> of the queue is that element that has
23   * been on the queue the shortest time for some producer.
24   *
25 < * <p>Beware that, unlike in most collections, the {@code size}
26 < * method is <em>NOT</em> a constant-time operation. Because of the
25 > * <p>Beware that, unlike in most collections, the {@code size} method
26 > * is <em>NOT</em> a constant-time operation. Because of the
27   * asynchronous nature of these queues, determining the current number
28 < * of elements requires a traversal of the elements.
28 > * of elements requires a traversal of the elements, and so may report
29 > * inaccurate results if this collection is modified during traversal.
30 > * Additionally, the bulk operations {@code addAll},
31 > * {@code removeAll}, {@code retainAll}, {@code containsAll},
32 > * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
33 > * to be performed atomically. For example, an iterator operating
34 > * concurrently with an {@code addAll} operation might view only some
35 > * of the added elements.
36   *
37   * <p>This class and its iterator implement all of the
38   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 295 | Line 301 | public class LinkedTransferQueue<E> exte
301       *    of less-contended queues.  During spins threads check their
302       *    interrupt status and generate a thread-local random number
303       *    to decide to occasionally perform a Thread.yield. While
304 <     *    yield has underdefined specs, we assume that might it help,
305 <     *    and will not hurt in limiting impact of spinning on busy
304 >     *    yield has underdefined specs, we assume that it might help,
305 >     *    and will not hurt, in limiting impact of spinning on busy
306       *    systems.  We also use smaller (1/2) spins for nodes that are
307       *    not known to be front but whose predecessors have not
308       *    blocked -- these "chained" spins avoid artifacts of
# Line 321 | Line 327 | public class LinkedTransferQueue<E> exte
327       * situations in which we cannot guarantee to make node s
328       * unreachable in this way: (1) If s is the trailing node of list
329       * (i.e., with null next), then it is pinned as the target node
330 <     * for appends, so can only be removed later when other nodes are
330 >     * for appends, so can only be removed later after other nodes are
331       * appended. (2) We cannot necessarily unlink s given a
332       * predecessor node that is matched (including the case of being
333       * cancelled): the predecessor may already be unspliced, in which
# Line 343 | Line 349 | public class LinkedTransferQueue<E> exte
349       * When these cases arise, rather than always retraversing the
350       * entire list to find an actual predecessor to unlink (which
351       * won't help for case (1) anyway), we record a conservative
352 <     * estimate of possible unsplice failures (in "sweepVotes").  We
353 <     * trigger a full sweep when the estimate exceeds a threshold
354 <     * indicating the maximum number of estimated removal failures to
355 <     * tolerate before sweeping through, unlinking cancelled nodes
356 <     * that were not unlinked upon initial removal. We perform sweeps
357 <     * by the thread hitting threshold (rather than background threads
358 <     * or by spreading work to other threads) because in the main
359 <     * contexts in which removal occurs, the caller is already
360 <     * timed-out, cancelled, or performing a potentially O(n)
361 <     * operation (i.e., remove(x)), none of which are time-critical
362 <     * enough to warrant the overhead that alternatives would impose
363 <     * on other threads.
352 >     * estimate of possible unsplice failures (in "sweepVotes").
353 >     * We trigger a full sweep when the estimate exceeds a threshold
354 >     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
355 >     * removal failures to tolerate before sweeping through, unlinking
356 >     * cancelled nodes that were not unlinked upon initial removal.
357 >     * We perform sweeps by the thread hitting threshold (rather than
358 >     * background threads or by spreading work to other threads)
359 >     * because in the main contexts in which removal occurs, the
360 >     * caller is already timed-out, cancelled, or performing a
361 >     * potentially O(n) operation (e.g. remove(x)), none of which are
362 >     * time-critical enough to warrant the overhead that alternatives
363 >     * would impose on other threads.
364       *
365       * Because the sweepVotes estimate is conservative, and because
366       * nodes become unlinked "naturally" as they fall off the head of
# Line 422 | Line 428 | public class LinkedTransferQueue<E> exte
428          }
429  
430          final boolean casItem(Object cmp, Object val) {
431 <            assert cmp == null || cmp.getClass() != Node.class;
431 >            // assert cmp == null || cmp.getClass() != Node.class;
432              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
433          }
434  
435          /**
436 <         * Creates a new node. Uses relaxed write because item can only
437 <         * be seen if followed by CAS.
436 >         * Constructs a new node.  Uses relaxed write because item can
437 >         * only be seen after publication via casNext.
438           */
439          Node(Object item, boolean isData) {
440              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 446 | Line 452 | public class LinkedTransferQueue<E> exte
452          /**
453           * Sets item to self and waiter to null, to avoid garbage
454           * retention after matching or cancelling. Uses relaxed writes
455 <         * bacause order is already constrained in the only calling
455 >         * because order is already constrained in the only calling
456           * contexts: item is forgotten only after volatile/atomic
457           * mechanics that extract items.  Similarly, clearing waiter
458           * follows either CAS or return from park (if ever parked;
# Line 488 | Line 494 | public class LinkedTransferQueue<E> exte
494           * Tries to artificially match a data node -- used by remove.
495           */
496          final boolean tryMatchData() {
497 <            assert isData;
497 >            // assert isData;
498              Object x = item;
499              if (x != null && x != this && casItem(x, null)) {
500                  LockSupport.unpark(waiter);
# Line 497 | Line 503 | public class LinkedTransferQueue<E> exte
503              return false;
504          }
505  
500        // Unsafe mechanics
501        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
502        private static final long nextOffset =
503            objectFieldOffset(UNSAFE, "next", Node.class);
504        private static final long itemOffset =
505            objectFieldOffset(UNSAFE, "item", Node.class);
506        private static final long waiterOffset =
507            objectFieldOffset(UNSAFE, "waiter", Node.class);
508
506          private static final long serialVersionUID = -3375979862319811754L;
507 +
508 +        // Unsafe mechanics
509 +        private static final sun.misc.Unsafe UNSAFE;
510 +        private static final long itemOffset;
511 +        private static final long nextOffset;
512 +        private static final long waiterOffset;
513 +        static {
514 +            try {
515 +                UNSAFE = getUnsafe();
516 +                Class<?> k = Node.class;
517 +                itemOffset = UNSAFE.objectFieldOffset
518 +                    (k.getDeclaredField("item"));
519 +                nextOffset = UNSAFE.objectFieldOffset
520 +                    (k.getDeclaredField("next"));
521 +                waiterOffset = UNSAFE.objectFieldOffset
522 +                    (k.getDeclaredField("waiter"));
523 +            } catch (Exception e) {
524 +                throw new Error(e);
525 +            }
526 +        }
527      }
528  
529      /** head of the queue; null until first enqueue */
# Line 541 | Line 558 | public class LinkedTransferQueue<E> exte
558  
559      @SuppressWarnings("unchecked")
560      static <E> E cast(Object item) {
561 <        assert item == null || item.getClass() != Node.class;
561 >        // assert item == null || item.getClass() != Node.class;
562          return (E) item;
563      }
564  
# Line 560 | Line 577 | public class LinkedTransferQueue<E> exte
577              throw new NullPointerException();
578          Node s = null;                        // the node to append, if needed
579  
580 <        retry: for (;;) {                     // restart on append race
580 >        retry:
581 >        for (;;) {                            // restart on append race
582  
583              for (Node h = head, p = h; p != null;) { // find & match first node
584                  boolean isData = p.isData;
# Line 571 | Line 589 | public class LinkedTransferQueue<E> exte
589                      if (p.casItem(item, e)) { // match
590                          for (Node q = p; q != h;) {
591                              Node n = q.next;  // update by 2 unless singleton
592 <                            if (head == h && casHead(h, n == null? q : n)) {
592 >                            if (head == h && casHead(h, n == null ? q : n)) {
593                                  h.forgetNext();
594                                  break;
595                              }                 // advance and retry
# Line 580 | Line 598 | public class LinkedTransferQueue<E> exte
598                                  break;        // unless slack < 2
599                          }
600                          LockSupport.unpark(p.waiter);
601 <                        return this.<E>cast(item);
601 >                        return LinkedTransferQueue.<E>cast(item);
602                      }
603                  }
604                  Node n = p.next;
# Line 656 | Line 674 | public class LinkedTransferQueue<E> exte
674          for (;;) {
675              Object item = s.item;
676              if (item != e) {                  // matched
677 <                assert item != s;
677 >                // assert item != s;
678                  s.forgetContents();           // avoid garbage
679 <                return this.<E>cast(item);
679 >                return LinkedTransferQueue.<E>cast(item);
680              }
681              if ((w.isInterrupted() || (timed && nanos <= 0)) &&
682                      s.casItem(e, s)) {        // cancel
# Line 739 | Line 757 | public class LinkedTransferQueue<E> exte
757              Object item = p.item;
758              if (p.isData) {
759                  if (item != null && item != p)
760 <                    return this.<E>cast(item);
760 >                    return LinkedTransferQueue.<E>cast(item);
761              }
762              else if (item == null)
763                  return null;
# Line 781 | Line 799 | public class LinkedTransferQueue<E> exte
799           * Moves to next node after prev, or first node if prev null.
800           */
801          private void advance(Node prev) {
802 <            lastPred = lastRet;
803 <            lastRet = prev;
804 <            for (Node p = (prev == null) ? head : succ(prev);
805 <                 p != null; p = succ(p)) {
806 <                Object item = p.item;
807 <                if (p.isData) {
808 <                    if (item != null && item != p) {
809 <                        nextItem = LinkedTransferQueue.this.<E>cast(item);
810 <                        nextNode = p;
802 >            /*
803 >             * To track and avoid buildup of deleted nodes in the face
804 >             * of calls to both Queue.remove and Itr.remove, we must
805 >             * include variants of unsplice and sweep upon each
806 >             * advance: Upon Itr.remove, we may need to catch up links
807 >             * from lastPred, and upon other removes, we might need to
808 >             * skip ahead from stale nodes and unsplice deleted ones
809 >             * found while advancing.
810 >             */
811 >
812 >            Node r, b; // reset lastPred upon possible deletion of lastRet
813 >            if ((r = lastRet) != null && !r.isMatched())
814 >                lastPred = r;    // next lastPred is old lastRet
815 >            else if ((b = lastPred) == null || b.isMatched())
816 >                lastPred = null; // at start of list
817 >            else {
818 >                Node s, n;       // help with removal of lastPred.next
819 >                while ((s = b.next) != null &&
820 >                       s != b && s.isMatched() &&
821 >                       (n = s.next) != null && n != s)
822 >                    b.casNext(s, n);
823 >            }
824 >
825 >            this.lastRet = prev;
826 >
827 >            for (Node p = prev, s, n;;) {
828 >                s = (p == null) ? head : p.next;
829 >                if (s == null)
830 >                    break;
831 >                else if (s == p) {
832 >                    p = null;
833 >                    continue;
834 >                }
835 >                Object item = s.item;
836 >                if (s.isData) {
837 >                    if (item != null && item != s) {
838 >                        nextItem = LinkedTransferQueue.<E>cast(item);
839 >                        nextNode = s;
840                          return;
841                      }
842                  }
843                  else if (item == null)
844                      break;
845 +                // assert s.isMatched();
846 +                if (p == null)
847 +                    p = s;
848 +                else if ((n = s.next) == null)
849 +                    break;
850 +                else if (s == n)
851 +                    p = null;
852 +                else
853 +                    p.casNext(s, n);
854              }
855              nextNode = null;
856 +            nextItem = null;
857          }
858  
859          Itr() {
# Line 816 | Line 873 | public class LinkedTransferQueue<E> exte
873          }
874  
875          public final void remove() {
876 <            Node p = lastRet;
877 <            if (p == null) throw new IllegalStateException();
878 <            if (p.tryMatchData())
879 <                unsplice(lastPred, p);
876 >            final Node lastRet = this.lastRet;
877 >            if (lastRet == null)
878 >                throw new IllegalStateException();
879 >            this.lastRet = null;
880 >            if (lastRet.tryMatchData())
881 >                unsplice(lastPred, lastRet);
882          }
883      }
884  
# Line 876 | Line 935 | public class LinkedTransferQueue<E> exte
935      }
936  
937      /**
938 <     * Unlinks matched nodes encountered in a traversal from head.
938 >     * Unlinks matched (typically cancelled) nodes encountered in a
939 >     * traversal from head.
940       */
941      private void sweep() {
942          for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
943 <            if (p == s)                    // stale
944 <                p = head;
885 <            else if (!s.isMatched())
943 >            if (!s.isMatched())
944 >                // Unmatched nodes are never self-linked
945                  p = s;
946              else if ((n = s.next) == null) // trailing node is pinned
947                  break;
948 +            else if (s == n)    // stale
949 +                // No need to also check for p == s, since that implies s == n
950 +                p = head;
951              else
952                  p.casNext(s, n);
953          }
# Line 954 | Line 1016 | public class LinkedTransferQueue<E> exte
1016       * return {@code false}.
1017       *
1018       * @return {@code true} (as specified by
1019 <     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1019 >     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1020 >     *  BlockingQueue.offer})
1021       * @throws NullPointerException if the specified element is null
1022       */
1023      public boolean offer(E e, long timeout, TimeUnit unit) {
# Line 966 | Line 1029 | public class LinkedTransferQueue<E> exte
1029       * Inserts the specified element at the tail of this queue.
1030       * As the queue is unbounded, this method will never return {@code false}.
1031       *
1032 <     * @return {@code true} (as specified by
970 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1032 >     * @return {@code true} (as specified by {@link Queue#offer})
1033       * @throws NullPointerException if the specified element is null
1034       */
1035      public boolean offer(E e) {
# Line 1072 | Line 1134 | public class LinkedTransferQueue<E> exte
1134          if (c == this)
1135              throw new IllegalArgumentException();
1136          int n = 0;
1137 <        E e;
1076 <        while ( (e = poll()) != null) {
1137 >        for (E e; (e = poll()) != null;) {
1138              c.add(e);
1139              ++n;
1140          }
# Line 1090 | Line 1151 | public class LinkedTransferQueue<E> exte
1151          if (c == this)
1152              throw new IllegalArgumentException();
1153          int n = 0;
1154 <        E e;
1094 <        while (n < maxElements && (e = poll()) != null) {
1154 >        for (E e; n < maxElements && (e = poll()) != null;) {
1155              c.add(e);
1156              ++n;
1157          }
# Line 1099 | Line 1159 | public class LinkedTransferQueue<E> exte
1159      }
1160  
1161      /**
1162 <     * Returns an iterator over the elements in this queue in proper
1163 <     * sequence, from head to tail.
1162 >     * Returns an iterator over the elements in this queue in proper sequence.
1163 >     * The elements will be returned in order from first (head) to last (tail).
1164       *
1165       * <p>The returned iterator is a "weakly consistent" iterator that
1166 <     * will never throw
1167 <     * {@link ConcurrentModificationException ConcurrentModificationException},
1168 <     * and guarantees to traverse elements as they existed upon
1169 <     * construction of the iterator, and may (but is not guaranteed
1170 <     * to) reflect any modifications subsequent to construction.
1166 >     * will never throw {@link java.util.ConcurrentModificationException
1167 >     * ConcurrentModificationException}, and guarantees to traverse
1168 >     * elements as they existed upon construction of the iterator, and
1169 >     * may (but is not guaranteed to) reflect any modifications
1170 >     * subsequent to construction.
1171       *
1172       * @return an iterator over the elements in this queue in proper sequence
1173       */
# Line 1172 | Line 1232 | public class LinkedTransferQueue<E> exte
1232      }
1233  
1234      /**
1235 +     * Returns {@code true} if this queue contains the specified element.
1236 +     * More formally, returns {@code true} if and only if this queue contains
1237 +     * at least one element {@code e} such that {@code o.equals(e)}.
1238 +     *
1239 +     * @param o object to be checked for containment in this queue
1240 +     * @return {@code true} if this queue contains the specified element
1241 +     */
1242 +    public boolean contains(Object o) {
1243 +        if (o == null) return false;
1244 +        for (Node p = head; p != null; p = succ(p)) {
1245 +            Object item = p.item;
1246 +            if (p.isData) {
1247 +                if (item != null && item != p && o.equals(item))
1248 +                    return true;
1249 +            }
1250 +            else if (item == null)
1251 +                break;
1252 +        }
1253 +        return false;
1254 +    }
1255 +
1256 +    /**
1257       * Always returns {@code Integer.MAX_VALUE} because a
1258       * {@code LinkedTransferQueue} is not capacity constrained.
1259       *
1260       * @return {@code Integer.MAX_VALUE} (as specified by
1261 <     *         {@link BlockingQueue#remainingCapacity()})
1261 >     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1262 >     *         BlockingQueue.remainingCapacity})
1263       */
1264      public int remainingCapacity() {
1265          return Integer.MAX_VALUE;
# Line 1218 | Line 1301 | public class LinkedTransferQueue<E> exte
1301  
1302      // Unsafe mechanics
1303  
1304 <    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1305 <    private static final long headOffset =
1306 <        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
1307 <    private static final long tailOffset =
1308 <        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
1226 <    private static final long sweepVotesOffset =
1227 <        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
1228 <
1229 <    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1230 <                                  String field, Class<?> klazz) {
1304 >    private static final sun.misc.Unsafe UNSAFE;
1305 >    private static final long headOffset;
1306 >    private static final long tailOffset;
1307 >    private static final long sweepVotesOffset;
1308 >    static {
1309          try {
1310 <            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1311 <        } catch (NoSuchFieldException e) {
1312 <            // Convert Exception to corresponding Error
1313 <            NoSuchFieldError error = new NoSuchFieldError(field);
1314 <            error.initCause(e);
1315 <            throw error;
1310 >            UNSAFE = getUnsafe();
1311 >            Class<?> k = LinkedTransferQueue.class;
1312 >            headOffset = UNSAFE.objectFieldOffset
1313 >                (k.getDeclaredField("head"));
1314 >            tailOffset = UNSAFE.objectFieldOffset
1315 >                (k.getDeclaredField("tail"));
1316 >            sweepVotesOffset = UNSAFE.objectFieldOffset
1317 >                (k.getDeclaredField("sweepVotes"));
1318 >        } catch (Exception e) {
1319 >            throw new Error(e);
1320          }
1321      }
1322  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines