ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.4 by jsr166, Fri Jul 25 18:10:41 2008 UTC vs.
Revision 1.14 by jsr166, Thu Mar 19 05:10:42 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.locks.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.*;
12   import java.io.*;
13 + import sun.misc.Unsafe;
14 + import java.lang.reflect.*;
15  
16   /**
17   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 19 | Line 21 | import java.io.*;
21   * producer.  The <em>tail</em> of the queue is that element that has
22   * been on the queue the shortest time for some producer.
23   *
24 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
24 > * <p>Beware that, unlike in most collections, the {@code size}
25   * method is <em>NOT</em> a constant-time operation. Because of the
26   * asynchronous nature of these queues, determining the current number
27   * of elements requires a traversal of the elements.
# Line 49 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
52     * This is still a work in progress...
53     *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
56       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57       * Lea & Scott
58       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59       *
60 <     * The main extension is to provide different Wait modes
61 <     * for the main "xfer" method that puts or takes items.
62 <     * These don't impact the basic dual-queue logic, but instead
63 <     * control whether or how threads block upon insertion
64 <     * of request or data nodes into the dual queue.
60 >     * The main extension is to provide different Wait modes for the
61 >     * main "xfer" method that puts or takes items.  These don't
62 >     * impact the basic dual-queue logic, but instead control whether
63 >     * or how threads block upon insertion of request or data nodes
64 >     * into the dual queue. It also uses slightly different
65 >     * conventions for tracking whether nodes are off-list or
66 >     * cancelled.
67       */
68  
69      // Wait modes for xfer method
# Line 79 | Line 81 | public class LinkedTransferQueue<E> exte
81       * seems not to vary with number of CPUs (beyond 2) so is just
82       * a constant.
83       */
84 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
84 >    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
85  
86      /**
87       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 96 | public class LinkedTransferQueue<E> exte
96       */
97      static final long spinForTimeoutThreshold = 1000L;
98  
99 <    /**
100 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
101 <     * AtomicReference to represent item. Uses Object, not E, to allow
102 <     * setting item to "this" after use, to avoid garbage
103 <     * retention. Similarly, setting the next field to this is used as
104 <     * sentinel that node is off list.
99 >    /**
100 >     * Node class for LinkedTransferQueue. Opportunistically
101 >     * subclasses from AtomicReference to represent item. Uses Object,
102 >     * not E, to allow setting item to "this" after use, to avoid
103 >     * garbage retention. Similarly, setting the next field to this is
104 >     * used as sentinel that node is off list.
105       */
106      static final class QNode extends AtomicReference<Object> {
107          volatile QNode next;
# Line 131 | Line 133 | public class LinkedTransferQueue<E> exte
133      }
134  
135  
136 <    private final QNode dummy = new QNode(null, false);
137 <    private final PaddedAtomicReference<QNode> head =
138 <        new PaddedAtomicReference<QNode>(dummy);
139 <    private final PaddedAtomicReference<QNode> tail =
138 <        new PaddedAtomicReference<QNode>(dummy);
136 >    /** head of the queue */
137 >    private transient final PaddedAtomicReference<QNode> head;
138 >    /** tail of the queue */
139 >    private transient final PaddedAtomicReference<QNode> tail;
140  
141      /**
142       * Reference to a cancelled node that might not yet have been
143       * unlinked from queue because it was the last inserted node
144       * when it cancelled.
145       */
146 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
146 >    private transient final PaddedAtomicReference<QNode> cleanMe;
147  
148      /**
149       * Tries to cas nh as new head; if successful, unlink
# Line 156 | Line 156 | public class LinkedTransferQueue<E> exte
156          }
157          return false;
158      }
159 <    
159 >
160      /**
161       * Puts or takes an item. Used for most queue operations (except
162 <     * poll() and tryTransfer())
162 >     * poll() and tryTransfer()). See the similar code in
163 >     * SynchronousQueue for detailed explanation.
164       * @param e the item or if null, signifies that this is a take
165       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
166       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 188 | Line 189 | public class LinkedTransferQueue<E> exte
189                      return awaitFulfill(t, s, e, mode, nanos);
190                  }
191              }
192 <            
192 >
193              else if (h != null) {
194                  QNode first = h.next;
195 <                if (t == tail.get() && first != null &&
195 >                if (t == tail.get() && first != null &&
196                      advanceHead(h, first)) {
197                      Object x = first.get();
198                      if (x != first && first.compareAndSet(x, e)) {
# Line 228 | Line 229 | public class LinkedTransferQueue<E> exte
229              }
230              else if (h != null) {
231                  QNode first = h.next;
232 <                if (t == tail.get() &&
232 >                if (t == tail.get() &&
233                      first != null &&
234                      advanceHead(h, first)) {
235                      Object x = first.get();
# Line 252 | Line 253 | public class LinkedTransferQueue<E> exte
253       * @param nanos timeout value
254       * @return matched item, or s if cancelled
255       */
256 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
256 >    private Object awaitFulfill(QNode pred, QNode s, Object e,
257                                  int mode, long nanos) {
258          if (mode == NOWAIT)
259              return null;
# Line 266 | Line 267 | public class LinkedTransferQueue<E> exte
267              Object x = s.get();
268              if (x != e) {                 // Node was matched or cancelled
269                  advanceHead(pred, s);     // unlink if head
270 <                if (x == s)               // was cancelled
271 <                    return clean(pred, s);
272 <                else if (x != null) {    
270 >                if (x == s) {              // was cancelled
271 >                    clean(pred, s);
272 >                    return null;
273 >                }
274 >                else if (x != null) {
275                      s.set(s);             // avoid garbage retention
276                      return x;
277                  }
278                  else
279                      return e;
280              }
278
281              if (mode == TIMEOUT) {
282                  long now = System.nanoTime();
283                  nanos -= now - lastTime;
# Line 288 | Line 290 | public class LinkedTransferQueue<E> exte
290              if (spins < 0) {
291                  QNode h = head.get(); // only spin if at head
292                  spins = ((h != null && h.next == s) ?
293 <                         (mode == TIMEOUT?
293 >                         (mode == TIMEOUT?
294                            maxTimedSpins : maxUntimedSpins) : 0);
295              }
296              if (spins > 0)
# Line 296 | Line 298 | public class LinkedTransferQueue<E> exte
298              else if (s.waiter == null)
299                  s.waiter = w;
300              else if (mode != TIMEOUT) {
301 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
301 >                LockSupport.park(this);
302                  s.waiter = null;
303                  spins = -1;
304              }
305              else if (nanos > spinForTimeoutThreshold) {
306 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
306 >                LockSupport.parkNanos(this, nanos);
307                  s.waiter = null;
308                  spins = -1;
309              }
# Line 311 | Line 311 | public class LinkedTransferQueue<E> exte
311      }
312  
313      /**
314 +     * Returns validated tail for use in cleaning methods
315 +     */
316 +    private QNode getValidatedTail() {
317 +        for (;;) {
318 +            QNode h = head.get();
319 +            QNode first = h.next;
320 +            if (first != null && first.next == first) { // help advance
321 +                advanceHead(h, first);
322 +                continue;
323 +            }
324 +            QNode t = tail.get();
325 +            QNode last = t.next;
326 +            if (t == tail.get()) {
327 +                if (last != null)
328 +                    tail.compareAndSet(t, last); // help advance
329 +                else
330 +                    return t;
331 +            }
332 +        }
333 +    }
334 +
335 +    /**
336       * Gets rid of cancelled node s with original predecessor pred.
337 <     * @return null (to simplify use by callers)
337 >     * @param pred predecessor of cancelled node
338 >     * @param s the cancelled node
339       */
340 <    private Object clean(QNode pred, QNode s) {
340 >    private void clean(QNode pred, QNode s) {
341          Thread w = s.waiter;
342          if (w != null) {             // Wake up thread
343              s.waiter = null;
344              if (w != Thread.currentThread())
345                  LockSupport.unpark(w);
346          }
347 <        
348 <        for (;;) {
349 <            if (pred.next != s) // already cleaned
350 <                return null;
351 <            QNode h = head.get();
352 <            QNode hn = h.next;   // Absorb cancelled first node as head
353 <            if (hn != null && hn.next == hn) {
354 <                advanceHead(h, hn);
355 <                continue;
356 <            }
357 <            QNode t = tail.get();      // Ensure consistent read for tail
358 <            if (t == h)
359 <                return null;
337 <            QNode tn = t.next;
338 <            if (t != tail.get())
339 <                continue;
340 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
347 >        /*
348 >         * At any given time, exactly one node on list cannot be
349 >         * deleted -- the last inserted node. To accommodate this, if
350 >         * we cannot delete s, we save its predecessor as "cleanMe",
351 >         * processing the previously saved version first. At least one
352 >         * of node s or the node previously saved can always be
353 >         * processed, so this always terminates.
354 >         */
355 >        while (pred.next == s) {
356 >            QNode oldpred = reclean();  // First, help get rid of cleanMe
357 >            QNode t = getValidatedTail();
358 >            if (s != t) {               // If not tail, try to unsplice
359 >                QNode sn = s.next;      // s.next == s means s already off list
360                  if (sn == s || pred.casNext(s, sn))
361 <                    return null;
361 >                    break;
362              }
363 <            QNode dp = cleanMe.get();
364 <            if (dp != null) {    // Try unlinking previous cancelled node
365 <                QNode d = dp.next;
366 <                QNode dn;
367 <                if (d == null ||               // d is gone or
368 <                    d == dp ||                 // d is off list or
369 <                    d.get() != d ||            // d not cancelled or
370 <                    (d != t &&                 // d not tail and
371 <                     (dn = d.next) != null &&  //   has successor
372 <                     dn != d &&                //   that is on list
373 <                     dp.casNext(d, dn)))       // d unspliced
374 <                    cleanMe.compareAndSet(dp, null);
375 <                if (dp == pred)
376 <                    return null;      // s is already saved node
377 <            }
378 <            else if (cleanMe.compareAndSet(null, pred))
379 <                return null;          // Postpone cleaning s
363 >            else if (oldpred == pred || // Already saved
364 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
365 >                break;                  // Postpone cleaning
366 >        }
367 >    }
368 >
369 >    /**
370 >     * Tries to unsplice the cancelled node held in cleanMe that was
371 >     * previously uncleanable because it was at tail.
372 >     * @return current cleanMe node (or null)
373 >     */
374 >    private QNode reclean() {
375 >        /*
376 >         * cleanMe is, or at one time was, predecessor of cancelled
377 >         * node s that was the tail so could not be unspliced.  If s
378 >         * is no longer the tail, try to unsplice if necessary and
379 >         * make cleanMe slot available.  This differs from similar
380 >         * code in clean() because we must check that pred still
381 >         * points to a cancelled node that must be unspliced -- if
382 >         * not, we can (must) clear cleanMe without unsplicing.
383 >         * This can loop only due to contention on casNext or
384 >         * clearing cleanMe.
385 >         */
386 >        QNode pred;
387 >        while ((pred = cleanMe.get()) != null) {
388 >            QNode t = getValidatedTail();
389 >            QNode s = pred.next;
390 >            if (s != t) {
391 >                QNode sn;
392 >                if (s == null || s == pred || s.get() != s ||
393 >                    (sn = s.next) == s || pred.casNext(s, sn))
394 >                    cleanMe.compareAndSet(pred, null);
395 >            }
396 >            else // s is still tail; cannot clean
397 >                break;
398          }
399 +        return pred;
400      }
401 <    
401 >
402      /**
403 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
403 >     * Creates an initially empty {@code LinkedTransferQueue}.
404       */
405      public LinkedTransferQueue() {
406 +        QNode dummy = new QNode(null, false);
407 +        head = new PaddedAtomicReference<QNode>(dummy);
408 +        tail = new PaddedAtomicReference<QNode>(dummy);
409 +        cleanMe = new PaddedAtomicReference<QNode>(null);
410      }
411  
412      /**
413 <     * Creates a <tt>LinkedTransferQueue</tt>
413 >     * Creates a {@code LinkedTransferQueue}
414       * initially containing the elements of the given collection,
415       * added in traversal order of the collection's iterator.
416       * @param c the collection of elements to initially contain
# Line 381 | Line 418 | public class LinkedTransferQueue<E> exte
418       *         of its elements are null
419       */
420      public LinkedTransferQueue(Collection<? extends E> c) {
421 +        this();
422          addAll(c);
423      }
424  
# Line 390 | Line 428 | public class LinkedTransferQueue<E> exte
428          xfer(e, NOWAIT, 0);
429      }
430  
431 <    public boolean offer(E e, long timeout, TimeUnit unit)  
431 >    public boolean offer(E e, long timeout, TimeUnit unit)
432          throws InterruptedException {
433          if (e == null) throw new NullPointerException();
434          if (Thread.interrupted()) throw new InterruptedException();
# Line 407 | Line 445 | public class LinkedTransferQueue<E> exte
445      public void transfer(E e) throws InterruptedException {
446          if (e == null) throw new NullPointerException();
447          if (xfer(e, WAIT, 0) == null) {
448 <            Thread.interrupted();
448 >            Thread.interrupted();
449              throw new InterruptedException();
450 <        }
450 >        }
451      }
452  
453      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
# Line 431 | Line 469 | public class LinkedTransferQueue<E> exte
469          Object e = xfer(null, WAIT, 0);
470          if (e != null)
471              return (E)e;
472 <        Thread.interrupted();
472 >        Thread.interrupted();
473          throw new InterruptedException();
474      }
475  
# Line 487 | Line 525 | public class LinkedTransferQueue<E> exte
525                  QNode last = t.next;
526                  QNode first = h.next;
527                  if (t == tail.get()) {
528 <                    if (last != null)
528 >                    if (last != null)
529                          tail.compareAndSet(t, last);
530                      else if (first != null) {
531                          Object x = first.get();
532 <                        if (x == first)
533 <                            advanceHead(h, first);    
532 >                        if (x == first)
533 >                            advanceHead(h, first);
534                          else
535                              return h;
536                      }
# Line 520 | Line 558 | public class LinkedTransferQueue<E> exte
558          QNode currentNode; // last returned node, for remove()
559          QNode prevNode;    // predecessor of last returned node
560          E nextItem;        // Cache of next item, once commited to in next
561 <        
561 >
562          Itr() {
563              nextNode = traversalHead();
564              advance();
565          }
566 <        
566 >
567          E advance() {
568              prevNode = currentNode;
569              currentNode = nextNode;
570              E x = nextItem;
571 <            
571 >
572              QNode p = nextNode.next;
573              for (;;) {
574                  if (p == null || !p.isData) {
# Line 543 | Line 581 | public class LinkedTransferQueue<E> exte
581                      nextNode = p;
582                      nextItem = (E)item;
583                      return x;
584 <                }
584 >                }
585                  prevNode = p;
586                  p = p.next;
587              }
588          }
589 <        
589 >
590          public boolean hasNext() {
591              return nextNode != null;
592          }
593 <        
593 >
594          public E next() {
595              if (nextNode == null) throw new NoSuchElementException();
596              return advance();
597          }
598 <        
598 >
599          public void remove() {
600              QNode p = currentNode;
601              QNode prev = prevNode;
602 <            if (prev == null || p == null)
602 >            if (prev == null || p == null)
603                  throw new IllegalStateException();
604              Object x = p.get();
605              if (x != null && x != p && p.compareAndSet(x, p))
# Line 608 | Line 646 | public class LinkedTransferQueue<E> exte
646              if (p == null)
647                  return false;
648              Object x = p.get();
649 <            if (p != x)
649 >            if (p != x)
650                  return !p.isData;
651          }
652      }
653 <    
653 >
654      /**
655       * Returns the number of elements in this queue.  If this queue
656 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
657 <     * <tt>Integer.MAX_VALUE</tt>.
656 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
657 >     * {@code Integer.MAX_VALUE}.
658       *
659       * <p>Beware that, unlike in most collections, this method is
660       * <em>NOT</em> a constant-time operation. Because of the
# Line 630 | Line 668 | public class LinkedTransferQueue<E> exte
668          QNode h = traversalHead();
669          for (QNode p = h.next; p != null && p.isData; p = p.next) {
670              Object x = p.get();
671 <            if (x != null && x != p) {
671 >            if (x != null && x != p) {
672                  if (++count == Integer.MAX_VALUE) // saturated
673                      break;
674              }
# Line 657 | Line 695 | public class LinkedTransferQueue<E> exte
695      /**
696       * Save the state to a stream (that is, serialize it).
697       *
698 <     * @serialData All of the elements (each an <tt>E</tt>) in
698 >     * @serialData All of the elements (each an {@code E}) in
699       * the proper order, followed by a null
700       * @param s the stream
701       */
# Line 678 | Line 716 | public class LinkedTransferQueue<E> exte
716      private void readObject(java.io.ObjectInputStream s)
717          throws java.io.IOException, ClassNotFoundException {
718          s.defaultReadObject();
719 +        resetHeadAndTail();
720          for (;;) {
721              E item = (E)s.readObject();
722              if (item == null)
# Line 686 | Line 725 | public class LinkedTransferQueue<E> exte
725                  offer(item);
726          }
727      }
728 +
729 +
730 +    // Support for resetting head/tail while deserializing
731 +    private void resetHeadAndTail() {
732 +        QNode dummy = new QNode(null, false);
733 +        _unsafe.putObjectVolatile(this, headOffset,
734 +                                  new PaddedAtomicReference<QNode>(dummy));
735 +        _unsafe.putObjectVolatile(this, tailOffset,
736 +                                  new PaddedAtomicReference<QNode>(dummy));
737 +        _unsafe.putObjectVolatile(this, cleanMeOffset,
738 +                                  new PaddedAtomicReference<QNode>(null));
739 +    }
740 +
741 +    // Temporary Unsafe mechanics for preliminary release
742 +    private static Unsafe getUnsafe() throws Throwable {
743 +        try {
744 +            return Unsafe.getUnsafe();
745 +        } catch (SecurityException se) {
746 +            try {
747 +                return java.security.AccessController.doPrivileged
748 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
749 +                        public Unsafe run() throws Exception {
750 +                            return getUnsafePrivileged();
751 +                        }});
752 +            } catch (java.security.PrivilegedActionException e) {
753 +                throw e.getCause();
754 +            }
755 +        }
756 +    }
757 +
758 +    private static Unsafe getUnsafePrivileged()
759 +            throws NoSuchFieldException, IllegalAccessException {
760 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
761 +        f.setAccessible(true);
762 +        return (Unsafe) f.get(null);
763 +    }
764 +
765 +    private static long fieldOffset(String fieldName)
766 +            throws NoSuchFieldException {
767 +        return _unsafe.objectFieldOffset
768 +            (LinkedTransferQueue.class.getDeclaredField(fieldName));
769 +    }
770 +
771 +    private static final Unsafe _unsafe;
772 +    private static final long headOffset;
773 +    private static final long tailOffset;
774 +    private static final long cleanMeOffset;
775 +    static {
776 +        try {
777 +            _unsafe = getUnsafe();
778 +            headOffset = fieldOffset("head");
779 +            tailOffset = fieldOffset("tail");
780 +            cleanMeOffset = fieldOffset("cleanMe");
781 +        } catch (Throwable e) {
782 +            throw new RuntimeException("Could not initialize intrinsics", e);
783 +        }
784 +    }
785 +
786   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines