ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.2 by dl, Mon Jul 23 17:39:11 2007 UTC vs.
Revision 1.6 by jsr166, Fri Jul 25 18:13:15 2008 UTC

# Line 39 | Line 39 | import java.io.*;
39   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40   * Java Collections Framework</a>.
41   *
42 < * @since 1.5
42 > * @since 1.7
43   * @author Doug Lea
44   * @param <E> the type of elements held in this collection
45   *
# Line 49 | Line 49 | public class LinkedTransferQueue<E> exte
49      private static final long serialVersionUID = -3223113410248163686L;
50  
51      /*
52 <     * This is still a work in prgress...
52 >     * This is still a work in progress...
53       *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
# Line 79 | Line 79 | public class LinkedTransferQueue<E> exte
79       * seems not to vary with number of CPUs (beyond 2) so is just
80       * a constant.
81       */
82 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
82 >    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
83  
84      /**
85       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 94 | public class LinkedTransferQueue<E> exte
94       */
95      static final long spinForTimeoutThreshold = 1000L;
96  
97 <    /**
97 >    /**
98       * Node class for LinkedTransferQueue. Opportunistically subclasses from
99       * AtomicReference to represent item. Uses Object, not E, to allow
100       * setting item to "this" after use, to avoid garbage
# Line 132 | Line 132 | public class LinkedTransferQueue<E> exte
132  
133  
134      private final QNode dummy = new QNode(null, false);
135 <    private final PaddedAtomicReference<QNode> head =
135 >    private final PaddedAtomicReference<QNode> head =
136          new PaddedAtomicReference<QNode>(dummy);
137 <    private final PaddedAtomicReference<QNode> tail =
137 >    private final PaddedAtomicReference<QNode> tail =
138          new PaddedAtomicReference<QNode>(dummy);
139  
140      /**
# Line 156 | Line 156 | public class LinkedTransferQueue<E> exte
156          }
157          return false;
158      }
159 <    
159 >
160      /**
161       * Puts or takes an item. Used for most queue operations (except
162       * poll() and tryTransfer())
163 <     * @param e the item or if null, signfies that this is a take
163 >     * @param e the item or if null, signifies that this is a take
164       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
165       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
166       * @return an item, or null on failure
# Line 188 | Line 188 | public class LinkedTransferQueue<E> exte
188                      return awaitFulfill(t, s, e, mode, nanos);
189                  }
190              }
191 <            
191 >
192              else if (h != null) {
193                  QNode first = h.next;
194 <                if (t == tail.get() && first != null &&
194 >                if (t == tail.get() && first != null &&
195                      advanceHead(h, first)) {
196                      Object x = first.get();
197                      if (x != first && first.compareAndSet(x, e)) {
# Line 206 | Line 206 | public class LinkedTransferQueue<E> exte
206  
207      /**
208       * Version of xfer for poll() and tryTransfer, which
209 <     * simpifies control paths both here and in xfer
209 >     * simplifies control paths both here and in xfer
210       */
211      private Object fulfill(Object e) {
212          boolean isData = (e != null);
# Line 228 | Line 228 | public class LinkedTransferQueue<E> exte
228              }
229              else if (h != null) {
230                  QNode first = h.next;
231 <                if (t == tail.get() &&
231 >                if (t == tail.get() &&
232                      first != null &&
233                      advanceHead(h, first)) {
234                      Object x = first.get();
# Line 252 | Line 252 | public class LinkedTransferQueue<E> exte
252       * @param nanos timeout value
253       * @return matched item, or s if cancelled
254       */
255 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
255 >    private Object awaitFulfill(QNode pred, QNode s, Object e,
256                                  int mode, long nanos) {
257          if (mode == NOWAIT)
258              return null;
# Line 268 | Line 268 | public class LinkedTransferQueue<E> exte
268                  advanceHead(pred, s);     // unlink if head
269                  if (x == s)               // was cancelled
270                      return clean(pred, s);
271 <                else if (x != null) {    
271 >                else if (x != null) {
272                      s.set(s);             // avoid garbage retention
273                      return x;
274                  }
# Line 288 | Line 288 | public class LinkedTransferQueue<E> exte
288              if (spins < 0) {
289                  QNode h = head.get(); // only spin if at head
290                  spins = ((h != null && h.next == s) ?
291 <                         (mode == TIMEOUT?
291 >                         (mode == TIMEOUT?
292                            maxTimedSpins : maxUntimedSpins) : 0);
293              }
294              if (spins > 0)
# Line 321 | Line 321 | public class LinkedTransferQueue<E> exte
321              if (w != Thread.currentThread())
322                  LockSupport.unpark(w);
323          }
324 <        
324 >
325          for (;;) {
326              if (pred.next != s) // already cleaned
327 <                return null;
327 >                return null;
328              QNode h = head.get();
329              QNode hn = h.next;   // Absorb cancelled first node as head
330              if (hn != null && hn.next == hn) {
# Line 360 | Line 360 | public class LinkedTransferQueue<E> exte
360                      cleanMe.compareAndSet(dp, null);
361                  if (dp == pred)
362                      return null;      // s is already saved node
363 <            }
363 >            }
364              else if (cleanMe.compareAndSet(null, pred))
365                  return null;          // Postpone cleaning s
366          }
367      }
368 <    
368 >
369      /**
370       * Creates an initially empty <tt>LinkedTransferQueue</tt>.
371       */
# Line 390 | Line 390 | public class LinkedTransferQueue<E> exte
390          xfer(e, NOWAIT, 0);
391      }
392  
393 <    public boolean offer(E e, long timeout, TimeUnit unit)  
393 >    public boolean offer(E e, long timeout, TimeUnit unit)
394          throws InterruptedException {
395          if (e == null) throw new NullPointerException();
396          if (Thread.interrupted()) throw new InterruptedException();
# Line 407 | Line 407 | public class LinkedTransferQueue<E> exte
407      public void transfer(E e) throws InterruptedException {
408          if (e == null) throw new NullPointerException();
409          if (xfer(e, WAIT, 0) == null) {
410 <            Thread.interrupted();
410 >            Thread.interrupted();
411              throw new InterruptedException();
412 <        }
412 >        }
413      }
414  
415      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
# Line 431 | Line 431 | public class LinkedTransferQueue<E> exte
431          Object e = xfer(null, WAIT, 0);
432          if (e != null)
433              return (E)e;
434 <        Thread.interrupted();
434 >        Thread.interrupted();
435          throw new InterruptedException();
436      }
437  
# Line 487 | Line 487 | public class LinkedTransferQueue<E> exte
487                  QNode last = t.next;
488                  QNode first = h.next;
489                  if (t == tail.get()) {
490 <                    if (last != null)
490 >                    if (last != null)
491                          tail.compareAndSet(t, last);
492                      else if (first != null) {
493                          Object x = first.get();
494 <                        if (x == first)
495 <                            advanceHead(h, first);    
494 >                        if (x == first)
495 >                            advanceHead(h, first);
496                          else
497                              return h;
498                      }
# Line 509 | Line 509 | public class LinkedTransferQueue<E> exte
509      }
510  
511      /**
512 <     * Iterators. Basic strategy os to travers list, treating
512 >     * Iterators. Basic strategy is to traverse list, treating
513       * non-data (i.e., request) nodes as terminating list.
514       * Once a valid data node is found, the item is cached
515       * so that the next call to next() will return it even
# Line 520 | Line 520 | public class LinkedTransferQueue<E> exte
520          QNode currentNode; // last returned node, for remove()
521          QNode prevNode;    // predecessor of last returned node
522          E nextItem;        // Cache of next item, once commited to in next
523 <        
523 >
524          Itr() {
525              nextNode = traversalHead();
526              advance();
527          }
528 <        
528 >
529          E advance() {
530              prevNode = currentNode;
531              currentNode = nextNode;
532              E x = nextItem;
533 <            
533 >
534              QNode p = nextNode.next;
535              for (;;) {
536                  if (p == null || !p.isData) {
# Line 543 | Line 543 | public class LinkedTransferQueue<E> exte
543                      nextNode = p;
544                      nextItem = (E)item;
545                      return x;
546 <                }
546 >                }
547                  prevNode = p;
548                  p = p.next;
549              }
550          }
551 <        
551 >
552          public boolean hasNext() {
553              return nextNode != null;
554          }
555 <        
555 >
556          public E next() {
557              if (nextNode == null) throw new NoSuchElementException();
558              return advance();
559          }
560 <        
560 >
561          public void remove() {
562              QNode p = currentNode;
563              QNode prev = prevNode;
564 <            if (prev == null || p == null)
564 >            if (prev == null || p == null)
565                  throw new IllegalStateException();
566              Object x = p.get();
567              if (x != null && x != p && p.compareAndSet(x, p))
# Line 608 | Line 608 | public class LinkedTransferQueue<E> exte
608              if (p == null)
609                  return false;
610              Object x = p.get();
611 <            if (p != x)
611 >            if (p != x)
612                  return !p.isData;
613          }
614      }
615 <    
615 >
616      /**
617       * Returns the number of elements in this queue.  If this queue
618       * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
# Line 630 | Line 630 | public class LinkedTransferQueue<E> exte
630          QNode h = traversalHead();
631          for (QNode p = h.next; p != null && p.isData; p = p.next) {
632              Object x = p.get();
633 <            if (x != null && x != p) {
633 >            if (x != null && x != p) {
634                  if (++count == Integer.MAX_VALUE) // saturated
635                      break;
636              }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines