ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.1 by dl, Tue May 29 09:55:32 2007 UTC vs.
Revision 1.8 by dl, Fri Oct 3 00:39:48 2008 UTC

# Line 10 | Line 10 | import java.util.concurrent.locks.*;
10   import java.util.concurrent.atomic.*;
11   import java.util.*;
12   import java.io.*;
13 + import sun.misc.Unsafe;
14 + import java.lang.reflect.*;
15  
16   /**
17   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 39 | Line 41 | import java.io.*;
41   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42   * Java Collections Framework</a>.
43   *
44 < * @since 1.5
44 > * @since 1.7
45   * @author Doug Lea
46   * @param <E> the type of elements held in this collection
47   *
# Line 49 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
54 <     * This is still a work in prgress...
54 >     * This is still a work in progress...
55       *
56       * This class extends the approach used in FIFO-mode
57       * SynchronousQueues. See the internal documentation, as well as
# Line 79 | Line 81 | public class LinkedTransferQueue<E> exte
81       * seems not to vary with number of CPUs (beyond 2) so is just
82       * a constant.
83       */
84 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
84 >    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
85  
86      /**
87       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 96 | public class LinkedTransferQueue<E> exte
96       */
97      static final long spinForTimeoutThreshold = 1000L;
98  
99 <    /**
99 >    /**
100       * Node class for LinkedTransferQueue. Opportunistically subclasses from
101       * AtomicReference to represent item. Uses Object, not E, to allow
102       * setting item to "this" after use, to avoid garbage
# Line 131 | Line 133 | public class LinkedTransferQueue<E> exte
133      }
134  
135  
136 <    private final QNode dummy = new QNode(null, false);
137 <    private final PaddedAtomicReference<QNode> head =
138 <        new PaddedAtomicReference<QNode>(dummy);
139 <    private final PaddedAtomicReference<QNode> tail =
138 <        new PaddedAtomicReference<QNode>(dummy);
136 >    /** head of the queue */
137 >    private transient final PaddedAtomicReference<QNode> head;
138 >    /** tail of the queue */
139 >    private transient final PaddedAtomicReference<QNode> tail;
140  
141      /**
142       * Reference to a cancelled node that might not yet have been
143       * unlinked from queue because it was the last inserted node
144       * when it cancelled.
145       */
146 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
146 >    private transient final PaddedAtomicReference<QNode> cleanMe;
147  
148      /**
149       * Tries to cas nh as new head; if successful, unlink
# Line 156 | Line 156 | public class LinkedTransferQueue<E> exte
156          }
157          return false;
158      }
159 <    
159 >
160      /**
161       * Puts or takes an item. Used for most queue operations (except
162       * poll() and tryTransfer())
163 <     * @param e the item or if null, signfies that this is a take
163 >     * @param e the item or if null, signifies that this is a take
164       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
165       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
166       * @return an item, or null on failure
# Line 188 | Line 188 | public class LinkedTransferQueue<E> exte
188                      return awaitFulfill(t, s, e, mode, nanos);
189                  }
190              }
191 <            
191 >
192              else if (h != null) {
193                  QNode first = h.next;
194 <                if (t == tail.get() && first != null &&
194 >                if (t == tail.get() && first != null &&
195                      advanceHead(h, first)) {
196                      Object x = first.get();
197                      if (x != first && first.compareAndSet(x, e)) {
# Line 206 | Line 206 | public class LinkedTransferQueue<E> exte
206  
207      /**
208       * Version of xfer for poll() and tryTransfer, which
209 <     * simpifies control paths both here and in xfer
209 >     * simplifies control paths both here and in xfer
210       */
211      private Object fulfill(Object e) {
212          boolean isData = (e != null);
# Line 228 | Line 228 | public class LinkedTransferQueue<E> exte
228              }
229              else if (h != null) {
230                  QNode first = h.next;
231 <                if (t == tail.get() &&
231 >                if (t == tail.get() &&
232                      first != null &&
233                      advanceHead(h, first)) {
234                      Object x = first.get();
# Line 252 | Line 252 | public class LinkedTransferQueue<E> exte
252       * @param nanos timeout value
253       * @return matched item, or s if cancelled
254       */
255 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
255 >    private Object awaitFulfill(QNode pred, QNode s, Object e,
256                                  int mode, long nanos) {
257          if (mode == NOWAIT)
258              return null;
# Line 268 | Line 268 | public class LinkedTransferQueue<E> exte
268                  advanceHead(pred, s);     // unlink if head
269                  if (x == s)               // was cancelled
270                      return clean(pred, s);
271 <                else if (x != null) {    
271 >                else if (x != null) {
272                      s.set(s);             // avoid garbage retention
273                      return x;
274                  }
# Line 288 | Line 288 | public class LinkedTransferQueue<E> exte
288              if (spins < 0) {
289                  QNode h = head.get(); // only spin if at head
290                  spins = ((h != null && h.next == s) ?
291 <                         (mode == TIMEOUT?
291 >                         (mode == TIMEOUT?
292                            maxTimedSpins : maxUntimedSpins) : 0);
293              }
294              if (spins > 0)
# Line 321 | Line 321 | public class LinkedTransferQueue<E> exte
321              if (w != Thread.currentThread())
322                  LockSupport.unpark(w);
323          }
324 <        
324 >
325          for (;;) {
326              if (pred.next != s) // already cleaned
327 <                return null;
327 >                return null;
328              QNode h = head.get();
329              QNode hn = h.next;   // Absorb cancelled first node as head
330              if (hn != null && hn.next == hn) {
# Line 360 | Line 360 | public class LinkedTransferQueue<E> exte
360                      cleanMe.compareAndSet(dp, null);
361                  if (dp == pred)
362                      return null;      // s is already saved node
363 <            }
363 >            }
364              else if (cleanMe.compareAndSet(null, pred))
365                  return null;          // Postpone cleaning s
366          }
367      }
368 <    
368 >
369      /**
370       * Creates an initially empty <tt>LinkedTransferQueue</tt>.
371       */
372      public LinkedTransferQueue() {
373 +        QNode dummy = new QNode(null, false);
374 +        head = new PaddedAtomicReference<QNode>(dummy);
375 +        tail = new PaddedAtomicReference<QNode>(dummy);
376 +        cleanMe = new PaddedAtomicReference<QNode>(null);
377      }
378  
379      /**
# Line 381 | Line 385 | public class LinkedTransferQueue<E> exte
385       *         of its elements are null
386       */
387      public LinkedTransferQueue(Collection<? extends E> c) {
388 +        this();
389          addAll(c);
390      }
391  
# Line 390 | Line 395 | public class LinkedTransferQueue<E> exte
395          xfer(e, NOWAIT, 0);
396      }
397  
398 <    public boolean offer(E e, long timeout, TimeUnit unit)  
398 >    public boolean offer(E e, long timeout, TimeUnit unit)
399          throws InterruptedException {
400          if (e == null) throw new NullPointerException();
401          if (Thread.interrupted()) throw new InterruptedException();
# Line 407 | Line 412 | public class LinkedTransferQueue<E> exte
412      public void transfer(E e) throws InterruptedException {
413          if (e == null) throw new NullPointerException();
414          if (xfer(e, WAIT, 0) == null) {
415 <            Thread.interrupted();
415 >            Thread.interrupted();
416              throw new InterruptedException();
417 <        }
417 >        }
418      }
419  
420      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
# Line 431 | Line 436 | public class LinkedTransferQueue<E> exte
436          Object e = xfer(null, WAIT, 0);
437          if (e != null)
438              return (E)e;
439 <        Thread.interrupted();
439 >        Thread.interrupted();
440          throw new InterruptedException();
441      }
442  
# Line 487 | Line 492 | public class LinkedTransferQueue<E> exte
492                  QNode last = t.next;
493                  QNode first = h.next;
494                  if (t == tail.get()) {
495 <                    if (last != null)
495 >                    if (last != null)
496                          tail.compareAndSet(t, last);
497                      else if (first != null) {
498                          Object x = first.get();
499 <                        if (x == first)
500 <                            advanceHead(h, first);    
499 >                        if (x == first)
500 >                            advanceHead(h, first);
501                          else
502                              return h;
503                      }
# Line 509 | Line 514 | public class LinkedTransferQueue<E> exte
514      }
515  
516      /**
517 <     * Iterators. Basic strategy os to travers list, treating
517 >     * Iterators. Basic strategy is to traverse list, treating
518       * non-data (i.e., request) nodes as terminating list.
519       * Once a valid data node is found, the item is cached
520       * so that the next call to next() will return it even
# Line 520 | Line 525 | public class LinkedTransferQueue<E> exte
525          QNode currentNode; // last returned node, for remove()
526          QNode prevNode;    // predecessor of last returned node
527          E nextItem;        // Cache of next item, once commited to in next
528 <        
528 >
529          Itr() {
530              nextNode = traversalHead();
531              advance();
532          }
533 <        
533 >
534          E advance() {
535              prevNode = currentNode;
536              currentNode = nextNode;
537              E x = nextItem;
538 <            
538 >
539              QNode p = nextNode.next;
540              for (;;) {
541                  if (p == null || !p.isData) {
# Line 543 | Line 548 | public class LinkedTransferQueue<E> exte
548                      nextNode = p;
549                      nextItem = (E)item;
550                      return x;
551 <                }
551 >                }
552                  prevNode = p;
553                  p = p.next;
554              }
555          }
556 <        
556 >
557          public boolean hasNext() {
558              return nextNode != null;
559          }
560 <        
560 >
561          public E next() {
562              if (nextNode == null) throw new NoSuchElementException();
563              return advance();
564          }
565 <        
565 >
566          public void remove() {
567              QNode p = currentNode;
568              QNode prev = prevNode;
569 <            if (prev == null || p == null)
569 >            if (prev == null || p == null)
570                  throw new IllegalStateException();
571              Object x = p.get();
572              if (x != null && x != p && p.compareAndSet(x, p))
# Line 585 | Line 590 | public class LinkedTransferQueue<E> exte
590          }
591      }
592  
593 +    public boolean isEmpty() {
594 +        for (;;) {
595 +            QNode h = traversalHead();
596 +            QNode p = h.next;
597 +            if (p == null)
598 +                return true;
599 +            Object x = p.get();
600 +            if (p != x) {
601 +                if (!p.isData)
602 +                    return true;
603 +                if (x != null)
604 +                    return false;
605 +            }
606 +        }
607 +    }
608 +
609      public boolean hasWaitingConsumer() {
610          for (;;) {
611              QNode h = traversalHead();
# Line 592 | Line 613 | public class LinkedTransferQueue<E> exte
613              if (p == null)
614                  return false;
615              Object x = p.get();
616 <            if (p != x)
616 >            if (p != x)
617                  return !p.isData;
618          }
619      }
620 <    
620 >
621      /**
622       * Returns the number of elements in this queue.  If this queue
623       * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
# Line 614 | Line 635 | public class LinkedTransferQueue<E> exte
635          QNode h = traversalHead();
636          for (QNode p = h.next; p != null && p.isData; p = p.next) {
637              Object x = p.get();
638 <            if (x != null && x != p) {
638 >            if (x != null && x != p) {
639                  if (++count == Integer.MAX_VALUE) // saturated
640                      break;
641              }
# Line 662 | Line 683 | public class LinkedTransferQueue<E> exte
683      private void readObject(java.io.ObjectInputStream s)
684          throws java.io.IOException, ClassNotFoundException {
685          s.defaultReadObject();
686 +        resetHeadAndTail();
687          for (;;) {
688              E item = (E)s.readObject();
689              if (item == null)
# Line 670 | Line 692 | public class LinkedTransferQueue<E> exte
692                  offer(item);
693          }
694      }
695 +
696 +
697 +    // Support for resetting head/tail while deserializing
698 +
699 +    // Temporary Unsafe mechanics for preliminary release
700 +    private static final Unsafe _unsafe;
701 +    private static final long headOffset;
702 +    private static final long tailOffset;
703 +    private static final long cleanMeOffset;
704 +    static {
705 +        try {
706 +            if (LinkedTransferQueue.class.getClassLoader() != null) {
707 +                Field f = Unsafe.class.getDeclaredField("theUnsafe");
708 +                f.setAccessible(true);
709 +                _unsafe = (Unsafe)f.get(null);
710 +            }
711 +            else
712 +                _unsafe = Unsafe.getUnsafe();
713 +            headOffset = _unsafe.objectFieldOffset
714 +                (LinkedTransferQueue.class.getDeclaredField("head"));
715 +            tailOffset = _unsafe.objectFieldOffset
716 +                (LinkedTransferQueue.class.getDeclaredField("tail"));
717 +            cleanMeOffset = _unsafe.objectFieldOffset
718 +                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
719 +        } catch (Exception e) {
720 +            throw new RuntimeException("Could not initialize intrinsics", e);
721 +        }
722 +    }
723 +
724 +    private void resetHeadAndTail() {
725 +        QNode dummy = new QNode(null, false);
726 +        _unsafe.putObjectVolatile(this, headOffset,
727 +                                  new PaddedAtomicReference<QNode>(dummy));
728 +        _unsafe.putObjectVolatile(this, tailOffset,
729 +                                  new PaddedAtomicReference<QNode>(dummy));
730 +        _unsafe.putObjectVolatile(this, cleanMeOffset,
731 +                                  new PaddedAtomicReference<QNode>(null));
732 +
733 +    }
734 +
735   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines