ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.6 by jsr166, Fri Jul 25 18:13:15 2008 UTC vs.
Revision 1.15 by dl, Wed Mar 25 13:43:42 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.locks.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.*;
12   import java.io.*;
13 + import sun.misc.Unsafe;
14 + import java.lang.reflect.*;
15  
16   /**
17   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 19 | Line 21 | import java.io.*;
21   * producer.  The <em>tail</em> of the queue is that element that has
22   * been on the queue the shortest time for some producer.
23   *
24 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
24 > * <p>Beware that, unlike in most collections, the {@code size}
25   * method is <em>NOT</em> a constant-time operation. Because of the
26   * asynchronous nature of these queues, determining the current number
27   * of elements requires a traversal of the elements.
# Line 49 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
52     * This is still a work in progress...
53     *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
56       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57       * Lea & Scott
58       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59       *
60 <     * The main extension is to provide different Wait modes
61 <     * for the main "xfer" method that puts or takes items.
62 <     * These don't impact the basic dual-queue logic, but instead
63 <     * control whether or how threads block upon insertion
64 <     * of request or data nodes into the dual queue.
60 >     * The main extension is to provide different Wait modes for the
61 >     * main "xfer" method that puts or takes items.  These don't
62 >     * impact the basic dual-queue logic, but instead control whether
63 >     * or how threads block upon insertion of request or data nodes
64 >     * into the dual queue. It also uses slightly different
65 >     * conventions for tracking whether nodes are off-list or
66 >     * cancelled.
67       */
68  
69      // Wait modes for xfer method
# Line 95 | Line 97 | public class LinkedTransferQueue<E> exte
97      static final long spinForTimeoutThreshold = 1000L;
98  
99      /**
100 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
101 <     * AtomicReference to represent item. Uses Object, not E, to allow
102 <     * setting item to "this" after use, to avoid garbage
103 <     * retention. Similarly, setting the next field to this is used as
104 <     * sentinel that node is off list.
100 >     * Node class for LinkedTransferQueue. Opportunistically
101 >     * subclasses from AtomicReference to represent item. Uses Object,
102 >     * not E, to allow setting item to "this" after use, to avoid
103 >     * garbage retention. Similarly, setting the next field to this is
104 >     * used as sentinel that node is off list.
105       */
106      static final class QNode extends AtomicReference<Object> {
107          volatile QNode next;
# Line 114 | Line 116 | public class LinkedTransferQueue<E> exte
116              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
117              (QNode.class, QNode.class, "next");
118  
119 <        boolean casNext(QNode cmp, QNode val) {
119 >        final boolean casNext(QNode cmp, QNode val) {
120              return nextUpdater.compareAndSet(this, cmp, val);
121          }
122 +
123 +        final void clearNext() {
124 +            nextUpdater.lazySet(this, this);
125 +        }
126 +
127      }
128  
129      /**
# Line 131 | Line 138 | public class LinkedTransferQueue<E> exte
138      }
139  
140  
141 <    private final QNode dummy = new QNode(null, false);
142 <    private final PaddedAtomicReference<QNode> head =
143 <        new PaddedAtomicReference<QNode>(dummy);
144 <    private final PaddedAtomicReference<QNode> tail =
138 <        new PaddedAtomicReference<QNode>(dummy);
141 >    /** head of the queue */
142 >    private transient final PaddedAtomicReference<QNode> head;
143 >    /** tail of the queue */
144 >    private transient final PaddedAtomicReference<QNode> tail;
145  
146      /**
147       * Reference to a cancelled node that might not yet have been
148       * unlinked from queue because it was the last inserted node
149       * when it cancelled.
150       */
151 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
151 >    private transient final PaddedAtomicReference<QNode> cleanMe;
152  
153      /**
154       * Tries to cas nh as new head; if successful, unlink
# Line 151 | Line 156 | public class LinkedTransferQueue<E> exte
156       */
157      private boolean advanceHead(QNode h, QNode nh) {
158          if (h == head.get() && head.compareAndSet(h, nh)) {
159 <            h.next = h; // forget old next
159 >            h.clearNext(); // forget old next
160              return true;
161          }
162          return false;
# Line 159 | Line 164 | public class LinkedTransferQueue<E> exte
164  
165      /**
166       * Puts or takes an item. Used for most queue operations (except
167 <     * poll() and tryTransfer())
167 >     * poll() and tryTransfer()). See the similar code in
168 >     * SynchronousQueue for detailed explanation.
169       * @param e the item or if null, signifies that this is a take
170       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
171       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 266 | Line 272 | public class LinkedTransferQueue<E> exte
272              Object x = s.get();
273              if (x != e) {                 // Node was matched or cancelled
274                  advanceHead(pred, s);     // unlink if head
275 <                if (x == s)               // was cancelled
276 <                    return clean(pred, s);
275 >                if (x == s) {              // was cancelled
276 >                    clean(pred, s);
277 >                    return null;
278 >                }
279                  else if (x != null) {
280                      s.set(s);             // avoid garbage retention
281                      return x;
# Line 275 | Line 283 | public class LinkedTransferQueue<E> exte
283                  else
284                      return e;
285              }
278
286              if (mode == TIMEOUT) {
287                  long now = System.nanoTime();
288                  nanos -= now - lastTime;
# Line 296 | Line 303 | public class LinkedTransferQueue<E> exte
303              else if (s.waiter == null)
304                  s.waiter = w;
305              else if (mode != TIMEOUT) {
306 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
306 >                LockSupport.park(this);
307                  s.waiter = null;
308                  spins = -1;
309              }
310              else if (nanos > spinForTimeoutThreshold) {
311 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
311 >                LockSupport.parkNanos(this, nanos);
312                  s.waiter = null;
313                  spins = -1;
314              }
# Line 311 | Line 316 | public class LinkedTransferQueue<E> exte
316      }
317  
318      /**
319 +     * Returns validated tail for use in cleaning methods
320 +     */
321 +    private QNode getValidatedTail() {
322 +        for (;;) {
323 +            QNode h = head.get();
324 +            QNode first = h.next;
325 +            if (first != null && first.next == first) { // help advance
326 +                advanceHead(h, first);
327 +                continue;
328 +            }
329 +            QNode t = tail.get();
330 +            QNode last = t.next;
331 +            if (t == tail.get()) {
332 +                if (last != null)
333 +                    tail.compareAndSet(t, last); // help advance
334 +                else
335 +                    return t;
336 +            }
337 +        }
338 +    }
339 +
340 +    /**
341       * Gets rid of cancelled node s with original predecessor pred.
342 <     * @return null (to simplify use by callers)
342 >     * @param pred predecessor of cancelled node
343 >     * @param s the cancelled node
344       */
345 <    private Object clean(QNode pred, QNode s) {
345 >    private void clean(QNode pred, QNode s) {
346          Thread w = s.waiter;
347          if (w != null) {             // Wake up thread
348              s.waiter = null;
# Line 322 | Line 350 | public class LinkedTransferQueue<E> exte
350                  LockSupport.unpark(w);
351          }
352  
353 <        for (;;) {
354 <            if (pred.next != s) // already cleaned
355 <                return null;
356 <            QNode h = head.get();
357 <            QNode hn = h.next;   // Absorb cancelled first node as head
358 <            if (hn != null && hn.next == hn) {
359 <                advanceHead(h, hn);
360 <                continue;
361 <            }
362 <            QNode t = tail.get();      // Ensure consistent read for tail
363 <            if (t == h)
364 <                return null;
365 <            QNode tn = t.next;
366 <            if (t != tail.get())
367 <                continue;
368 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
353 >        if (pred == null)
354 >            return;
355 >
356 >        /*
357 >         * At any given time, exactly one node on list cannot be
358 >         * deleted -- the last inserted node. To accommodate this, if
359 >         * we cannot delete s, we save its predecessor as "cleanMe",
360 >         * processing the previously saved version first. At least one
361 >         * of node s or the node previously saved can always be
362 >         * processed, so this always terminates.
363 >         */
364 >        while (pred.next == s) {
365 >            QNode oldpred = reclean();  // First, help get rid of cleanMe
366 >            QNode t = getValidatedTail();
367 >            if (s != t) {               // If not tail, try to unsplice
368 >                QNode sn = s.next;      // s.next == s means s already off list
369                  if (sn == s || pred.casNext(s, sn))
370 <                    return null;
370 >                    break;
371              }
372 <            QNode dp = cleanMe.get();
373 <            if (dp != null) {    // Try unlinking previous cancelled node
374 <                QNode d = dp.next;
375 <                QNode dn;
376 <                if (d == null ||               // d is gone or
377 <                    d == dp ||                 // d is off list or
378 <                    d.get() != d ||            // d not cancelled or
379 <                    (d != t &&                 // d not tail and
380 <                     (dn = d.next) != null &&  //   has successor
381 <                     dn != d &&                //   that is on list
382 <                     dp.casNext(d, dn)))       // d unspliced
383 <                    cleanMe.compareAndSet(dp, null);
384 <                if (dp == pred)
385 <                    return null;      // s is already saved node
372 >            else if (oldpred == pred || // Already saved
373 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
374 >                break;                  // Postpone cleaning
375 >        }
376 >    }
377 >
378 >    /**
379 >     * Tries to unsplice the cancelled node held in cleanMe that was
380 >     * previously uncleanable because it was at tail.
381 >     * @return current cleanMe node (or null)
382 >     */
383 >    private QNode reclean() {
384 >        /*
385 >         * cleanMe is, or at one time was, predecessor of cancelled
386 >         * node s that was the tail so could not be unspliced.  If s
387 >         * is no longer the tail, try to unsplice if necessary and
388 >         * make cleanMe slot available.  This differs from similar
389 >         * code in clean() because we must check that pred still
390 >         * points to a cancelled node that must be unspliced -- if
391 >         * not, we can (must) clear cleanMe without unsplicing.
392 >         * This can loop only due to contention on casNext or
393 >         * clearing cleanMe.
394 >         */
395 >        QNode pred;
396 >        while ((pred = cleanMe.get()) != null) {
397 >            QNode t = getValidatedTail();
398 >            QNode s = pred.next;
399 >            if (s != t) {
400 >                QNode sn;
401 >                if (s == null || s == pred || s.get() != s ||
402 >                    (sn = s.next) == s || pred.casNext(s, sn))
403 >                    cleanMe.compareAndSet(pred, null);
404              }
405 <            else if (cleanMe.compareAndSet(null, pred))
406 <                return null;          // Postpone cleaning s
405 >            else // s is still tail; cannot clean
406 >                break;
407          }
408 +        return pred;
409      }
410  
411      /**
412 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
412 >     * Creates an initially empty {@code LinkedTransferQueue}.
413       */
414      public LinkedTransferQueue() {
415 +        QNode dummy = new QNode(null, false);
416 +        head = new PaddedAtomicReference<QNode>(dummy);
417 +        tail = new PaddedAtomicReference<QNode>(dummy);
418 +        cleanMe = new PaddedAtomicReference<QNode>(null);
419      }
420  
421      /**
422 <     * Creates a <tt>LinkedTransferQueue</tt>
422 >     * Creates a {@code LinkedTransferQueue}
423       * initially containing the elements of the given collection,
424       * added in traversal order of the collection's iterator.
425       * @param c the collection of elements to initially contain
# Line 381 | Line 427 | public class LinkedTransferQueue<E> exte
427       *         of its elements are null
428       */
429      public LinkedTransferQueue(Collection<? extends E> c) {
430 +        this();
431          addAll(c);
432      }
433  
# Line 404 | Line 451 | public class LinkedTransferQueue<E> exte
451          return true;
452      }
453  
454 +    public boolean add(E e) {
455 +        if (e == null) throw new NullPointerException();
456 +        xfer(e, NOWAIT, 0);
457 +        return true;
458 +    }
459 +
460      public void transfer(E e) throws InterruptedException {
461          if (e == null) throw new NullPointerException();
462          if (xfer(e, WAIT, 0) == null) {
# Line 500 | Line 553 | public class LinkedTransferQueue<E> exte
553                          return h;
554                  }
555              }
556 +            reclean();
557          }
558      }
559  
# Line 516 | Line 570 | public class LinkedTransferQueue<E> exte
570       * if subsequently removed.
571       */
572      class Itr implements Iterator<E> {
573 <        QNode nextNode;    // Next node to return next
574 <        QNode currentNode; // last returned node, for remove()
575 <        QNode prevNode;    // predecessor of last returned node
573 >        QNode next;        // node to return next
574 >        QNode pnext;       // predecessor of next
575 >        QNode snext;       // successor of next
576 >        QNode curr;        // last returned node, for remove()
577 >        QNode pcurr;       // predecessor of curr, for remove()
578          E nextItem;        // Cache of next item, once commited to in next
579  
580          Itr() {
581 <            nextNode = traversalHead();
526 <            advance();
581 >            findNext();
582          }
583  
584 <        E advance() {
585 <            prevNode = currentNode;
586 <            currentNode = nextNode;
587 <            E x = nextItem;
533 <
534 <            QNode p = nextNode.next;
584 >        /**
585 >         * Ensure next points to next valid node, or null if none.
586 >         */
587 >        void findNext() {
588              for (;;) {
589 <                if (p == null || !p.isData) {
590 <                    nextNode = null;
591 <                    nextItem = null;
592 <                    return x;
589 >                QNode pred = pnext;
590 >                QNode q = next;
591 >                if (pred == null || pred == q) {
592 >                    pred = traversalHead();
593 >                    q = pred.next;
594                  }
595 <                Object item = p.get();
596 <                if (item != p && item != null) {
597 <                    nextNode = p;
544 <                    nextItem = (E)item;
545 <                    return x;
595 >                if (q == null || !q.isData) {
596 >                    next = null;
597 >                    return;
598                  }
599 <                prevNode = p;
600 <                p = p.next;
599 >                Object x = q.get();
600 >                QNode s = q.next;
601 >                if (x != null && q != x && q != s) {
602 >                    nextItem = (E)x;
603 >                    snext = s;
604 >                    pnext = pred;
605 >                    next = q;
606 >                    return;
607 >                }
608 >                pnext = q;
609 >                next = s;
610              }
611          }
612  
613          public boolean hasNext() {
614 <            return nextNode != null;
614 >            return next != null;
615          }
616  
617          public E next() {
618 <            if (nextNode == null) throw new NoSuchElementException();
619 <            return advance();
618 >            if (next == null) throw new NoSuchElementException();
619 >            pcurr = pnext;
620 >            curr = next;
621 >            pnext = next;
622 >            next = snext;
623 >            E x = nextItem;
624 >            findNext();
625 >            return x;
626          }
627  
628          public void remove() {
629 <            QNode p = currentNode;
630 <            QNode prev = prevNode;
564 <            if (prev == null || p == null)
629 >            QNode p = curr;
630 >            if (p == null)
631                  throw new IllegalStateException();
632              Object x = p.get();
633              if (x != null && x != p && p.compareAndSet(x, p))
634 <                clean(prev, p);
634 >                clean(pcurr, p);
635          }
636      }
637  
# Line 615 | Line 681 | public class LinkedTransferQueue<E> exte
681  
682      /**
683       * Returns the number of elements in this queue.  If this queue
684 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
685 <     * <tt>Integer.MAX_VALUE</tt>.
684 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
685 >     * {@code Integer.MAX_VALUE}.
686       *
687       * <p>Beware that, unlike in most collections, this method is
688       * <em>NOT</em> a constant-time operation. Because of the
# Line 654 | Line 720 | public class LinkedTransferQueue<E> exte
720          return Integer.MAX_VALUE;
721      }
722  
723 +    public boolean remove(Object o) {
724 +        if (o == null)
725 +            return false;
726 +        for (;;) {
727 +            QNode pred = traversalHead();
728 +            for (;;) {
729 +                QNode q = pred.next;
730 +                if (q == null || !q.isData)
731 +                    return false;
732 +                if (q == pred) // restart
733 +                    break;
734 +                Object x = q.get();
735 +                if (x != null && x != q && o.equals(x) &&
736 +                    q.compareAndSet(x, q)) {
737 +                    clean(pred, q);
738 +                    return true;
739 +                }
740 +                pred = q;
741 +            }
742 +        }
743 +    }
744 +
745      /**
746       * Save the state to a stream (that is, serialize it).
747       *
748 <     * @serialData All of the elements (each an <tt>E</tt>) in
748 >     * @serialData All of the elements (each an {@code E}) in
749       * the proper order, followed by a null
750       * @param s the stream
751       */
# Line 678 | Line 766 | public class LinkedTransferQueue<E> exte
766      private void readObject(java.io.ObjectInputStream s)
767          throws java.io.IOException, ClassNotFoundException {
768          s.defaultReadObject();
769 +        resetHeadAndTail();
770          for (;;) {
771              E item = (E)s.readObject();
772              if (item == null)
# Line 686 | Line 775 | public class LinkedTransferQueue<E> exte
775                  offer(item);
776          }
777      }
778 +
779 +
780 +    // Support for resetting head/tail while deserializing
781 +    private void resetHeadAndTail() {
782 +        QNode dummy = new QNode(null, false);
783 +        _unsafe.putObjectVolatile(this, headOffset,
784 +                                  new PaddedAtomicReference<QNode>(dummy));
785 +        _unsafe.putObjectVolatile(this, tailOffset,
786 +                                  new PaddedAtomicReference<QNode>(dummy));
787 +        _unsafe.putObjectVolatile(this, cleanMeOffset,
788 +                                  new PaddedAtomicReference<QNode>(null));
789 +    }
790 +
791 +    // Temporary Unsafe mechanics for preliminary release
792 +    private static Unsafe getUnsafe() throws Throwable {
793 +        try {
794 +            return Unsafe.getUnsafe();
795 +        } catch (SecurityException se) {
796 +            try {
797 +                return java.security.AccessController.doPrivileged
798 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
799 +                        public Unsafe run() throws Exception {
800 +                            return getUnsafePrivileged();
801 +                        }});
802 +            } catch (java.security.PrivilegedActionException e) {
803 +                throw e.getCause();
804 +            }
805 +        }
806 +    }
807 +
808 +    private static Unsafe getUnsafePrivileged()
809 +            throws NoSuchFieldException, IllegalAccessException {
810 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
811 +        f.setAccessible(true);
812 +        return (Unsafe) f.get(null);
813 +    }
814 +
815 +    private static long fieldOffset(String fieldName)
816 +            throws NoSuchFieldException {
817 +        return _unsafe.objectFieldOffset
818 +            (LinkedTransferQueue.class.getDeclaredField(fieldName));
819 +    }
820 +
821 +    private static final Unsafe _unsafe;
822 +    private static final long headOffset;
823 +    private static final long tailOffset;
824 +    private static final long cleanMeOffset;
825 +    static {
826 +        try {
827 +            _unsafe = getUnsafe();
828 +            headOffset = fieldOffset("head");
829 +            tailOffset = fieldOffset("tail");
830 +            cleanMeOffset = fieldOffset("cleanMe");
831 +        } catch (Throwable e) {
832 +            throw new RuntimeException("Could not initialize intrinsics", e);
833 +        }
834 +    }
835 +
836   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines