ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.2 by dl, Mon Jul 23 17:39:11 2007 UTC vs.
Revision 1.44 by jsr166, Tue Aug 4 20:32:16 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21 < * An unbounded {@linkplain TransferQueue} based on linked nodes.
21 > * An unbounded {@link TransferQueue} based on linked nodes.
22   * This queue orders elements FIFO (first-in-first-out) with respect
23   * to any given producer.  The <em>head</em> of the queue is that
24   * element that has been on the queue the longest time for some
25   * producer.  The <em>tail</em> of the queue is that element that has
26   * been on the queue the shortest time for some producer.
27   *
28 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
28 > * <p>Beware that, unlike in most collections, the {@code size}
29   * method is <em>NOT</em> a constant-time operation. Because of the
30   * asynchronous nature of these queues, determining the current number
31   * of elements requires a traversal of the elements.
# Line 39 | Line 45 | import java.io.*;
45   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
46   * Java Collections Framework</a>.
47   *
48 < * @since 1.5
48 > * @since 1.7
49   * @author Doug Lea
50   * @param <E> the type of elements held in this collection
45 *
51   */
52   public class LinkedTransferQueue<E> extends AbstractQueue<E>
53      implements TransferQueue<E>, java.io.Serializable {
54      private static final long serialVersionUID = -3223113410248163686L;
55  
56      /*
52     * This is still a work in prgress...
53     *
57       * This class extends the approach used in FIFO-mode
58       * SynchronousQueues. See the internal documentation, as well as
59       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
60       * Lea & Scott
61       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
62       *
63 <     * The main extension is to provide different Wait modes
64 <     * for the main "xfer" method that puts or takes items.
65 <     * These don't impact the basic dual-queue logic, but instead
66 <     * control whether or how threads block upon insertion
67 <     * of request or data nodes into the dual queue.
63 >     * The main extension is to provide different Wait modes for the
64 >     * main "xfer" method that puts or takes items.  These don't
65 >     * impact the basic dual-queue logic, but instead control whether
66 >     * or how threads block upon insertion of request or data nodes
67 >     * into the dual queue. It also uses slightly different
68 >     * conventions for tracking whether nodes are off-list or
69 >     * cancelled.
70       */
71  
72      // Wait modes for xfer method
# Line 79 | Line 84 | public class LinkedTransferQueue<E> exte
84       * seems not to vary with number of CPUs (beyond 2) so is just
85       * a constant.
86       */
87 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
87 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
88  
89      /**
90       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 99 | public class LinkedTransferQueue<E> exte
99       */
100      static final long spinForTimeoutThreshold = 1000L;
101  
102 <    /**
103 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
104 <     * AtomicReference to represent item. Uses Object, not E, to allow
105 <     * setting item to "this" after use, to avoid garbage
106 <     * retention. Similarly, setting the next field to this is used as
107 <     * sentinel that node is off list.
102 >    /**
103 >     * Node class for LinkedTransferQueue. Opportunistically
104 >     * subclasses from AtomicReference to represent item. Uses Object,
105 >     * not E, to allow setting item to "this" after use, to avoid
106 >     * garbage retention. Similarly, setting the next field to this is
107 >     * used as sentinel that node is off list.
108       */
109 <    static final class QNode extends AtomicReference<Object> {
110 <        volatile QNode next;
109 >    static final class Node<E> extends AtomicReference<Object> {
110 >        volatile Node<E> next;
111          volatile Thread waiter;       // to control park/unpark
112          final boolean isData;
113 <        QNode(Object item, boolean isData) {
113 >
114 >        Node(E item, boolean isData) {
115              super(item);
116              this.isData = isData;
117          }
118  
119 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
114 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
115 <            (QNode.class, QNode.class, "next");
119 >        // Unsafe mechanics
120  
121 <        boolean casNext(QNode cmp, QNode val) {
122 <            return nextUpdater.compareAndSet(this, cmp, val);
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124 >
125 >        final boolean casNext(Node<E> cmp, Node<E> val) {
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127 >        }
128 >
129 >        final void clearNext() {
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160 +
161 +        private static final long serialVersionUID = -3375979862319811754L;
162      }
163  
164      /**
# Line 128 | Line 170 | public class LinkedTransferQueue<E> exte
170          // enough padding for 64bytes with 4byte refs
171          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
172          PaddedAtomicReference(T r) { super(r); }
173 +        private static final long serialVersionUID = 8170090609809740854L;
174      }
175  
176  
177 <    private final QNode dummy = new QNode(null, false);
178 <    private final PaddedAtomicReference<QNode> head =
179 <        new PaddedAtomicReference<QNode>(dummy);
180 <    private final PaddedAtomicReference<QNode> tail =
181 <        new PaddedAtomicReference<QNode>(dummy);
177 >    /** head of the queue */
178 >    private transient final PaddedAtomicReference<Node<E>> head;
179 >
180 >    /** tail of the queue */
181 >    private transient final PaddedAtomicReference<Node<E>> tail;
182  
183      /**
184       * Reference to a cancelled node that might not yet have been
185       * unlinked from queue because it was the last inserted node
186       * when it cancelled.
187       */
188 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
188 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
189  
190      /**
191       * Tries to cas nh as new head; if successful, unlink
192       * old head's next node to avoid garbage retention.
193       */
194 <    private boolean advanceHead(QNode h, QNode nh) {
194 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
195          if (h == head.get() && head.compareAndSet(h, nh)) {
196 <            h.next = h; // forget old next
196 >            h.clearNext(); // forget old next
197              return true;
198          }
199          return false;
200      }
201 <    
201 >
202      /**
203       * Puts or takes an item. Used for most queue operations (except
204 <     * poll() and tryTransfer())
205 <     * @param e the item or if null, signfies that this is a take
204 >     * poll() and tryTransfer()). See the similar code in
205 >     * SynchronousQueue for detailed explanation.
206 >     *
207 >     * @param e the item or if null, signifies that this is a take
208       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
209       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
210       * @return an item, or null on failure
211       */
212 <    private Object xfer(Object e, int mode, long nanos) {
212 >    private E xfer(E e, int mode, long nanos) {
213          boolean isData = (e != null);
214 <        QNode s = null;
215 <        final PaddedAtomicReference<QNode> head = this.head;
216 <        final PaddedAtomicReference<QNode> tail = this.tail;
214 >        Node<E> s = null;
215 >        final PaddedAtomicReference<Node<E>> head = this.head;
216 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
217  
218          for (;;) {
219 <            QNode t = tail.get();
220 <            QNode h = head.get();
219 >            Node<E> t = tail.get();
220 >            Node<E> h = head.get();
221  
222 <            if (t != null && (t == h || t.isData == isData)) {
222 >            if (t == h || t.isData == isData) {
223                  if (s == null)
224 <                    s = new QNode(e, isData);
225 <                QNode last = t.next;
224 >                    s = new Node<E>(e, isData);
225 >                Node<E> last = t.next;
226                  if (last != null) {
227                      if (t == tail.get())
228                          tail.compareAndSet(t, last);
# Line 187 | Line 231 | public class LinkedTransferQueue<E> exte
231                      tail.compareAndSet(t, s);
232                      return awaitFulfill(t, s, e, mode, nanos);
233                  }
234 <            }
235 <            
236 <            else if (h != null) {
193 <                QNode first = h.next;
194 <                if (t == tail.get() && first != null &&
234 >            } else {
235 >                Node<E> first = h.next;
236 >                if (t == tail.get() && first != null &&
237                      advanceHead(h, first)) {
238                      Object x = first.get();
239                      if (x != first && first.compareAndSet(x, e)) {
240                          LockSupport.unpark(first.waiter);
241 <                        return isData? e : x;
241 >                        return isData ? e : (E) x;
242                      }
243                  }
244              }
# Line 206 | Line 248 | public class LinkedTransferQueue<E> exte
248  
249      /**
250       * Version of xfer for poll() and tryTransfer, which
251 <     * simpifies control paths both here and in xfer
251 >     * simplifies control paths both here and in xfer.
252       */
253 <    private Object fulfill(Object e) {
253 >    private E fulfill(E e) {
254          boolean isData = (e != null);
255 <        final PaddedAtomicReference<QNode> head = this.head;
256 <        final PaddedAtomicReference<QNode> tail = this.tail;
255 >        final PaddedAtomicReference<Node<E>> head = this.head;
256 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
257  
258          for (;;) {
259 <            QNode t = tail.get();
260 <            QNode h = head.get();
259 >            Node<E> t = tail.get();
260 >            Node<E> h = head.get();
261  
262 <            if (t != null && (t == h || t.isData == isData)) {
263 <                QNode last = t.next;
262 >            if (t == h || t.isData == isData) {
263 >                Node<E> last = t.next;
264                  if (t == tail.get()) {
265                      if (last != null)
266                          tail.compareAndSet(t, last);
267                      else
268                          return null;
269                  }
270 <            }
271 <            else if (h != null) {
272 <                QNode first = h.next;
231 <                if (t == tail.get() &&
270 >            } else {
271 >                Node<E> first = h.next;
272 >                if (t == tail.get() &&
273                      first != null &&
274                      advanceHead(h, first)) {
275                      Object x = first.get();
276                      if (x != first && first.compareAndSet(x, e)) {
277                          LockSupport.unpark(first.waiter);
278 <                        return isData? e : x;
278 >                        return isData ? e : (E) x;
279                      }
280                  }
281              }
# Line 250 | Line 291 | public class LinkedTransferQueue<E> exte
291       * @param e the comparison value for checking match
292       * @param mode mode
293       * @param nanos timeout value
294 <     * @return matched item, or s if cancelled
294 >     * @return matched item, or null if cancelled
295       */
296 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
297 <                                int mode, long nanos) {
296 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
297 >                           int mode, long nanos) {
298          if (mode == NOWAIT)
299              return null;
300  
301 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
301 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
302          Thread w = Thread.currentThread();
303          int spins = -1; // set to desired spin count below
304          for (;;) {
# Line 266 | Line 307 | public class LinkedTransferQueue<E> exte
307              Object x = s.get();
308              if (x != e) {                 // Node was matched or cancelled
309                  advanceHead(pred, s);     // unlink if head
310 <                if (x == s)               // was cancelled
311 <                    return clean(pred, s);
312 <                else if (x != null) {    
310 >                if (x == s) {             // was cancelled
311 >                    clean(pred, s);
312 >                    return null;
313 >                }
314 >                else if (x != null) {
315                      s.set(s);             // avoid garbage retention
316 <                    return x;
316 >                    return (E) x;
317                  }
318                  else
319                      return e;
320              }
278
321              if (mode == TIMEOUT) {
322                  long now = System.nanoTime();
323                  nanos -= now - lastTime;
# Line 286 | Line 328 | public class LinkedTransferQueue<E> exte
328                  }
329              }
330              if (spins < 0) {
331 <                QNode h = head.get(); // only spin if at head
332 <                spins = ((h != null && h.next == s) ?
333 <                         (mode == TIMEOUT?
334 <                          maxTimedSpins : maxUntimedSpins) : 0);
331 >                Node<E> h = head.get(); // only spin if at head
332 >                spins = ((h.next != s) ? 0 :
333 >                         (mode == TIMEOUT) ? maxTimedSpins :
334 >                         maxUntimedSpins);
335              }
336              if (spins > 0)
337                  --spins;
338              else if (s.waiter == null)
339                  s.waiter = w;
340              else if (mode != TIMEOUT) {
341 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
341 >                LockSupport.park(this);
342                  s.waiter = null;
343                  spins = -1;
344              }
345              else if (nanos > spinForTimeoutThreshold) {
346 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
346 >                LockSupport.parkNanos(this, nanos);
347                  s.waiter = null;
348                  spins = -1;
349              }
# Line 311 | Line 351 | public class LinkedTransferQueue<E> exte
351      }
352  
353      /**
354 +     * Returns validated tail for use in cleaning methods.
355 +     */
356 +    private Node<E> getValidatedTail() {
357 +        for (;;) {
358 +            Node<E> h = head.get();
359 +            Node<E> first = h.next;
360 +            if (first != null && first.get() == first) { // help advance
361 +                advanceHead(h, first);
362 +                continue;
363 +            }
364 +            Node<E> t = tail.get();
365 +            Node<E> last = t.next;
366 +            if (t == tail.get()) {
367 +                if (last != null)
368 +                    tail.compareAndSet(t, last); // help advance
369 +                else
370 +                    return t;
371 +            }
372 +        }
373 +    }
374 +
375 +    /**
376       * Gets rid of cancelled node s with original predecessor pred.
377 <     * @return null (to simplify use by callers)
377 >     *
378 >     * @param pred predecessor of cancelled node
379 >     * @param s the cancelled node
380       */
381 <    private Object clean(QNode pred, QNode s) {
381 >    private void clean(Node<E> pred, Node<E> s) {
382          Thread w = s.waiter;
383          if (w != null) {             // Wake up thread
384              s.waiter = null;
385              if (w != Thread.currentThread())
386                  LockSupport.unpark(w);
387          }
388 <        
389 <        for (;;) {
390 <            if (pred.next != s) // already cleaned
391 <                return null;
392 <            QNode h = head.get();
393 <            QNode hn = h.next;   // Absorb cancelled first node as head
394 <            if (hn != null && hn.next == hn) {
395 <                advanceHead(h, hn);
396 <                continue;
397 <            }
398 <            QNode t = tail.get();      // Ensure consistent read for tail
399 <            if (t == h)
400 <                return null;
401 <            QNode tn = t.next;
402 <            if (t != tail.get())
403 <                continue;
404 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
388 >
389 >        if (pred == null)
390 >            return;
391 >
392 >        /*
393 >         * At any given time, exactly one node on list cannot be
394 >         * deleted -- the last inserted node. To accommodate this, if
395 >         * we cannot delete s, we save its predecessor as "cleanMe",
396 >         * processing the previously saved version first. At least one
397 >         * of node s or the node previously saved can always be
398 >         * processed, so this always terminates.
399 >         */
400 >        while (pred.next == s) {
401 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
402 >            Node<E> t = getValidatedTail();
403 >            if (s != t) {               // If not tail, try to unsplice
404 >                Node<E> sn = s.next;      // s.next == s means s already off list
405                  if (sn == s || pred.casNext(s, sn))
406 <                    return null;
406 >                    break;
407              }
408 <            QNode dp = cleanMe.get();
409 <            if (dp != null) {    // Try unlinking previous cancelled node
410 <                QNode d = dp.next;
352 <                QNode dn;
353 <                if (d == null ||               // d is gone or
354 <                    d == dp ||                 // d is off list or
355 <                    d.get() != d ||            // d not cancelled or
356 <                    (d != t &&                 // d not tail and
357 <                     (dn = d.next) != null &&  //   has successor
358 <                     dn != d &&                //   that is on list
359 <                     dp.casNext(d, dn)))       // d unspliced
360 <                    cleanMe.compareAndSet(dp, null);
361 <                if (dp == pred)
362 <                    return null;      // s is already saved node
363 <            }
364 <            else if (cleanMe.compareAndSet(null, pred))
365 <                return null;          // Postpone cleaning s
408 >            else if (oldpred == pred || // Already saved
409 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
410 >                break;                  // Postpone cleaning
411          }
412      }
413 <    
413 >
414 >    /**
415 >     * Tries to unsplice the cancelled node held in cleanMe that was
416 >     * previously uncleanable because it was at tail.
417 >     *
418 >     * @return current cleanMe node (or null)
419 >     */
420 >    private Node<E> reclean() {
421 >        /*
422 >         * cleanMe is, or at one time was, predecessor of cancelled
423 >         * node s that was the tail so could not be unspliced.  If s
424 >         * is no longer the tail, try to unsplice if necessary and
425 >         * make cleanMe slot available.  This differs from similar
426 >         * code in clean() because we must check that pred still
427 >         * points to a cancelled node that must be unspliced -- if
428 >         * not, we can (must) clear cleanMe without unsplicing.
429 >         * This can loop only due to contention on casNext or
430 >         * clearing cleanMe.
431 >         */
432 >        Node<E> pred;
433 >        while ((pred = cleanMe.get()) != null) {
434 >            Node<E> t = getValidatedTail();
435 >            Node<E> s = pred.next;
436 >            if (s != t) {
437 >                Node<E> sn;
438 >                if (s == null || s == pred || s.get() != s ||
439 >                    (sn = s.next) == s || pred.casNext(s, sn))
440 >                    cleanMe.compareAndSet(pred, null);
441 >            }
442 >            else // s is still tail; cannot clean
443 >                break;
444 >        }
445 >        return pred;
446 >    }
447 >
448      /**
449 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
449 >     * Creates an initially empty {@code LinkedTransferQueue}.
450       */
451      public LinkedTransferQueue() {
452 +        Node<E> dummy = new Node<E>(null, false);
453 +        head = new PaddedAtomicReference<Node<E>>(dummy);
454 +        tail = new PaddedAtomicReference<Node<E>>(dummy);
455 +        cleanMe = new PaddedAtomicReference<Node<E>>(null);
456      }
457  
458      /**
459 <     * Creates a <tt>LinkedTransferQueue</tt>
459 >     * Creates a {@code LinkedTransferQueue}
460       * initially containing the elements of the given collection,
461       * added in traversal order of the collection's iterator.
462 +     *
463       * @param c the collection of elements to initially contain
464       * @throws NullPointerException if the specified collection or any
465       *         of its elements are null
466       */
467      public LinkedTransferQueue(Collection<? extends E> c) {
468 +        this();
469          addAll(c);
470      }
471  
472 <    public void put(E e) throws InterruptedException {
473 <        if (e == null) throw new NullPointerException();
474 <        if (Thread.interrupted()) throw new InterruptedException();
475 <        xfer(e, NOWAIT, 0);
472 >    /**
473 >     * Inserts the specified element at the tail of this queue.
474 >     * As the queue is unbounded, this method will never block.
475 >     *
476 >     * @throws NullPointerException if the specified element is null
477 >     */
478 >    public void put(E e) {
479 >        offer(e);
480      }
481  
482 <    public boolean offer(E e, long timeout, TimeUnit unit)  
483 <        throws InterruptedException {
484 <        if (e == null) throw new NullPointerException();
485 <        if (Thread.interrupted()) throw new InterruptedException();
486 <        xfer(e, NOWAIT, 0);
487 <        return true;
482 >    /**
483 >     * Inserts the specified element at the tail of this queue.
484 >     * As the queue is unbounded, this method will never block or
485 >     * return {@code false}.
486 >     *
487 >     * @return {@code true} (as specified by
488 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
489 >     * @throws NullPointerException if the specified element is null
490 >     */
491 >    public boolean offer(E e, long timeout, TimeUnit unit) {
492 >        return offer(e);
493      }
494  
495 +    /**
496 +     * Inserts the specified element at the tail of this queue.
497 +     * As the queue is unbounded, this method will never return {@code false}.
498 +     *
499 +     * @return {@code true} (as specified by
500 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
501 +     * @throws NullPointerException if the specified element is null
502 +     */
503      public boolean offer(E e) {
504          if (e == null) throw new NullPointerException();
505          xfer(e, NOWAIT, 0);
506          return true;
507      }
508  
509 +    /**
510 +     * Inserts the specified element at the tail of this queue.
511 +     * As the queue is unbounded, this method will never throw
512 +     * {@link IllegalStateException} or return {@code false}.
513 +     *
514 +     * @return {@code true} (as specified by {@link Collection#add})
515 +     * @throws NullPointerException if the specified element is null
516 +     */
517 +    public boolean add(E e) {
518 +        return offer(e);
519 +    }
520 +
521 +    /**
522 +     * Transfers the element to a waiting consumer immediately, if possible.
523 +     *
524 +     * <p>More precisely, transfers the specified element immediately
525 +     * if there exists a consumer already waiting to receive it (in
526 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
527 +     * otherwise returning {@code false} without enqueuing the element.
528 +     *
529 +     * @throws NullPointerException if the specified element is null
530 +     */
531 +    public boolean tryTransfer(E e) {
532 +        if (e == null) throw new NullPointerException();
533 +        return fulfill(e) != null;
534 +    }
535 +
536 +    /**
537 +     * Transfers the element to a consumer, waiting if necessary to do so.
538 +     *
539 +     * <p>More precisely, transfers the specified element immediately
540 +     * if there exists a consumer already waiting to receive it (in
541 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
542 +     * else inserts the specified element at the tail of this queue
543 +     * and waits until the element is received by a consumer.
544 +     *
545 +     * @throws NullPointerException if the specified element is null
546 +     */
547      public void transfer(E e) throws InterruptedException {
548          if (e == null) throw new NullPointerException();
549          if (xfer(e, WAIT, 0) == null) {
550 <            Thread.interrupted();
550 >            Thread.interrupted();
551              throw new InterruptedException();
552 <        }
552 >        }
553      }
554  
555 +    /**
556 +     * Transfers the element to a consumer if it is possible to do so
557 +     * before the timeout elapses.
558 +     *
559 +     * <p>More precisely, transfers the specified element immediately
560 +     * if there exists a consumer already waiting to receive it (in
561 +     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
562 +     * else inserts the specified element at the tail of this queue
563 +     * and waits until the element is received by a consumer,
564 +     * returning {@code false} if the specified wait time elapses
565 +     * before the element can be transferred.
566 +     *
567 +     * @throws NullPointerException if the specified element is null
568 +     */
569      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
570          throws InterruptedException {
571          if (e == null) throw new NullPointerException();
# Line 422 | Line 576 | public class LinkedTransferQueue<E> exte
576          throw new InterruptedException();
577      }
578  
425    public boolean tryTransfer(E e) {
426        if (e == null) throw new NullPointerException();
427        return fulfill(e) != null;
428    }
429
579      public E take() throws InterruptedException {
580 <        Object e = xfer(null, WAIT, 0);
580 >        E e = xfer(null, WAIT, 0);
581          if (e != null)
582 <            return (E)e;
583 <        Thread.interrupted();
582 >            return e;
583 >        Thread.interrupted();
584          throw new InterruptedException();
585      }
586  
587      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
588 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
588 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
589          if (e != null || !Thread.interrupted())
590 <            return (E)e;
590 >            return e;
591          throw new InterruptedException();
592      }
593  
594      public E poll() {
595 <        return (E)fulfill(null);
595 >        return fulfill(null);
596      }
597  
598 +    /**
599 +     * @throws NullPointerException     {@inheritDoc}
600 +     * @throws IllegalArgumentException {@inheritDoc}
601 +     */
602      public int drainTo(Collection<? super E> c) {
603          if (c == null)
604              throw new NullPointerException();
# Line 460 | Line 613 | public class LinkedTransferQueue<E> exte
613          return n;
614      }
615  
616 +    /**
617 +     * @throws NullPointerException     {@inheritDoc}
618 +     * @throws IllegalArgumentException {@inheritDoc}
619 +     */
620      public int drainTo(Collection<? super E> c, int maxElements) {
621          if (c == null)
622              throw new NullPointerException();
# Line 477 | Line 634 | public class LinkedTransferQueue<E> exte
634      // Traversal-based methods
635  
636      /**
637 <     * Return head after performing any outstanding helping steps
637 >     * Returns head after performing any outstanding helping steps.
638       */
639 <    private QNode traversalHead() {
639 >    private Node<E> traversalHead() {
640          for (;;) {
641 <            QNode t = tail.get();
642 <            QNode h = head.get();
643 <            if (h != null && t != null) {
644 <                QNode last = t.next;
645 <                QNode first = h.next;
646 <                if (t == tail.get()) {
647 <                    if (last != null)
648 <                        tail.compareAndSet(t, last);
649 <                    else if (first != null) {
650 <                        Object x = first.get();
651 <                        if (x == first)
495 <                            advanceHead(h, first);    
496 <                        else
497 <                            return h;
498 <                    }
641 >            Node<E> t = tail.get();
642 >            Node<E> h = head.get();
643 >            Node<E> last = t.next;
644 >            Node<E> first = h.next;
645 >            if (t == tail.get()) {
646 >                if (last != null)
647 >                    tail.compareAndSet(t, last);
648 >                else if (first != null) {
649 >                    Object x = first.get();
650 >                    if (x == first)
651 >                        advanceHead(h, first);
652                      else
653                          return h;
654                  }
655 +                else
656 +                    return h;
657              }
658 +            reclean();
659          }
660      }
661  
662 <
662 >    /**
663 >     * Returns an iterator over the elements in this queue in proper
664 >     * sequence, from head to tail.
665 >     *
666 >     * <p>The returned iterator is a "weakly consistent" iterator that
667 >     * will never throw
668 >     * {@link ConcurrentModificationException ConcurrentModificationException},
669 >     * and guarantees to traverse elements as they existed upon
670 >     * construction of the iterator, and may (but is not guaranteed
671 >     * to) reflect any modifications subsequent to construction.
672 >     *
673 >     * @return an iterator over the elements in this queue in proper sequence
674 >     */
675      public Iterator<E> iterator() {
676          return new Itr();
677      }
678  
679      /**
680 <     * Iterators. Basic strategy os to travers list, treating
680 >     * Iterators. Basic strategy is to traverse list, treating
681       * non-data (i.e., request) nodes as terminating list.
682       * Once a valid data node is found, the item is cached
683       * so that the next call to next() will return it even
684       * if subsequently removed.
685       */
686      class Itr implements Iterator<E> {
687 <        QNode nextNode;    // Next node to return next
688 <        QNode currentNode; // last returned node, for remove()
689 <        QNode prevNode;    // predecessor of last returned node
690 <        E nextItem;        // Cache of next item, once commited to in next
691 <        
687 >        Node<E> next;        // node to return next
688 >        Node<E> pnext;       // predecessor of next
689 >        Node<E> curr;        // last returned node, for remove()
690 >        Node<E> pcurr;       // predecessor of curr, for remove()
691 >        E nextItem;          // Cache of next item, once committed to in next
692 >
693          Itr() {
525            nextNode = traversalHead();
694              advance();
695          }
696 <        
697 <        E advance() {
698 <            prevNode = currentNode;
699 <            currentNode = nextNode;
700 <            E x = nextItem;
701 <            
702 <            QNode p = nextNode.next;
696 >
697 >        /**
698 >         * Moves to next valid node and returns item to return for
699 >         * next(), or null if no such.
700 >         */
701 >        private E advance() {
702 >            pcurr = pnext;
703 >            curr = next;
704 >            E item = nextItem;
705 >
706              for (;;) {
707 <                if (p == null || !p.isData) {
708 <                    nextNode = null;
709 <                    nextItem = null;
710 <                    return x;
711 <                }
712 <                Object item = p.get();
713 <                if (item != p && item != null) {
714 <                    nextNode = p;
715 <                    nextItem = (E)item;
716 <                    return x;
717 <                }
718 <                prevNode = p;
719 <                p = p.next;
707 >                pnext = (next == null) ? traversalHead() : next;
708 >                next = pnext.next;
709 >                if (next == pnext) {
710 >                    next = null;
711 >                    continue;  // restart
712 >                }
713 >                if (next == null)
714 >                    break;
715 >                Object x = next.get();
716 >                if (x != null && x != next) {
717 >                    nextItem = (E) x;
718 >                    break;
719 >                }
720              }
721 +            return item;
722          }
723 <        
723 >
724          public boolean hasNext() {
725 <            return nextNode != null;
725 >            return next != null;
726          }
727 <        
727 >
728          public E next() {
729 <            if (nextNode == null) throw new NoSuchElementException();
729 >            if (next == null)
730 >                throw new NoSuchElementException();
731              return advance();
732          }
733 <        
733 >
734          public void remove() {
735 <            QNode p = currentNode;
736 <            QNode prev = prevNode;
564 <            if (prev == null || p == null)
735 >            Node<E> p = curr;
736 >            if (p == null)
737                  throw new IllegalStateException();
738              Object x = p.get();
739              if (x != null && x != p && p.compareAndSet(x, p))
740 <                clean(prev, p);
740 >                clean(pcurr, p);
741          }
742      }
743  
744      public E peek() {
745          for (;;) {
746 <            QNode h = traversalHead();
747 <            QNode p = h.next;
746 >            Node<E> h = traversalHead();
747 >            Node<E> p = h.next;
748              if (p == null)
749                  return null;
750              Object x = p.get();
# Line 580 | Line 752 | public class LinkedTransferQueue<E> exte
752                  if (!p.isData)
753                      return null;
754                  if (x != null)
755 <                    return (E)x;
755 >                    return (E) x;
756              }
757          }
758      }
759  
760 +    /**
761 +     * Returns {@code true} if this queue contains no elements.
762 +     *
763 +     * @return {@code true} if this queue contains no elements
764 +     */
765      public boolean isEmpty() {
766          for (;;) {
767 <            QNode h = traversalHead();
768 <            QNode p = h.next;
767 >            Node<E> h = traversalHead();
768 >            Node<E> p = h.next;
769              if (p == null)
770                  return true;
771              Object x = p.get();
# Line 603 | Line 780 | public class LinkedTransferQueue<E> exte
780  
781      public boolean hasWaitingConsumer() {
782          for (;;) {
783 <            QNode h = traversalHead();
784 <            QNode p = h.next;
783 >            Node<E> h = traversalHead();
784 >            Node<E> p = h.next;
785              if (p == null)
786                  return false;
787              Object x = p.get();
788 <            if (p != x)
788 >            if (p != x)
789                  return !p.isData;
790          }
791      }
792 <    
792 >
793      /**
794       * Returns the number of elements in this queue.  If this queue
795 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
796 <     * <tt>Integer.MAX_VALUE</tt>.
795 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
796 >     * {@code Integer.MAX_VALUE}.
797       *
798       * <p>Beware that, unlike in most collections, this method is
799       * <em>NOT</em> a constant-time operation. Because of the
# Line 626 | Line 803 | public class LinkedTransferQueue<E> exte
803       * @return the number of elements in this queue
804       */
805      public int size() {
806 <        int count = 0;
807 <        QNode h = traversalHead();
808 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
809 <            Object x = p.get();
810 <            if (x != null && x != p) {
811 <                if (++count == Integer.MAX_VALUE) // saturated
806 >        for (;;) {
807 >            int count = 0;
808 >            Node<E> pred = traversalHead();
809 >            for (;;) {
810 >                Node<E> q = pred.next;
811 >                if (q == pred) // restart
812                      break;
813 +                if (q == null || !q.isData)
814 +                    return count;
815 +                Object x = q.get();
816 +                if (x != null && x != q) {
817 +                    if (++count == Integer.MAX_VALUE) // saturated
818 +                        return count;
819 +                }
820 +                pred = q;
821              }
822          }
638        return count;
823      }
824  
825      public int getWaitingConsumerCount() {
826 <        int count = 0;
827 <        QNode h = traversalHead();
828 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
829 <            if (p.get() == null) {
830 <                if (++count == Integer.MAX_VALUE)
826 >        // converse of size -- count valid non-data nodes
827 >        for (;;) {
828 >            int count = 0;
829 >            Node<E> pred = traversalHead();
830 >            for (;;) {
831 >                Node<E> q = pred.next;
832 >                if (q == pred) // restart
833                      break;
834 +                if (q == null || q.isData)
835 +                    return count;
836 +                Object x = q.get();
837 +                if (x == null) {
838 +                    if (++count == Integer.MAX_VALUE) // saturated
839 +                        return count;
840 +                }
841 +                pred = q;
842              }
843          }
650        return count;
844      }
845  
846 +    /**
847 +     * Removes a single instance of the specified element from this queue,
848 +     * if it is present.  More formally, removes an element {@code e} such
849 +     * that {@code o.equals(e)}, if this queue contains one or more such
850 +     * elements.
851 +     * Returns {@code true} if this queue contained the specified element
852 +     * (or equivalently, if this queue changed as a result of the call).
853 +     *
854 +     * @param o element to be removed from this queue, if present
855 +     * @return {@code true} if this queue changed as a result of the call
856 +     */
857 +    public boolean remove(Object o) {
858 +        if (o == null)
859 +            return false;
860 +        for (;;) {
861 +            Node<E> pred = traversalHead();
862 +            for (;;) {
863 +                Node<E> q = pred.next;
864 +                if (q == pred) // restart
865 +                    break;
866 +                if (q == null || !q.isData)
867 +                    return false;
868 +                Object x = q.get();
869 +                if (x != null && x != q && o.equals(x) &&
870 +                    q.compareAndSet(x, q)) {
871 +                    clean(pred, q);
872 +                    return true;
873 +                }
874 +                pred = q;
875 +            }
876 +        }
877 +    }
878 +
879 +    /**
880 +     * Always returns {@code Integer.MAX_VALUE} because a
881 +     * {@code LinkedTransferQueue} is not capacity constrained.
882 +     *
883 +     * @return {@code Integer.MAX_VALUE} (as specified by
884 +     *         {@link BlockingQueue#remainingCapacity()})
885 +     */
886      public int remainingCapacity() {
887          return Integer.MAX_VALUE;
888      }
# Line 657 | Line 890 | public class LinkedTransferQueue<E> exte
890      /**
891       * Save the state to a stream (that is, serialize it).
892       *
893 <     * @serialData All of the elements (each an <tt>E</tt>) in
893 >     * @serialData All of the elements (each an {@code E}) in
894       * the proper order, followed by a null
895       * @param s the stream
896       */
897      private void writeObject(java.io.ObjectOutputStream s)
898          throws java.io.IOException {
899          s.defaultWriteObject();
900 <        for (Iterator<E> it = iterator(); it.hasNext(); )
901 <            s.writeObject(it.next());
900 >        for (E e : this)
901 >            s.writeObject(e);
902          // Use trailing null as sentinel
903          s.writeObject(null);
904      }
# Line 673 | Line 906 | public class LinkedTransferQueue<E> exte
906      /**
907       * Reconstitute the Queue instance from a stream (that is,
908       * deserialize it).
909 +     *
910       * @param s the stream
911       */
912      private void readObject(java.io.ObjectInputStream s)
913          throws java.io.IOException, ClassNotFoundException {
914          s.defaultReadObject();
915 +        resetHeadAndTail();
916          for (;;) {
917 <            E item = (E)s.readObject();
917 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
918              if (item == null)
919                  break;
920              else
921                  offer(item);
922          }
923      }
924 +
925 +    // Support for resetting head/tail while deserializing
926 +    private void resetHeadAndTail() {
927 +        Node<E> dummy = new Node<E>(null, false);
928 +        UNSAFE.putObjectVolatile(this, headOffset,
929 +                                 new PaddedAtomicReference<Node<E>>(dummy));
930 +        UNSAFE.putObjectVolatile(this, tailOffset,
931 +                                 new PaddedAtomicReference<Node<E>>(dummy));
932 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
933 +                                 new PaddedAtomicReference<Node<E>>(null));
934 +    }
935 +
936 +    // Unsafe mechanics
937 +
938 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
939 +    private static final long headOffset =
940 +        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
941 +    private static final long tailOffset =
942 +        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
943 +    private static final long cleanMeOffset =
944 +        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
945 +
946 +
947 +    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
948 +                                  String field, Class<?> klazz) {
949 +        try {
950 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
951 +        } catch (NoSuchFieldException e) {
952 +            // Convert Exception to corresponding Error
953 +            NoSuchFieldError error = new NoSuchFieldError(field);
954 +            error.initCause(e);
955 +            throw error;
956 +        }
957 +    }
958 +
959 +    /**
960 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
961 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
962 +     * into a jdk.
963 +     *
964 +     * @return a sun.misc.Unsafe
965 +     */
966 +    private static sun.misc.Unsafe getUnsafe() {
967 +        try {
968 +            return sun.misc.Unsafe.getUnsafe();
969 +        } catch (SecurityException se) {
970 +            try {
971 +                return java.security.AccessController.doPrivileged
972 +                    (new java.security
973 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
974 +                        public sun.misc.Unsafe run() throws Exception {
975 +                            java.lang.reflect.Field f = sun.misc
976 +                                .Unsafe.class.getDeclaredField("theUnsafe");
977 +                            f.setAccessible(true);
978 +                            return (sun.misc.Unsafe) f.get(null);
979 +                        }});
980 +            } catch (java.security.PrivilegedActionException e) {
981 +                throw new RuntimeException("Could not initialize intrinsics",
982 +                                           e.getCause());
983 +            }
984 +        }
985 +    }
986   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines