ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.2 by dl, Mon Jul 23 17:39:11 2007 UTC vs.
Revision 1.10 by jsr166, Mon Jan 5 03:43:07 2009 UTC

# Line 10 | Line 10 | import java.util.concurrent.locks.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.*;
12   import java.io.*;
13 + import sun.misc.Unsafe;
14 + import java.lang.reflect.*;
15  
16   /**
17   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 39 | Line 41 | import java.io.*;
41   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42   * Java Collections Framework</a>.
43   *
44 < * @since 1.5
44 > * @since 1.7
45   * @author Doug Lea
46   * @param <E> the type of elements held in this collection
47   *
# Line 49 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
52     * This is still a work in prgress...
53     *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
56       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57       * Lea & Scott
58       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59       *
60 <     * The main extension is to provide different Wait modes
61 <     * for the main "xfer" method that puts or takes items.
62 <     * These don't impact the basic dual-queue logic, but instead
63 <     * control whether or how threads block upon insertion
64 <     * of request or data nodes into the dual queue.
60 >     * The main extension is to provide different Wait modes for the
61 >     * main "xfer" method that puts or takes items.  These don't
62 >     * impact the basic dual-queue logic, but instead control whether
63 >     * or how threads block upon insertion of request or data nodes
64 >     * into the dual queue. It also uses slightly different
65 >     * conventions for tracking whether nodes are off-list or
66 >     * cancelled.
67       */
68  
69      // Wait modes for xfer method
# Line 79 | Line 81 | public class LinkedTransferQueue<E> exte
81       * seems not to vary with number of CPUs (beyond 2) so is just
82       * a constant.
83       */
84 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
84 >    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
85  
86      /**
87       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 96 | public class LinkedTransferQueue<E> exte
96       */
97      static final long spinForTimeoutThreshold = 1000L;
98  
99 <    /**
100 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
101 <     * AtomicReference to represent item. Uses Object, not E, to allow
102 <     * setting item to "this" after use, to avoid garbage
103 <     * retention. Similarly, setting the next field to this is used as
104 <     * sentinel that node is off list.
99 >    /**
100 >     * Node class for LinkedTransferQueue. Opportunistically
101 >     * subclasses from AtomicReference to represent item. Uses Object,
102 >     * not E, to allow setting item to "this" after use, to avoid
103 >     * garbage retention. Similarly, setting the next field to this is
104 >     * used as sentinel that node is off list.
105       */
106      static final class QNode extends AtomicReference<Object> {
107          volatile QNode next;
# Line 131 | Line 133 | public class LinkedTransferQueue<E> exte
133      }
134  
135  
136 <    private final QNode dummy = new QNode(null, false);
137 <    private final PaddedAtomicReference<QNode> head =
138 <        new PaddedAtomicReference<QNode>(dummy);
139 <    private final PaddedAtomicReference<QNode> tail =
138 <        new PaddedAtomicReference<QNode>(dummy);
136 >    /** head of the queue */
137 >    private transient final PaddedAtomicReference<QNode> head;
138 >    /** tail of the queue */
139 >    private transient final PaddedAtomicReference<QNode> tail;
140  
141      /**
142       * Reference to a cancelled node that might not yet have been
143       * unlinked from queue because it was the last inserted node
144       * when it cancelled.
145       */
146 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
146 >    private transient final PaddedAtomicReference<QNode> cleanMe;
147  
148      /**
149       * Tries to cas nh as new head; if successful, unlink
# Line 156 | Line 156 | public class LinkedTransferQueue<E> exte
156          }
157          return false;
158      }
159 <    
159 >
160      /**
161       * Puts or takes an item. Used for most queue operations (except
162 <     * poll() and tryTransfer())
163 <     * @param e the item or if null, signfies that this is a take
162 >     * poll() and tryTransfer()). See the similar code in
163 >     * SynchronousQueue for detailed explanation.
164 >     * @param e the item or if null, signifies that this is a take
165       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
166       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
167       * @return an item, or null on failure
# Line 188 | Line 189 | public class LinkedTransferQueue<E> exte
189                      return awaitFulfill(t, s, e, mode, nanos);
190                  }
191              }
192 <            
192 >
193              else if (h != null) {
194                  QNode first = h.next;
195 <                if (t == tail.get() && first != null &&
195 >                if (t == tail.get() && first != null &&
196                      advanceHead(h, first)) {
197                      Object x = first.get();
198                      if (x != first && first.compareAndSet(x, e)) {
# Line 206 | Line 207 | public class LinkedTransferQueue<E> exte
207  
208      /**
209       * Version of xfer for poll() and tryTransfer, which
210 <     * simpifies control paths both here and in xfer
210 >     * simplifies control paths both here and in xfer
211       */
212      private Object fulfill(Object e) {
213          boolean isData = (e != null);
# Line 228 | Line 229 | public class LinkedTransferQueue<E> exte
229              }
230              else if (h != null) {
231                  QNode first = h.next;
232 <                if (t == tail.get() &&
232 >                if (t == tail.get() &&
233                      first != null &&
234                      advanceHead(h, first)) {
235                      Object x = first.get();
# Line 252 | Line 253 | public class LinkedTransferQueue<E> exte
253       * @param nanos timeout value
254       * @return matched item, or s if cancelled
255       */
256 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
256 >    private Object awaitFulfill(QNode pred, QNode s, Object e,
257                                  int mode, long nanos) {
258          if (mode == NOWAIT)
259              return null;
# Line 266 | Line 267 | public class LinkedTransferQueue<E> exte
267              Object x = s.get();
268              if (x != e) {                 // Node was matched or cancelled
269                  advanceHead(pred, s);     // unlink if head
270 <                if (x == s)               // was cancelled
271 <                    return clean(pred, s);
272 <                else if (x != null) {    
270 >                if (x == s) {              // was cancelled
271 >                    clean(pred, s);
272 >                    return null;
273 >                }
274 >                else if (x != null) {
275                      s.set(s);             // avoid garbage retention
276                      return x;
277                  }
278                  else
279                      return e;
280              }
278
281              if (mode == TIMEOUT) {
282                  long now = System.nanoTime();
283                  nanos -= now - lastTime;
# Line 288 | Line 290 | public class LinkedTransferQueue<E> exte
290              if (spins < 0) {
291                  QNode h = head.get(); // only spin if at head
292                  spins = ((h != null && h.next == s) ?
293 <                         (mode == TIMEOUT?
293 >                         (mode == TIMEOUT?
294                            maxTimedSpins : maxUntimedSpins) : 0);
295              }
296              if (spins > 0)
# Line 311 | Line 313 | public class LinkedTransferQueue<E> exte
313      }
314  
315      /**
316 +     * Returns validated tail for use in cleaning methods
317 +     */
318 +    private QNode getValidatedTail() {
319 +        for (;;) {
320 +            QNode h = head.get();
321 +            QNode first = h.next;
322 +            if (first != null && first.next == first) { // help advance
323 +                advanceHead(h, first);
324 +                continue;
325 +            }
326 +            QNode t = tail.get();
327 +            QNode last = t.next;
328 +            if (t == tail.get()) {
329 +                if (last != null)
330 +                    tail.compareAndSet(t, last); // help advance
331 +                else
332 +                    return t;
333 +            }
334 +        }
335 +    }
336 +
337 +    /**
338       * Gets rid of cancelled node s with original predecessor pred.
339 <     * @return null (to simplify use by callers)
339 >     * @param pred predecessor of cancelled node
340 >     * @param s the cancelled node
341       */
342 <    private Object clean(QNode pred, QNode s) {
342 >    private void clean(QNode pred, QNode s) {
343          Thread w = s.waiter;
344          if (w != null) {             // Wake up thread
345              s.waiter = null;
346              if (w != Thread.currentThread())
347                  LockSupport.unpark(w);
348          }
349 <        
350 <        for (;;) {
351 <            if (pred.next != s) // already cleaned
352 <                return null;
353 <            QNode h = head.get();
354 <            QNode hn = h.next;   // Absorb cancelled first node as head
355 <            if (hn != null && hn.next == hn) {
356 <                advanceHead(h, hn);
357 <                continue;
358 <            }
359 <            QNode t = tail.get();      // Ensure consistent read for tail
360 <            if (t == h)
361 <                return null;
337 <            QNode tn = t.next;
338 <            if (t != tail.get())
339 <                continue;
340 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
349 >        /*
350 >         * At any given time, exactly one node on list cannot be
351 >         * deleted -- the last inserted node. To accommodate this, if
352 >         * we cannot delete s, we save its predecessor as "cleanMe",
353 >         * processing the previously saved version first. At least one
354 >         * of node s or the node previously saved can always be
355 >         * processed, so this always terminates.
356 >         */
357 >        while (pred.next == s) {
358 >            QNode oldpred = reclean();  // First, help get rid of cleanMe
359 >            QNode t = getValidatedTail();
360 >            if (s != t) {               // If not tail, try to unsplice
361 >                QNode sn = s.next;      // s.next == s means s already off list
362                  if (sn == s || pred.casNext(s, sn))
363 <                    return null;
363 >                    break;
364              }
365 <            QNode dp = cleanMe.get();
366 <            if (dp != null) {    // Try unlinking previous cancelled node
367 <                QNode d = dp.next;
352 <                QNode dn;
353 <                if (d == null ||               // d is gone or
354 <                    d == dp ||                 // d is off list or
355 <                    d.get() != d ||            // d not cancelled or
356 <                    (d != t &&                 // d not tail and
357 <                     (dn = d.next) != null &&  //   has successor
358 <                     dn != d &&                //   that is on list
359 <                     dp.casNext(d, dn)))       // d unspliced
360 <                    cleanMe.compareAndSet(dp, null);
361 <                if (dp == pred)
362 <                    return null;      // s is already saved node
363 <            }
364 <            else if (cleanMe.compareAndSet(null, pred))
365 <                return null;          // Postpone cleaning s
365 >            else if (oldpred == pred || // Already saved
366 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
367 >                break;                  // Postpone cleaning
368          }
369      }
370 <    
370 >
371 >    /**
372 >     * Tries to unsplice the cancelled node held in cleanMe that was
373 >     * previously uncleanable because it was at tail.
374 >     * @return current cleanMe node (or null)
375 >     */
376 >    private QNode reclean() {
377 >        /*
378 >         * cleanMe is, or at one time was, predecessor of cancelled
379 >         * node s that was the tail so could not be unspliced.  If s
380 >         * is no longer the tail, try to unsplice if necessary and
381 >         * make cleanMe slot available.  This differs from similar
382 >         * code in clean() because we must check that pred still
383 >         * points to a cancelled node that must be unspliced -- if
384 >         * not, we can (must) clear cleanMe without unsplicing.
385 >         * This can loop only due to contention on casNext or
386 >         * clearing cleanMe.
387 >         */
388 >        QNode pred;
389 >        while ((pred = cleanMe.get()) != null) {
390 >            QNode t = getValidatedTail();
391 >            QNode s = pred.next;
392 >            if (s != t) {
393 >                QNode sn;
394 >                if (s == null || s == pred || s.get() != s ||
395 >                    (sn = s.next) == s || pred.casNext(s, sn))
396 >                    cleanMe.compareAndSet(pred, null);
397 >            }
398 >            else // s is still tail; cannot clean
399 >                break;
400 >        }
401 >        return pred;
402 >    }
403 >
404      /**
405       * Creates an initially empty <tt>LinkedTransferQueue</tt>.
406       */
407      public LinkedTransferQueue() {
408 +        QNode dummy = new QNode(null, false);
409 +        head = new PaddedAtomicReference<QNode>(dummy);
410 +        tail = new PaddedAtomicReference<QNode>(dummy);
411 +        cleanMe = new PaddedAtomicReference<QNode>(null);
412      }
413  
414      /**
# Line 381 | Line 420 | public class LinkedTransferQueue<E> exte
420       *         of its elements are null
421       */
422      public LinkedTransferQueue(Collection<? extends E> c) {
423 +        this();
424          addAll(c);
425      }
426  
# Line 390 | Line 430 | public class LinkedTransferQueue<E> exte
430          xfer(e, NOWAIT, 0);
431      }
432  
433 <    public boolean offer(E e, long timeout, TimeUnit unit)  
433 >    public boolean offer(E e, long timeout, TimeUnit unit)
434          throws InterruptedException {
435          if (e == null) throw new NullPointerException();
436          if (Thread.interrupted()) throw new InterruptedException();
# Line 407 | Line 447 | public class LinkedTransferQueue<E> exte
447      public void transfer(E e) throws InterruptedException {
448          if (e == null) throw new NullPointerException();
449          if (xfer(e, WAIT, 0) == null) {
450 <            Thread.interrupted();
450 >            Thread.interrupted();
451              throw new InterruptedException();
452 <        }
452 >        }
453      }
454  
455      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
# Line 431 | Line 471 | public class LinkedTransferQueue<E> exte
471          Object e = xfer(null, WAIT, 0);
472          if (e != null)
473              return (E)e;
474 <        Thread.interrupted();
474 >        Thread.interrupted();
475          throw new InterruptedException();
476      }
477  
# Line 487 | Line 527 | public class LinkedTransferQueue<E> exte
527                  QNode last = t.next;
528                  QNode first = h.next;
529                  if (t == tail.get()) {
530 <                    if (last != null)
530 >                    if (last != null)
531                          tail.compareAndSet(t, last);
532                      else if (first != null) {
533                          Object x = first.get();
534 <                        if (x == first)
535 <                            advanceHead(h, first);    
534 >                        if (x == first)
535 >                            advanceHead(h, first);
536                          else
537                              return h;
538                      }
# Line 509 | Line 549 | public class LinkedTransferQueue<E> exte
549      }
550  
551      /**
552 <     * Iterators. Basic strategy os to travers list, treating
552 >     * Iterators. Basic strategy is to traverse list, treating
553       * non-data (i.e., request) nodes as terminating list.
554       * Once a valid data node is found, the item is cached
555       * so that the next call to next() will return it even
# Line 520 | Line 560 | public class LinkedTransferQueue<E> exte
560          QNode currentNode; // last returned node, for remove()
561          QNode prevNode;    // predecessor of last returned node
562          E nextItem;        // Cache of next item, once commited to in next
563 <        
563 >
564          Itr() {
565              nextNode = traversalHead();
566              advance();
567          }
568 <        
568 >
569          E advance() {
570              prevNode = currentNode;
571              currentNode = nextNode;
572              E x = nextItem;
573 <            
573 >
574              QNode p = nextNode.next;
575              for (;;) {
576                  if (p == null || !p.isData) {
# Line 543 | Line 583 | public class LinkedTransferQueue<E> exte
583                      nextNode = p;
584                      nextItem = (E)item;
585                      return x;
586 <                }
586 >                }
587                  prevNode = p;
588                  p = p.next;
589              }
590          }
591 <        
591 >
592          public boolean hasNext() {
593              return nextNode != null;
594          }
595 <        
595 >
596          public E next() {
597              if (nextNode == null) throw new NoSuchElementException();
598              return advance();
599          }
600 <        
600 >
601          public void remove() {
602              QNode p = currentNode;
603              QNode prev = prevNode;
604 <            if (prev == null || p == null)
604 >            if (prev == null || p == null)
605                  throw new IllegalStateException();
606              Object x = p.get();
607              if (x != null && x != p && p.compareAndSet(x, p))
# Line 608 | Line 648 | public class LinkedTransferQueue<E> exte
648              if (p == null)
649                  return false;
650              Object x = p.get();
651 <            if (p != x)
651 >            if (p != x)
652                  return !p.isData;
653          }
654      }
655 <    
655 >
656      /**
657       * Returns the number of elements in this queue.  If this queue
658       * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
# Line 630 | Line 670 | public class LinkedTransferQueue<E> exte
670          QNode h = traversalHead();
671          for (QNode p = h.next; p != null && p.isData; p = p.next) {
672              Object x = p.get();
673 <            if (x != null && x != p) {
673 >            if (x != null && x != p) {
674                  if (++count == Integer.MAX_VALUE) // saturated
675                      break;
676              }
# Line 678 | Line 718 | public class LinkedTransferQueue<E> exte
718      private void readObject(java.io.ObjectInputStream s)
719          throws java.io.IOException, ClassNotFoundException {
720          s.defaultReadObject();
721 +        resetHeadAndTail();
722          for (;;) {
723              E item = (E)s.readObject();
724              if (item == null)
# Line 686 | Line 727 | public class LinkedTransferQueue<E> exte
727                  offer(item);
728          }
729      }
730 +
731 +
732 +    // Support for resetting head/tail while deserializing
733 +
734 +    // Temporary Unsafe mechanics for preliminary release
735 +    private static final Unsafe _unsafe;
736 +    private static final long headOffset;
737 +    private static final long tailOffset;
738 +    private static final long cleanMeOffset;
739 +    static {
740 +        try {
741 +            if (LinkedTransferQueue.class.getClassLoader() != null) {
742 +                Field f = Unsafe.class.getDeclaredField("theUnsafe");
743 +                f.setAccessible(true);
744 +                _unsafe = (Unsafe)f.get(null);
745 +            }
746 +            else
747 +                _unsafe = Unsafe.getUnsafe();
748 +            headOffset = _unsafe.objectFieldOffset
749 +                (LinkedTransferQueue.class.getDeclaredField("head"));
750 +            tailOffset = _unsafe.objectFieldOffset
751 +                (LinkedTransferQueue.class.getDeclaredField("tail"));
752 +            cleanMeOffset = _unsafe.objectFieldOffset
753 +                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
754 +        } catch (Exception e) {
755 +            throw new RuntimeException("Could not initialize intrinsics", e);
756 +        }
757 +    }
758 +
759 +    private void resetHeadAndTail() {
760 +        QNode dummy = new QNode(null, false);
761 +        _unsafe.putObjectVolatile(this, headOffset,
762 +                                  new PaddedAtomicReference<QNode>(dummy));
763 +        _unsafe.putObjectVolatile(this, tailOffset,
764 +                                  new PaddedAtomicReference<QNode>(dummy));
765 +        _unsafe.putObjectVolatile(this, cleanMeOffset,
766 +                                  new PaddedAtomicReference<QNode>(null));
767 +
768 +    }
769 +
770   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines