ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.7 by dl, Wed Sep 24 10:48:43 2008 UTC vs.
Revision 1.15 by dl, Wed Mar 25 13:43:42 2009 UTC

# Line 21 | Line 21 | import java.lang.reflect.*;
21   * producer.  The <em>tail</em> of the queue is that element that has
22   * been on the queue the shortest time for some producer.
23   *
24 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
24 > * <p>Beware that, unlike in most collections, the {@code size}
25   * method is <em>NOT</em> a constant-time operation. Because of the
26   * asynchronous nature of these queues, determining the current number
27   * of elements requires a traversal of the elements.
# Line 51 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
54     * This is still a work in progress...
55     *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
56       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57       * Lea & Scott
58       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59       *
60 <     * The main extension is to provide different Wait modes
61 <     * for the main "xfer" method that puts or takes items.
62 <     * These don't impact the basic dual-queue logic, but instead
63 <     * control whether or how threads block upon insertion
64 <     * of request or data nodes into the dual queue.
60 >     * The main extension is to provide different Wait modes for the
61 >     * main "xfer" method that puts or takes items.  These don't
62 >     * impact the basic dual-queue logic, but instead control whether
63 >     * or how threads block upon insertion of request or data nodes
64 >     * into the dual queue. It also uses slightly different
65 >     * conventions for tracking whether nodes are off-list or
66 >     * cancelled.
67       */
68  
69      // Wait modes for xfer method
# Line 97 | Line 97 | public class LinkedTransferQueue<E> exte
97      static final long spinForTimeoutThreshold = 1000L;
98  
99      /**
100 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
101 <     * AtomicReference to represent item. Uses Object, not E, to allow
102 <     * setting item to "this" after use, to avoid garbage
103 <     * retention. Similarly, setting the next field to this is used as
104 <     * sentinel that node is off list.
100 >     * Node class for LinkedTransferQueue. Opportunistically
101 >     * subclasses from AtomicReference to represent item. Uses Object,
102 >     * not E, to allow setting item to "this" after use, to avoid
103 >     * garbage retention. Similarly, setting the next field to this is
104 >     * used as sentinel that node is off list.
105       */
106      static final class QNode extends AtomicReference<Object> {
107          volatile QNode next;
# Line 116 | Line 116 | public class LinkedTransferQueue<E> exte
116              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
117              (QNode.class, QNode.class, "next");
118  
119 <        boolean casNext(QNode cmp, QNode val) {
119 >        final boolean casNext(QNode cmp, QNode val) {
120              return nextUpdater.compareAndSet(this, cmp, val);
121          }
122 +
123 +        final void clearNext() {
124 +            nextUpdater.lazySet(this, this);
125 +        }
126 +
127      }
128  
129      /**
# Line 151 | Line 156 | public class LinkedTransferQueue<E> exte
156       */
157      private boolean advanceHead(QNode h, QNode nh) {
158          if (h == head.get() && head.compareAndSet(h, nh)) {
159 <            h.next = h; // forget old next
159 >            h.clearNext(); // forget old next
160              return true;
161          }
162          return false;
# Line 159 | Line 164 | public class LinkedTransferQueue<E> exte
164  
165      /**
166       * Puts or takes an item. Used for most queue operations (except
167 <     * poll() and tryTransfer())
167 >     * poll() and tryTransfer()). See the similar code in
168 >     * SynchronousQueue for detailed explanation.
169       * @param e the item or if null, signifies that this is a take
170       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
171       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 266 | Line 272 | public class LinkedTransferQueue<E> exte
272              Object x = s.get();
273              if (x != e) {                 // Node was matched or cancelled
274                  advanceHead(pred, s);     // unlink if head
275 <                if (x == s)               // was cancelled
276 <                    return clean(pred, s);
275 >                if (x == s) {              // was cancelled
276 >                    clean(pred, s);
277 >                    return null;
278 >                }
279                  else if (x != null) {
280                      s.set(s);             // avoid garbage retention
281                      return x;
# Line 275 | Line 283 | public class LinkedTransferQueue<E> exte
283                  else
284                      return e;
285              }
278
286              if (mode == TIMEOUT) {
287                  long now = System.nanoTime();
288                  nanos -= now - lastTime;
# Line 296 | Line 303 | public class LinkedTransferQueue<E> exte
303              else if (s.waiter == null)
304                  s.waiter = w;
305              else if (mode != TIMEOUT) {
306 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
306 >                LockSupport.park(this);
307                  s.waiter = null;
308                  spins = -1;
309              }
310              else if (nanos > spinForTimeoutThreshold) {
311 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
311 >                LockSupport.parkNanos(this, nanos);
312                  s.waiter = null;
313                  spins = -1;
314              }
# Line 311 | Line 316 | public class LinkedTransferQueue<E> exte
316      }
317  
318      /**
319 +     * Returns validated tail for use in cleaning methods
320 +     */
321 +    private QNode getValidatedTail() {
322 +        for (;;) {
323 +            QNode h = head.get();
324 +            QNode first = h.next;
325 +            if (first != null && first.next == first) { // help advance
326 +                advanceHead(h, first);
327 +                continue;
328 +            }
329 +            QNode t = tail.get();
330 +            QNode last = t.next;
331 +            if (t == tail.get()) {
332 +                if (last != null)
333 +                    tail.compareAndSet(t, last); // help advance
334 +                else
335 +                    return t;
336 +            }
337 +        }
338 +    }
339 +
340 +    /**
341       * Gets rid of cancelled node s with original predecessor pred.
342 <     * @return null (to simplify use by callers)
342 >     * @param pred predecessor of cancelled node
343 >     * @param s the cancelled node
344       */
345 <    private Object clean(QNode pred, QNode s) {
345 >    private void clean(QNode pred, QNode s) {
346          Thread w = s.waiter;
347          if (w != null) {             // Wake up thread
348              s.waiter = null;
# Line 322 | Line 350 | public class LinkedTransferQueue<E> exte
350                  LockSupport.unpark(w);
351          }
352  
353 <        for (;;) {
354 <            if (pred.next != s) // already cleaned
355 <                return null;
356 <            QNode h = head.get();
357 <            QNode hn = h.next;   // Absorb cancelled first node as head
358 <            if (hn != null && hn.next == hn) {
359 <                advanceHead(h, hn);
360 <                continue;
361 <            }
362 <            QNode t = tail.get();      // Ensure consistent read for tail
363 <            if (t == h)
364 <                return null;
365 <            QNode tn = t.next;
366 <            if (t != tail.get())
367 <                continue;
368 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
353 >        if (pred == null)
354 >            return;
355 >
356 >        /*
357 >         * At any given time, exactly one node on list cannot be
358 >         * deleted -- the last inserted node. To accommodate this, if
359 >         * we cannot delete s, we save its predecessor as "cleanMe",
360 >         * processing the previously saved version first. At least one
361 >         * of node s or the node previously saved can always be
362 >         * processed, so this always terminates.
363 >         */
364 >        while (pred.next == s) {
365 >            QNode oldpred = reclean();  // First, help get rid of cleanMe
366 >            QNode t = getValidatedTail();
367 >            if (s != t) {               // If not tail, try to unsplice
368 >                QNode sn = s.next;      // s.next == s means s already off list
369                  if (sn == s || pred.casNext(s, sn))
370 <                    return null;
370 >                    break;
371              }
372 <            QNode dp = cleanMe.get();
373 <            if (dp != null) {    // Try unlinking previous cancelled node
374 <                QNode d = dp.next;
375 <                QNode dn;
376 <                if (d == null ||               // d is gone or
377 <                    d == dp ||                 // d is off list or
378 <                    d.get() != d ||            // d not cancelled or
379 <                    (d != t &&                 // d not tail and
380 <                     (dn = d.next) != null &&  //   has successor
381 <                     dn != d &&                //   that is on list
382 <                     dp.casNext(d, dn)))       // d unspliced
383 <                    cleanMe.compareAndSet(dp, null);
384 <                if (dp == pred)
385 <                    return null;      // s is already saved node
372 >            else if (oldpred == pred || // Already saved
373 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
374 >                break;                  // Postpone cleaning
375 >        }
376 >    }
377 >
378 >    /**
379 >     * Tries to unsplice the cancelled node held in cleanMe that was
380 >     * previously uncleanable because it was at tail.
381 >     * @return current cleanMe node (or null)
382 >     */
383 >    private QNode reclean() {
384 >        /*
385 >         * cleanMe is, or at one time was, predecessor of cancelled
386 >         * node s that was the tail so could not be unspliced.  If s
387 >         * is no longer the tail, try to unsplice if necessary and
388 >         * make cleanMe slot available.  This differs from similar
389 >         * code in clean() because we must check that pred still
390 >         * points to a cancelled node that must be unspliced -- if
391 >         * not, we can (must) clear cleanMe without unsplicing.
392 >         * This can loop only due to contention on casNext or
393 >         * clearing cleanMe.
394 >         */
395 >        QNode pred;
396 >        while ((pred = cleanMe.get()) != null) {
397 >            QNode t = getValidatedTail();
398 >            QNode s = pred.next;
399 >            if (s != t) {
400 >                QNode sn;
401 >                if (s == null || s == pred || s.get() != s ||
402 >                    (sn = s.next) == s || pred.casNext(s, sn))
403 >                    cleanMe.compareAndSet(pred, null);
404              }
405 <            else if (cleanMe.compareAndSet(null, pred))
406 <                return null;          // Postpone cleaning s
405 >            else // s is still tail; cannot clean
406 >                break;
407          }
408 +        return pred;
409      }
410  
411      /**
412 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
412 >     * Creates an initially empty {@code LinkedTransferQueue}.
413       */
414      public LinkedTransferQueue() {
415          QNode dummy = new QNode(null, false);
# Line 377 | Line 419 | public class LinkedTransferQueue<E> exte
419      }
420  
421      /**
422 <     * Creates a <tt>LinkedTransferQueue</tt>
422 >     * Creates a {@code LinkedTransferQueue}
423       * initially containing the elements of the given collection,
424       * added in traversal order of the collection's iterator.
425       * @param c the collection of elements to initially contain
# Line 409 | Line 451 | public class LinkedTransferQueue<E> exte
451          return true;
452      }
453  
454 +    public boolean add(E e) {
455 +        if (e == null) throw new NullPointerException();
456 +        xfer(e, NOWAIT, 0);
457 +        return true;
458 +    }
459 +
460      public void transfer(E e) throws InterruptedException {
461          if (e == null) throw new NullPointerException();
462          if (xfer(e, WAIT, 0) == null) {
# Line 505 | Line 553 | public class LinkedTransferQueue<E> exte
553                          return h;
554                  }
555              }
556 +            reclean();
557          }
558      }
559  
# Line 521 | Line 570 | public class LinkedTransferQueue<E> exte
570       * if subsequently removed.
571       */
572      class Itr implements Iterator<E> {
573 <        QNode nextNode;    // Next node to return next
574 <        QNode currentNode; // last returned node, for remove()
575 <        QNode prevNode;    // predecessor of last returned node
573 >        QNode next;        // node to return next
574 >        QNode pnext;       // predecessor of next
575 >        QNode snext;       // successor of next
576 >        QNode curr;        // last returned node, for remove()
577 >        QNode pcurr;       // predecessor of curr, for remove()
578          E nextItem;        // Cache of next item, once commited to in next
579  
580          Itr() {
581 <            nextNode = traversalHead();
531 <            advance();
581 >            findNext();
582          }
583  
584 <        E advance() {
585 <            prevNode = currentNode;
586 <            currentNode = nextNode;
587 <            E x = nextItem;
538 <
539 <            QNode p = nextNode.next;
584 >        /**
585 >         * Ensure next points to next valid node, or null if none.
586 >         */
587 >        void findNext() {
588              for (;;) {
589 <                if (p == null || !p.isData) {
590 <                    nextNode = null;
591 <                    nextItem = null;
592 <                    return x;
589 >                QNode pred = pnext;
590 >                QNode q = next;
591 >                if (pred == null || pred == q) {
592 >                    pred = traversalHead();
593 >                    q = pred.next;
594                  }
595 <                Object item = p.get();
596 <                if (item != p && item != null) {
597 <                    nextNode = p;
549 <                    nextItem = (E)item;
550 <                    return x;
595 >                if (q == null || !q.isData) {
596 >                    next = null;
597 >                    return;
598                  }
599 <                prevNode = p;
600 <                p = p.next;
599 >                Object x = q.get();
600 >                QNode s = q.next;
601 >                if (x != null && q != x && q != s) {
602 >                    nextItem = (E)x;
603 >                    snext = s;
604 >                    pnext = pred;
605 >                    next = q;
606 >                    return;
607 >                }
608 >                pnext = q;
609 >                next = s;
610              }
611          }
612  
613          public boolean hasNext() {
614 <            return nextNode != null;
614 >            return next != null;
615          }
616  
617          public E next() {
618 <            if (nextNode == null) throw new NoSuchElementException();
619 <            return advance();
618 >            if (next == null) throw new NoSuchElementException();
619 >            pcurr = pnext;
620 >            curr = next;
621 >            pnext = next;
622 >            next = snext;
623 >            E x = nextItem;
624 >            findNext();
625 >            return x;
626          }
627  
628          public void remove() {
629 <            QNode p = currentNode;
630 <            QNode prev = prevNode;
569 <            if (prev == null || p == null)
629 >            QNode p = curr;
630 >            if (p == null)
631                  throw new IllegalStateException();
632              Object x = p.get();
633              if (x != null && x != p && p.compareAndSet(x, p))
634 <                clean(prev, p);
634 >                clean(pcurr, p);
635          }
636      }
637  
# Line 620 | Line 681 | public class LinkedTransferQueue<E> exte
681  
682      /**
683       * Returns the number of elements in this queue.  If this queue
684 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
685 <     * <tt>Integer.MAX_VALUE</tt>.
684 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
685 >     * {@code Integer.MAX_VALUE}.
686       *
687       * <p>Beware that, unlike in most collections, this method is
688       * <em>NOT</em> a constant-time operation. Because of the
# Line 659 | Line 720 | public class LinkedTransferQueue<E> exte
720          return Integer.MAX_VALUE;
721      }
722  
723 +    public boolean remove(Object o) {
724 +        if (o == null)
725 +            return false;
726 +        for (;;) {
727 +            QNode pred = traversalHead();
728 +            for (;;) {
729 +                QNode q = pred.next;
730 +                if (q == null || !q.isData)
731 +                    return false;
732 +                if (q == pred) // restart
733 +                    break;
734 +                Object x = q.get();
735 +                if (x != null && x != q && o.equals(x) &&
736 +                    q.compareAndSet(x, q)) {
737 +                    clean(pred, q);
738 +                    return true;
739 +                }
740 +                pred = q;
741 +            }
742 +        }
743 +    }
744 +
745      /**
746       * Save the state to a stream (that is, serialize it).
747       *
748 <     * @serialData All of the elements (each an <tt>E</tt>) in
748 >     * @serialData All of the elements (each an {@code E}) in
749       * the proper order, followed by a null
750       * @param s the stream
751       */
# Line 695 | Line 778 | public class LinkedTransferQueue<E> exte
778  
779  
780      // Support for resetting head/tail while deserializing
781 +    private void resetHeadAndTail() {
782 +        QNode dummy = new QNode(null, false);
783 +        _unsafe.putObjectVolatile(this, headOffset,
784 +                                  new PaddedAtomicReference<QNode>(dummy));
785 +        _unsafe.putObjectVolatile(this, tailOffset,
786 +                                  new PaddedAtomicReference<QNode>(dummy));
787 +        _unsafe.putObjectVolatile(this, cleanMeOffset,
788 +                                  new PaddedAtomicReference<QNode>(null));
789 +    }
790  
791      // Temporary Unsafe mechanics for preliminary release
792 +    private static Unsafe getUnsafe() throws Throwable {
793 +        try {
794 +            return Unsafe.getUnsafe();
795 +        } catch (SecurityException se) {
796 +            try {
797 +                return java.security.AccessController.doPrivileged
798 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
799 +                        public Unsafe run() throws Exception {
800 +                            return getUnsafePrivileged();
801 +                        }});
802 +            } catch (java.security.PrivilegedActionException e) {
803 +                throw e.getCause();
804 +            }
805 +        }
806 +    }
807 +
808 +    private static Unsafe getUnsafePrivileged()
809 +            throws NoSuchFieldException, IllegalAccessException {
810 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
811 +        f.setAccessible(true);
812 +        return (Unsafe) f.get(null);
813 +    }
814 +
815 +    private static long fieldOffset(String fieldName)
816 +            throws NoSuchFieldException {
817 +        return _unsafe.objectFieldOffset
818 +            (LinkedTransferQueue.class.getDeclaredField(fieldName));
819 +    }
820 +
821      private static final Unsafe _unsafe;
822      private static final long headOffset;
823      private static final long tailOffset;
824      private static final long cleanMeOffset;
825      static {
826          try {
827 <            if (LinkedTransferQueue.class.getClassLoader() != null) {
828 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
829 <                f.setAccessible(true);
830 <                _unsafe = (Unsafe)f.get(null);
831 <            }
711 <            else
712 <                _unsafe = Unsafe.getUnsafe();
713 <            headOffset = _unsafe.objectFieldOffset
714 <                (LinkedTransferQueue.class.getDeclaredField("head"));
715 <            tailOffset = _unsafe.objectFieldOffset
716 <                (LinkedTransferQueue.class.getDeclaredField("tail"));
717 <            cleanMeOffset = _unsafe.objectFieldOffset
718 <                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
719 <        } catch (Exception e) {
827 >            _unsafe = getUnsafe();
828 >            headOffset = fieldOffset("head");
829 >            tailOffset = fieldOffset("tail");
830 >            cleanMeOffset = fieldOffset("cleanMe");
831 >        } catch (Throwable e) {
832              throw new RuntimeException("Could not initialize intrinsics", e);
833          }
834      }
835  
724    private void resetHeadAndTail() {
725        QNode dummy = new QNode(null, false);
726        _unsafe.putObjectVolatile(this, headOffset, dummy);
727        _unsafe.putObjectVolatile(this, tailOffset, dummy);
728        _unsafe.putObjectVolatile(this, cleanMeOffset,
729                                  new PaddedAtomicReference<QNode>(null));
730
731    }
732
836   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines