ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.139 by jsr166, Mon Jan 9 17:46:26 2017 UTC vs.
Revision 1.140 by jsr166, Sat Jan 14 06:59:57 2017 UTC

# Line 130 | Line 130 | public class LinkedTransferQueue<E> exte
130       * correctly perform enqueue and dequeue operations by traversing
131       * from a pointer to the initial node; CASing the item of the
132       * first unmatched node on match and CASing the next field of the
133 <     * trailing node on appends. (Plus some special-casing when
134 <     * initially empty).  While this would be a terrible idea in
135 <     * itself, it does have the benefit of not requiring ANY atomic
133 >     * trailing node on appends.  While this would be a terrible idea
134 >     * in itself, it does have the benefit of not requiring ANY atomic
135       * updates on head/tail fields.
136       *
137       * We introduce here an approach that lies between the extremes of
# Line 238 | Line 237 | public class LinkedTransferQueue<E> exte
237       * interior nodes) except in the case of cancellation/removal (see
238       * below).
239       *
241     * We allow both the head and tail fields to be null before any
242     * nodes are enqueued; initializing upon first append.  This
243     * simplifies some other logic, as well as providing more
244     * efficient explicit control paths instead of letting JVMs insert
245     * implicit NullPointerExceptions when they are null.  While not
246     * currently fully implemented, we also leave open the possibility
247     * of re-nulling these fields when empty (which is complicated to
248     * arrange, for little benefit.)
249     *
240       * All enqueue/dequeue operations are handled by the single method
241       * "xfer" with parameters indicating whether to act as some form
242       * of offer, put, poll, take, or transfer (each possibly with
# Line 278 | Line 268 | public class LinkedTransferQueue<E> exte
268       * 2. Try to append a new node (method tryAppend)
269       *
270       *    Starting at current tail pointer, find the actual last node
271 <     *    and try to append a new node (or if head was null, establish
272 <     *    the first node). Nodes can be appended only if their
273 <     *    predecessors are either already matched or are of the same
274 <     *    mode. If we detect otherwise, then a new node with opposite
275 <     *    mode must have been appended during traversal, so we must
276 <     *    restart at phase 1. The traversal and update steps are
277 <     *    otherwise similar to phase 1: Retrying upon CAS misses and
278 <     *    checking for staleness.  In particular, if a self-link is
279 <     *    encountered, then we can safely jump to a node on the list
290 <     *    by continuing the traversal at current head.
271 >     *    and try to append a new node. Nodes can be appended only if
272 >     *    their predecessors are either already matched or are of the
273 >     *    same mode. If we detect otherwise, then a new node with
274 >     *    opposite mode must have been appended during traversal, so
275 >     *    we must restart at phase 1. The traversal and update steps
276 >     *    are otherwise similar to phase 1: Retrying upon CAS misses
277 >     *    and checking for staleness.  In particular, if a self-link
278 >     *    is encountered, then we can safely jump to a node on the
279 >     *    list by continuing the traversal at current head.
280       *
281       *    On successful append, if the call was ASYNC, return.
282       *
# Line 440 | Line 429 | public class LinkedTransferQueue<E> exte
429          }
430  
431          /**
432 <         * Constructs a new node.  Uses relaxed write because item can
433 <         * only be seen after publication via casNext.
432 >         * Constructs a data node holding item if item is non-null,
433 >         * else a request node.  Uses relaxed write because item can
434 >         * only be seen after piggy-backing publication via CAS.
435           */
436          Node(Object item) {
437              ITEM.set(this, item);
438              isData = (item != null);
439          }
440  
441 +        /** Constructs a dead (matched data) dummy node. */
442 +        Node() {
443 +            isData = true;
444 +        }
445 +
446          /**
447           * Links node to itself to avoid garbage retention.  Called
448           * only after CASing head field, so uses relaxed write.
# Line 456 | Line 451 | public class LinkedTransferQueue<E> exte
451              NEXT.setRelease(this, this);
452          }
453  
454 +        final void appendRelaxed(Node next) {
455 +            // assert next != null;
456 +            // assert this.next == null;
457 +            NEXT.set(this, next);
458 +        }
459 +
460          /**
461           * Sets item (of a request node) to self and waiter to null,
462           * to avoid garbage retention after matching or cancelling.
# Line 505 | Line 506 | public class LinkedTransferQueue<E> exte
506          }
507  
508          private static final long serialVersionUID = -3375979862319811754L;
508
509        // VarHandle mechanics
510        private static final VarHandle ITEM;
511        private static final VarHandle NEXT;
512        private static final VarHandle WAITER;
513        static {
514            try {
515                MethodHandles.Lookup l = MethodHandles.lookup();
516                ITEM = l.findVarHandle(Node.class, "item", Object.class);
517                NEXT = l.findVarHandle(Node.class, "next", Node.class);
518                WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
519            } catch (ReflectiveOperationException e) {
520                throw new Error(e);
521            }
522        }
509      }
510  
511 <    /** head of the queue; null until first enqueue */
511 >    /**
512 >     * A node from which the first live (non-matched) node (if any)
513 >     * can be reached in O(1) time.
514 >     * Invariants:
515 >     * - all live nodes are reachable from head via .next
516 >     * - head != null
517 >     * - (tmp = head).next != tmp || tmp != head
518 >     * Non-invariants:
519 >     * - head may or may not be live
520 >     * - it is permitted for tail to lag behind head, that is, for tail
521 >     *   to not be reachable from head!
522 >     */
523      transient volatile Node head;
524  
525 <    /** tail of the queue; null until first append */
525 >    /**
526 >     * A node from which the last node on list (that is, the unique
527 >     * node with node.next == null) can be reached in O(1) time.
528 >     * Invariants:
529 >     * - the last node is always reachable from tail via .next
530 >     * - tail != null
531 >     * Non-invariants:
532 >     * - tail may or may not be live
533 >     * - it is permitted for tail to lag behind head, that is, for tail
534 >     *   to not be reachable from head!
535 >     * - tail.next may or may not be self-linked.
536 >     */
537      private transient volatile Node tail;
538  
539      /** The number of apparent failures to unsplice removed nodes */
540      private transient volatile int sweepVotes;
541  
542      private boolean casTail(Node cmp, Node val) {
543 +        // assert cmp != null;
544 +        // assert val != null;
545          return TAIL.compareAndSet(this, cmp, val);
546      }
547  
# Line 655 | Line 665 | public class LinkedTransferQueue<E> exte
665       * predecessor
666       */
667      private Node tryAppend(Node s, boolean haveData) {
668 +        // assert head != null;
669 +        // assert tail != null;
670          for (Node t = tail, p = t;;) {        // move p to last node and append
671              Node n, u;                        // temps for reads of next & tail
672 <            if (p == null && (p = head) == null) {
673 <                if (casHead(null, s))
662 <                    return s;                 // initialize
663 <            }
672 >            if (p == null)
673 >                p = head;
674              else if (p.cannotPrecede(haveData))
675                  return null;                  // lost race vs opposite mode
676              else if ((n = p.next) != null)    // not last; keep traversing
# Line 1223 | Line 1233 | public class LinkedTransferQueue<E> exte
1233       * Creates an initially empty {@code LinkedTransferQueue}.
1234       */
1235      public LinkedTransferQueue() {
1236 +        head = tail = new Node();
1237      }
1238  
1239      /**
# Line 1235 | Line 1246 | public class LinkedTransferQueue<E> exte
1246       *         of its elements are null
1247       */
1248      public LinkedTransferQueue(Collection<? extends E> c) {
1249 <        this();
1250 <        addAll(c);
1249 >        Node h = null, t = null;
1250 >        for (E e : c) {
1251 >            Node newNode = new Node(Objects.requireNonNull(e));
1252 >            if (h == null)
1253 >                h = t = newNode;
1254 >            else
1255 >                t.appendRelaxed(t = newNode);
1256 >        }
1257 >        if (h == null)
1258 >            h = t = new Node();
1259 >        head = h;
1260 >        tail = t;
1261      }
1262  
1263      /**
# Line 1580 | Line 1601 | public class LinkedTransferQueue<E> exte
1601       */
1602      private void readObject(java.io.ObjectInputStream s)
1603          throws java.io.IOException, ClassNotFoundException {
1604 <        s.defaultReadObject();
1605 <        for (;;) {
1604 >
1605 >        // Read in elements until trailing null sentinel found
1606 >        Node h = null, t = null;
1607 >        for (Object item; (item = s.readObject()) != null; ) {
1608              @SuppressWarnings("unchecked")
1609 <            E item = (E) s.readObject();
1610 <            if (item == null)
1611 <                break;
1609 >            Node newNode = new Node((E) item);
1610 >            if (h == null)
1611 >                h = t = newNode;
1612              else
1613 <                offer(item);
1613 >                t.appendRelaxed(t = newNode);
1614          }
1615 +        if (h == null)
1616 +            h = t = new Node();
1617 +        head = h;
1618 +        tail = t;
1619      }
1620  
1621      /**
# Line 1701 | Line 1728 | public class LinkedTransferQueue<E> exte
1728      private static final VarHandle HEAD;
1729      private static final VarHandle TAIL;
1730      private static final VarHandle SWEEPVOTES;
1731 +    static final VarHandle ITEM;
1732 +    static final VarHandle NEXT;
1733 +    static final VarHandle WAITER;
1734      static {
1735          try {
1736              MethodHandles.Lookup l = MethodHandles.lookup();
# Line 1710 | Line 1740 | public class LinkedTransferQueue<E> exte
1740                                     Node.class);
1741              SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
1742                                           int.class);
1743 +            ITEM = l.findVarHandle(Node.class, "item", Object.class);
1744 +            NEXT = l.findVarHandle(Node.class, "next", Node.class);
1745 +            WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
1746          } catch (ReflectiveOperationException e) {
1747              throw new Error(e);
1748          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines