ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.71 by jsr166, Mon Nov 16 01:02:49 2009 UTC vs.
Revision 1.82 by jsr166, Mon Nov 29 20:58:06 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.AbstractQueue;
10   import java.util.Collection;
11   import java.util.ConcurrentModificationException;
12   import java.util.Iterator;
13   import java.util.NoSuchElementException;
14   import java.util.Queue;
15 + import java.util.concurrent.TimeUnit;
16   import java.util.concurrent.locks.LockSupport;
17 +
18   /**
19   * An unbounded {@link TransferQueue} based on linked nodes.
20   * This queue orders elements FIFO (first-in-first-out) with respect
# Line 321 | Line 321 | public class LinkedTransferQueue<E> exte
321       * situations in which we cannot guarantee to make node s
322       * unreachable in this way: (1) If s is the trailing node of list
323       * (i.e., with null next), then it is pinned as the target node
324 <     * for appends, so can only be removed later when other nodes are
324 >     * for appends, so can only be removed later after other nodes are
325       * appended. (2) We cannot necessarily unlink s given a
326       * predecessor node that is matched (including the case of being
327       * cancelled): the predecessor may already be unspliced, in which
# Line 343 | Line 343 | public class LinkedTransferQueue<E> exte
343       * When these cases arise, rather than always retraversing the
344       * entire list to find an actual predecessor to unlink (which
345       * won't help for case (1) anyway), we record a conservative
346 <     * estimate of possible unsplice failures (in "sweepVotes").  We
347 <     * trigger a full sweep when the estimate exceeds a threshold
348 <     * indicating the maximum number of estimated removal failures to
349 <     * tolerate before sweeping through, unlinking cancelled nodes
350 <     * that were not unlinked upon initial removal. We perform sweeps
351 <     * by the thread hitting threshold (rather than background threads
352 <     * or by spreading work to other threads) because in the main
353 <     * contexts in which removal occurs, the caller is already
354 <     * timed-out, cancelled, or performing a potentially O(n)
355 <     * operation (i.e., remove(x)), none of which are time-critical
356 <     * enough to warrant the overhead that alternatives would impose
357 <     * on other threads.
346 >     * estimate of possible unsplice failures (in "sweepVotes").
347 >     * We trigger a full sweep when the estimate exceeds a threshold
348 >     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
349 >     * removal failures to tolerate before sweeping through, unlinking
350 >     * cancelled nodes that were not unlinked upon initial removal.
351 >     * We perform sweeps by the thread hitting threshold (rather than
352 >     * background threads or by spreading work to other threads)
353 >     * because in the main contexts in which removal occurs, the
354 >     * caller is already timed-out, cancelled, or performing a
355 >     * potentially O(n) operation (e.g. remove(x)), none of which are
356 >     * time-critical enough to warrant the overhead that alternatives
357 >     * would impose on other threads.
358       *
359       * Because the sweepVotes estimate is conservative, and because
360       * nodes become unlinked "naturally" as they fall off the head of
# Line 422 | Line 422 | public class LinkedTransferQueue<E> exte
422          }
423  
424          final boolean casItem(Object cmp, Object val) {
425 <            assert cmp == null || cmp.getClass() != Node.class;
425 >            // assert cmp == null || cmp.getClass() != Node.class;
426              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
427          }
428  
429          /**
430 <         * Creates a new node. Uses relaxed write because item can only
431 <         * be seen if followed by CAS.
430 >         * Constructs a new node.  Uses relaxed write because item can
431 >         * only be seen after publication via casNext.
432           */
433          Node(Object item, boolean isData) {
434              UNSAFE.putObject(this, itemOffset, item); // relaxed write
# Line 446 | Line 446 | public class LinkedTransferQueue<E> exte
446          /**
447           * Sets item to self and waiter to null, to avoid garbage
448           * retention after matching or cancelling. Uses relaxed writes
449 <         * bacause order is already constrained in the only calling
449 >         * because order is already constrained in the only calling
450           * contexts: item is forgotten only after volatile/atomic
451           * mechanics that extract items.  Similarly, clearing waiter
452           * follows either CAS or return from park (if ever parked;
# Line 488 | Line 488 | public class LinkedTransferQueue<E> exte
488           * Tries to artificially match a data node -- used by remove.
489           */
490          final boolean tryMatchData() {
491 <            assert isData;
491 >            // assert isData;
492              Object x = item;
493              if (x != null && x != this && casItem(x, null)) {
494                  LockSupport.unpark(waiter);
# Line 541 | Line 541 | public class LinkedTransferQueue<E> exte
541  
542      @SuppressWarnings("unchecked")
543      static <E> E cast(Object item) {
544 <        assert item == null || item.getClass() != Node.class;
544 >        // assert item == null || item.getClass() != Node.class;
545          return (E) item;
546      }
547  
# Line 560 | Line 560 | public class LinkedTransferQueue<E> exte
560              throw new NullPointerException();
561          Node s = null;                        // the node to append, if needed
562  
563 <        retry: for (;;) {                     // restart on append race
563 >        retry:
564 >        for (;;) {                            // restart on append race
565  
566              for (Node h = head, p = h; p != null;) { // find & match first node
567                  boolean isData = p.isData;
# Line 571 | Line 572 | public class LinkedTransferQueue<E> exte
572                      if (p.casItem(item, e)) { // match
573                          for (Node q = p; q != h;) {
574                              Node n = q.next;  // update by 2 unless singleton
575 <                            if (head == h && casHead(h, n == null? q : n)) {
575 >                            if (head == h && casHead(h, n == null ? q : n)) {
576                                  h.forgetNext();
577                                  break;
578                              }                 // advance and retry
# Line 656 | Line 657 | public class LinkedTransferQueue<E> exte
657          for (;;) {
658              Object item = s.item;
659              if (item != e) {                  // matched
660 <                assert item != s;
660 >                // assert item != s;
661                  s.forgetContents();           // avoid garbage
662                  return this.<E>cast(item);
663              }
# Line 781 | Line 782 | public class LinkedTransferQueue<E> exte
782           * Moves to next node after prev, or first node if prev null.
783           */
784          private void advance(Node prev) {
785 <            lastPred = lastRet;
786 <            lastRet = prev;
787 <            for (Node p = (prev == null) ? head : succ(prev);
788 <                 p != null; p = succ(p)) {
789 <                Object item = p.item;
790 <                if (p.isData) {
791 <                    if (item != null && item != p) {
792 <                        nextItem = LinkedTransferQueue.this.<E>cast(item);
793 <                        nextNode = p;
785 >            /*
786 >             * To track and avoid buildup of deleted nodes in the face
787 >             * of calls to both Queue.remove and Itr.remove, we must
788 >             * include variants of unsplice and sweep upon each
789 >             * advance: Upon Itr.remove, we may need to catch up links
790 >             * from lastPred, and upon other removes, we might need to
791 >             * skip ahead from stale nodes and unsplice deleted ones
792 >             * found while advancing.
793 >             */
794 >
795 >            Node r, b; // reset lastPred upon possible deletion of lastRet
796 >            if ((r = lastRet) != null && !r.isMatched())
797 >                lastPred = r;    // next lastPred is old lastRet
798 >            else if ((b = lastPred) == null || b.isMatched())
799 >                lastPred = null; // at start of list
800 >            else {
801 >                Node s, n;       // help with removal of lastPred.next
802 >                while ((s = b.next) != null &&
803 >                       s != b && s.isMatched() &&
804 >                       (n = s.next) != null && n != s)
805 >                    b.casNext(s, n);
806 >            }
807 >
808 >            this.lastRet = prev;
809 >            for (Node p = prev, s, n;;) {
810 >                s = (p == null) ? head : p.next;
811 >                if (s == null)
812 >                    break;
813 >                else if (s == p) {
814 >                    p = null;
815 >                    continue;
816 >                }
817 >                Object item = s.item;
818 >                if (s.isData) {
819 >                    if (item != null && item != s) {
820 >                        nextItem = LinkedTransferQueue.<E>cast(item);
821 >                        nextNode = s;
822                          return;
823                      }
824                  }
825                  else if (item == null)
826                      break;
827 +                // assert s.isMatched();
828 +                if (p == null)
829 +                    p = s;
830 +                else if ((n = s.next) == null)
831 +                    break;
832 +                else if (s == n)
833 +                    p = null;
834 +                else
835 +                    p.casNext(s, n);
836              }
837              nextNode = null;
838 +            nextItem = null;
839          }
840  
841          Itr() {
# Line 816 | Line 855 | public class LinkedTransferQueue<E> exte
855          }
856  
857          public final void remove() {
858 <            Node p = lastRet;
859 <            if (p == null) throw new IllegalStateException();
860 <            if (p.tryMatchData())
861 <                unsplice(lastPred, p);
858 >            final Node lastRet = this.lastRet;
859 >            if (lastRet == null)
860 >                throw new IllegalStateException();
861 >            this.lastRet = null;
862 >            if (lastRet.tryMatchData())
863 >                unsplice(lastPred, lastRet);
864          }
865      }
866  
# Line 876 | Line 917 | public class LinkedTransferQueue<E> exte
917      }
918  
919      /**
920 <     * Unlinks matched nodes encountered in a traversal from head.
920 >     * Unlinks matched (typically cancelled) nodes encountered in a
921 >     * traversal from head.
922       */
923      private void sweep() {
924          for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
925 <            if (p == s)                    // stale
926 <                p = head;
885 <            else if (!s.isMatched())
925 >            if (!s.isMatched())
926 >                // Unmatched nodes are never self-linked
927                  p = s;
928              else if ((n = s.next) == null) // trailing node is pinned
929                  break;
930 +            else if (s == n)    // stale
931 +                // No need to also check for p == s, since that implies s == n
932 +                p = head;
933              else
934                  p.casNext(s, n);
935          }
# Line 966 | Line 1010 | public class LinkedTransferQueue<E> exte
1010       * Inserts the specified element at the tail of this queue.
1011       * As the queue is unbounded, this method will never return {@code false}.
1012       *
1013 <     * @return {@code true} (as specified by
970 <     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1013 >     * @return {@code true} (as specified by {@link Queue#offer})
1014       * @throws NullPointerException if the specified element is null
1015       */
1016      public boolean offer(E e) {
# Line 1125 | Line 1168 | public class LinkedTransferQueue<E> exte
1168       * @return {@code true} if this queue contains no elements
1169       */
1170      public boolean isEmpty() {
1171 <        return firstOfMode(true) == null;
1171 >        for (Node p = head; p != null; p = succ(p)) {
1172 >            if (!p.isMatched())
1173 >                return !p.isData;
1174 >        }
1175 >        return true;
1176      }
1177  
1178      public boolean hasWaitingConsumer() {
# Line 1168 | Line 1215 | public class LinkedTransferQueue<E> exte
1215      }
1216  
1217      /**
1218 +     * Returns {@code true} if this queue contains the specified element.
1219 +     * More formally, returns {@code true} if and only if this queue contains
1220 +     * at least one element {@code e} such that {@code o.equals(e)}.
1221 +     *
1222 +     * @param o object to be checked for containment in this queue
1223 +     * @return {@code true} if this queue contains the specified element
1224 +     */
1225 +    public boolean contains(Object o) {
1226 +        if (o == null) return false;
1227 +        for (Node p = head; p != null; p = succ(p)) {
1228 +            Object item = p.item;
1229 +            if (p.isData) {
1230 +                if (item != null && item != p && o.equals(item))
1231 +                    return true;
1232 +            }
1233 +            else if (item == null)
1234 +                break;
1235 +        }
1236 +        return false;
1237 +    }
1238 +
1239 +    /**
1240       * Always returns {@code Integer.MAX_VALUE} because a
1241       * {@code LinkedTransferQueue} is not capacity constrained.
1242       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines