ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.1 by dl, Tue May 29 09:55:32 2007 UTC vs.
Revision 1.33 by dl, Thu Jul 30 13:30:19 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.Iterator;
14 > import java.util.NoSuchElementException;
15 > import java.util.concurrent.locks.LockSupport;
16 > import java.util.concurrent.atomic.AtomicReference;
17  
18   /**
19   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 19 | Line 23 | import java.io.*;
23   * producer.  The <em>tail</em> of the queue is that element that has
24   * been on the queue the shortest time for some producer.
25   *
26 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
26 > * <p>Beware that, unlike in most collections, the {@code size}
27   * method is <em>NOT</em> a constant-time operation. Because of the
28   * asynchronous nature of these queues, determining the current number
29   * of elements requires a traversal of the elements.
# Line 39 | Line 43 | import java.io.*;
43   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
44   * Java Collections Framework</a>.
45   *
46 < * @since 1.5
46 > * @since 1.7
47   * @author Doug Lea
48   * @param <E> the type of elements held in this collection
45 *
49   */
50   public class LinkedTransferQueue<E> extends AbstractQueue<E>
51      implements TransferQueue<E>, java.io.Serializable {
52      private static final long serialVersionUID = -3223113410248163686L;
53  
54      /*
52     * This is still a work in prgress...
53     *
55       * This class extends the approach used in FIFO-mode
56       * SynchronousQueues. See the internal documentation, as well as
57       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
58       * Lea & Scott
59       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
60       *
61 <     * The main extension is to provide different Wait modes
62 <     * for the main "xfer" method that puts or takes items.
63 <     * These don't impact the basic dual-queue logic, but instead
64 <     * control whether or how threads block upon insertion
65 <     * of request or data nodes into the dual queue.
61 >     * The main extension is to provide different Wait modes for the
62 >     * main "xfer" method that puts or takes items.  These don't
63 >     * impact the basic dual-queue logic, but instead control whether
64 >     * or how threads block upon insertion of request or data nodes
65 >     * into the dual queue. It also uses slightly different
66 >     * conventions for tracking whether nodes are off-list or
67 >     * cancelled.
68       */
69  
70      // Wait modes for xfer method
# Line 79 | Line 82 | public class LinkedTransferQueue<E> exte
82       * seems not to vary with number of CPUs (beyond 2) so is just
83       * a constant.
84       */
85 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
85 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
86  
87      /**
88       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 97 | public class LinkedTransferQueue<E> exte
97       */
98      static final long spinForTimeoutThreshold = 1000L;
99  
100 <    /**
101 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
102 <     * AtomicReference to represent item. Uses Object, not E, to allow
103 <     * setting item to "this" after use, to avoid garbage
104 <     * retention. Similarly, setting the next field to this is used as
105 <     * sentinel that node is off list.
100 >    /**
101 >     * Node class for LinkedTransferQueue. Opportunistically
102 >     * subclasses from AtomicReference to represent item. Uses Object,
103 >     * not E, to allow setting item to "this" after use, to avoid
104 >     * garbage retention. Similarly, setting the next field to this is
105 >     * used as sentinel that node is off list.
106       */
107 <    static final class QNode extends AtomicReference<Object> {
108 <        volatile QNode next;
107 >    static final class Node<E> extends AtomicReference<Object> {
108 >        volatile Node<E> next;
109          volatile Thread waiter;       // to control park/unpark
110          final boolean isData;
111 <        QNode(Object item, boolean isData) {
111 >
112 >        Node(E item, boolean isData) {
113              super(item);
114              this.isData = isData;
115          }
116  
117 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
114 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
115 <            (QNode.class, QNode.class, "next");
117 >        // Unsafe mechanics
118  
119 <        boolean casNext(QNode cmp, QNode val) {
120 <            return nextUpdater.compareAndSet(this, cmp, val);
119 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
120 >        private static final long nextOffset =
121 >            objectFieldOffset(UNSAFE, "next", Node.class);
122 >
123 >        final boolean casNext(Node<E> cmp, Node<E> val) {
124 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
125 >        }
126 >
127 >        final void clearNext() {
128 >            UNSAFE.putOrderedObject(this, nextOffset, this);
129 >        }
130 >
131 >        /**
132 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
133 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
134 >         * into a jdk.
135 >         *
136 >         * @return a sun.misc.Unsafe
137 >         */
138 >        private static sun.misc.Unsafe getUnsafe() {
139 >            try {
140 >                return sun.misc.Unsafe.getUnsafe();
141 >            } catch (SecurityException se) {
142 >                try {
143 >                    return java.security.AccessController.doPrivileged
144 >                        (new java.security
145 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
146 >                            public sun.misc.Unsafe run() throws Exception {
147 >                                java.lang.reflect.Field f = sun.misc
148 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
149 >                                f.setAccessible(true);
150 >                                return (sun.misc.Unsafe) f.get(null);
151 >                            }});
152 >                } catch (java.security.PrivilegedActionException e) {
153 >                    throw new RuntimeException("Could not initialize intrinsics",
154 >                                               e.getCause());
155 >                }
156 >            }
157          }
158 +
159 +        private static final long serialVersionUID = -3375979862319811754L;
160      }
161  
162      /**
# Line 128 | Line 168 | public class LinkedTransferQueue<E> exte
168          // enough padding for 64bytes with 4byte refs
169          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
170          PaddedAtomicReference(T r) { super(r); }
171 +        private static final long serialVersionUID = 8170090609809740854L;
172      }
173  
174  
175 <    private final QNode dummy = new QNode(null, false);
176 <    private final PaddedAtomicReference<QNode> head =
177 <        new PaddedAtomicReference<QNode>(dummy);
178 <    private final PaddedAtomicReference<QNode> tail =
179 <        new PaddedAtomicReference<QNode>(dummy);
175 >    /** head of the queue */
176 >    private transient final PaddedAtomicReference<Node<E>> head;
177 >
178 >    /** tail of the queue */
179 >    private transient final PaddedAtomicReference<Node<E>> tail;
180  
181      /**
182       * Reference to a cancelled node that might not yet have been
183       * unlinked from queue because it was the last inserted node
184       * when it cancelled.
185       */
186 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
186 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
187  
188      /**
189       * Tries to cas nh as new head; if successful, unlink
190       * old head's next node to avoid garbage retention.
191       */
192 <    private boolean advanceHead(QNode h, QNode nh) {
192 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
193          if (h == head.get() && head.compareAndSet(h, nh)) {
194 <            h.next = h; // forget old next
194 >            h.clearNext(); // forget old next
195              return true;
196          }
197          return false;
198      }
199 <    
199 >
200      /**
201       * Puts or takes an item. Used for most queue operations (except
202 <     * poll() and tryTransfer())
203 <     * @param e the item or if null, signfies that this is a take
202 >     * poll() and tryTransfer()). See the similar code in
203 >     * SynchronousQueue for detailed explanation.
204 >     *
205 >     * @param e the item or if null, signifies that this is a take
206       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
207       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
208       * @return an item, or null on failure
209       */
210 <    private Object xfer(Object e, int mode, long nanos) {
210 >    private E xfer(E e, int mode, long nanos) {
211          boolean isData = (e != null);
212 <        QNode s = null;
213 <        final PaddedAtomicReference<QNode> head = this.head;
214 <        final PaddedAtomicReference<QNode> tail = this.tail;
212 >        Node<E> s = null;
213 >        final PaddedAtomicReference<Node<E>> head = this.head;
214 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
215  
216          for (;;) {
217 <            QNode t = tail.get();
218 <            QNode h = head.get();
217 >            Node<E> t = tail.get();
218 >            Node<E> h = head.get();
219  
220              if (t != null && (t == h || t.isData == isData)) {
221                  if (s == null)
222 <                    s = new QNode(e, isData);
223 <                QNode last = t.next;
222 >                    s = new Node<E>(e, isData);
223 >                Node<E> last = t.next;
224                  if (last != null) {
225                      if (t == tail.get())
226                          tail.compareAndSet(t, last);
# Line 188 | Line 230 | public class LinkedTransferQueue<E> exte
230                      return awaitFulfill(t, s, e, mode, nanos);
231                  }
232              }
233 <            
233 >
234              else if (h != null) {
235 <                QNode first = h.next;
236 <                if (t == tail.get() && first != null &&
235 >                Node<E> first = h.next;
236 >                if (t == tail.get() && first != null &&
237                      advanceHead(h, first)) {
238                      Object x = first.get();
239                      if (x != first && first.compareAndSet(x, e)) {
240                          LockSupport.unpark(first.waiter);
241 <                        return isData? e : x;
241 >                        return isData ? e : (E) x;
242                      }
243                  }
244              }
# Line 206 | Line 248 | public class LinkedTransferQueue<E> exte
248  
249      /**
250       * Version of xfer for poll() and tryTransfer, which
251 <     * simpifies control paths both here and in xfer
251 >     * simplifies control paths both here and in xfer.
252       */
253 <    private Object fulfill(Object e) {
253 >    private E fulfill(E e) {
254          boolean isData = (e != null);
255 <        final PaddedAtomicReference<QNode> head = this.head;
256 <        final PaddedAtomicReference<QNode> tail = this.tail;
255 >        final PaddedAtomicReference<Node<E>> head = this.head;
256 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
257  
258          for (;;) {
259 <            QNode t = tail.get();
260 <            QNode h = head.get();
259 >            Node<E> t = tail.get();
260 >            Node<E> h = head.get();
261  
262              if (t != null && (t == h || t.isData == isData)) {
263 <                QNode last = t.next;
263 >                Node<E> last = t.next;
264                  if (t == tail.get()) {
265                      if (last != null)
266                          tail.compareAndSet(t, last);
# Line 227 | Line 269 | public class LinkedTransferQueue<E> exte
269                  }
270              }
271              else if (h != null) {
272 <                QNode first = h.next;
273 <                if (t == tail.get() &&
272 >                Node<E> first = h.next;
273 >                if (t == tail.get() &&
274                      first != null &&
275                      advanceHead(h, first)) {
276                      Object x = first.get();
277                      if (x != first && first.compareAndSet(x, e)) {
278                          LockSupport.unpark(first.waiter);
279 <                        return isData? e : x;
279 >                        return isData ? e : (E) x;
280                      }
281                  }
282              }
# Line 252 | Line 294 | public class LinkedTransferQueue<E> exte
294       * @param nanos timeout value
295       * @return matched item, or s if cancelled
296       */
297 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
298 <                                int mode, long nanos) {
297 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
298 >                           int mode, long nanos) {
299          if (mode == NOWAIT)
300              return null;
301  
302 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
302 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
303          Thread w = Thread.currentThread();
304          int spins = -1; // set to desired spin count below
305          for (;;) {
# Line 266 | Line 308 | public class LinkedTransferQueue<E> exte
308              Object x = s.get();
309              if (x != e) {                 // Node was matched or cancelled
310                  advanceHead(pred, s);     // unlink if head
311 <                if (x == s)               // was cancelled
312 <                    return clean(pred, s);
313 <                else if (x != null) {    
311 >                if (x == s) {             // was cancelled
312 >                    clean(pred, s);
313 >                    return null;
314 >                }
315 >                else if (x != null) {
316                      s.set(s);             // avoid garbage retention
317 <                    return x;
317 >                    return (E) x;
318                  }
319                  else
320                      return e;
321              }
278
322              if (mode == TIMEOUT) {
323                  long now = System.nanoTime();
324                  nanos -= now - lastTime;
# Line 286 | Line 329 | public class LinkedTransferQueue<E> exte
329                  }
330              }
331              if (spins < 0) {
332 <                QNode h = head.get(); // only spin if at head
332 >                Node<E> h = head.get(); // only spin if at head
333                  spins = ((h != null && h.next == s) ?
334 <                         (mode == TIMEOUT?
334 >                         ((mode == TIMEOUT) ?
335                            maxTimedSpins : maxUntimedSpins) : 0);
336              }
337              if (spins > 0)
# Line 296 | Line 339 | public class LinkedTransferQueue<E> exte
339              else if (s.waiter == null)
340                  s.waiter = w;
341              else if (mode != TIMEOUT) {
342 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
342 >                LockSupport.park(this);
343                  s.waiter = null;
344                  spins = -1;
345              }
346              else if (nanos > spinForTimeoutThreshold) {
347 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
347 >                LockSupport.parkNanos(this, nanos);
348                  s.waiter = null;
349                  spins = -1;
350              }
# Line 311 | Line 352 | public class LinkedTransferQueue<E> exte
352      }
353  
354      /**
355 +     * Returns validated tail for use in cleaning methods.
356 +     */
357 +    private Node<E> getValidatedTail() {
358 +        for (;;) {
359 +            Node<E> h = head.get();
360 +            Node<E> first = h.next;
361 +            if (first != null && first.next == first) { // help advance
362 +                advanceHead(h, first);
363 +                continue;
364 +            }
365 +            Node<E> t = tail.get();
366 +            Node<E> last = t.next;
367 +            if (t == tail.get()) {
368 +                if (last != null)
369 +                    tail.compareAndSet(t, last); // help advance
370 +                else
371 +                    return t;
372 +            }
373 +        }
374 +    }
375 +
376 +    /**
377       * Gets rid of cancelled node s with original predecessor pred.
378 <     * @return null (to simplify use by callers)
378 >     *
379 >     * @param pred predecessor of cancelled node
380 >     * @param s the cancelled node
381       */
382 <    private Object clean(QNode pred, QNode s) {
382 >    private void clean(Node<E> pred, Node<E> s) {
383          Thread w = s.waiter;
384          if (w != null) {             // Wake up thread
385              s.waiter = null;
386              if (w != Thread.currentThread())
387                  LockSupport.unpark(w);
388          }
389 <        
390 <        for (;;) {
391 <            if (pred.next != s) // already cleaned
392 <                return null;
393 <            QNode h = head.get();
394 <            QNode hn = h.next;   // Absorb cancelled first node as head
395 <            if (hn != null && hn.next == hn) {
396 <                advanceHead(h, hn);
397 <                continue;
398 <            }
399 <            QNode t = tail.get();      // Ensure consistent read for tail
400 <            if (t == h)
401 <                return null;
402 <            QNode tn = t.next;
403 <            if (t != tail.get())
404 <                continue;
405 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
389 >
390 >        if (pred == null)
391 >            return;
392 >
393 >        /*
394 >         * At any given time, exactly one node on list cannot be
395 >         * deleted -- the last inserted node. To accommodate this, if
396 >         * we cannot delete s, we save its predecessor as "cleanMe",
397 >         * processing the previously saved version first. At least one
398 >         * of node s or the node previously saved can always be
399 >         * processed, so this always terminates.
400 >         */
401 >        while (pred.next == s) {
402 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
403 >            Node<E> t = getValidatedTail();
404 >            if (s != t) {               // If not tail, try to unsplice
405 >                Node<E> sn = s.next;      // s.next == s means s already off list
406                  if (sn == s || pred.casNext(s, sn))
407 <                    return null;
407 >                    break;
408 >            }
409 >            else if (oldpred == pred || // Already saved
410 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
411 >                break;                  // Postpone cleaning
412 >        }
413 >    }
414 >
415 >    /**
416 >     * Tries to unsplice the cancelled node held in cleanMe that was
417 >     * previously uncleanable because it was at tail.
418 >     *
419 >     * @return current cleanMe node (or null)
420 >     */
421 >    private Node<E> reclean() {
422 >        /*
423 >         * cleanMe is, or at one time was, predecessor of cancelled
424 >         * node s that was the tail so could not be unspliced.  If s
425 >         * is no longer the tail, try to unsplice if necessary and
426 >         * make cleanMe slot available.  This differs from similar
427 >         * code in clean() because we must check that pred still
428 >         * points to a cancelled node that must be unspliced -- if
429 >         * not, we can (must) clear cleanMe without unsplicing.
430 >         * This can loop only due to contention on casNext or
431 >         * clearing cleanMe.
432 >         */
433 >        Node<E> pred;
434 >        while ((pred = cleanMe.get()) != null) {
435 >            Node<E> t = getValidatedTail();
436 >            Node<E> s = pred.next;
437 >            if (s != t) {
438 >                Node<E> sn;
439 >                if (s == null || s == pred || s.get() != s ||
440 >                    (sn = s.next) == s || pred.casNext(s, sn))
441 >                    cleanMe.compareAndSet(pred, null);
442              }
443 <            QNode dp = cleanMe.get();
444 <            if (dp != null) {    // Try unlinking previous cancelled node
351 <                QNode d = dp.next;
352 <                QNode dn;
353 <                if (d == null ||               // d is gone or
354 <                    d == dp ||                 // d is off list or
355 <                    d.get() != d ||            // d not cancelled or
356 <                    (d != t &&                 // d not tail and
357 <                     (dn = d.next) != null &&  //   has successor
358 <                     dn != d &&                //   that is on list
359 <                     dp.casNext(d, dn)))       // d unspliced
360 <                    cleanMe.compareAndSet(dp, null);
361 <                if (dp == pred)
362 <                    return null;      // s is already saved node
363 <            }
364 <            else if (cleanMe.compareAndSet(null, pred))
365 <                return null;          // Postpone cleaning s
443 >            else // s is still tail; cannot clean
444 >                break;
445          }
446 +        return pred;
447      }
448 <    
448 >
449      /**
450 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
450 >     * Creates an initially empty {@code LinkedTransferQueue}.
451       */
452      public LinkedTransferQueue() {
453 +        Node<E> dummy = new Node<E>(null, false);
454 +        head = new PaddedAtomicReference<Node<E>>(dummy);
455 +        tail = new PaddedAtomicReference<Node<E>>(dummy);
456 +        cleanMe = new PaddedAtomicReference<Node<E>>(null);
457      }
458  
459      /**
460 <     * Creates a <tt>LinkedTransferQueue</tt>
460 >     * Creates a {@code LinkedTransferQueue}
461       * initially containing the elements of the given collection,
462       * added in traversal order of the collection's iterator.
463 +     *
464       * @param c the collection of elements to initially contain
465       * @throws NullPointerException if the specified collection or any
466       *         of its elements are null
467       */
468      public LinkedTransferQueue(Collection<? extends E> c) {
469 +        this();
470          addAll(c);
471      }
472  
473 +    /**
474 +     * @throws InterruptedException {@inheritDoc}
475 +     * @throws NullPointerException {@inheritDoc}
476 +     */
477      public void put(E e) throws InterruptedException {
478          if (e == null) throw new NullPointerException();
479          if (Thread.interrupted()) throw new InterruptedException();
480          xfer(e, NOWAIT, 0);
481      }
482  
483 <    public boolean offer(E e, long timeout, TimeUnit unit)  
483 >    /**
484 >     * @throws InterruptedException {@inheritDoc}
485 >     * @throws NullPointerException {@inheritDoc}
486 >     */
487 >    public boolean offer(E e, long timeout, TimeUnit unit)
488          throws InterruptedException {
489          if (e == null) throw new NullPointerException();
490          if (Thread.interrupted()) throw new InterruptedException();
# Line 398 | Line 492 | public class LinkedTransferQueue<E> exte
492          return true;
493      }
494  
495 +    /**
496 +     * @throws NullPointerException {@inheritDoc}
497 +     */
498      public boolean offer(E e) {
499          if (e == null) throw new NullPointerException();
500          xfer(e, NOWAIT, 0);
501          return true;
502      }
503  
504 +    /**
505 +     * @throws NullPointerException {@inheritDoc}
506 +     */
507 +    public boolean add(E e) {
508 +        if (e == null) throw new NullPointerException();
509 +        xfer(e, NOWAIT, 0);
510 +        return true;
511 +    }
512 +
513 +    /**
514 +     * @throws InterruptedException {@inheritDoc}
515 +     * @throws NullPointerException {@inheritDoc}
516 +     */
517      public void transfer(E e) throws InterruptedException {
518          if (e == null) throw new NullPointerException();
519          if (xfer(e, WAIT, 0) == null) {
520 <            Thread.interrupted();
520 >            Thread.interrupted();
521              throw new InterruptedException();
522 <        }
522 >        }
523      }
524  
525 +    /**
526 +     * @throws InterruptedException {@inheritDoc}
527 +     * @throws NullPointerException {@inheritDoc}
528 +     */
529      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
530          throws InterruptedException {
531          if (e == null) throw new NullPointerException();
# Line 422 | Line 536 | public class LinkedTransferQueue<E> exte
536          throw new InterruptedException();
537      }
538  
539 +    /**
540 +     * @throws NullPointerException {@inheritDoc}
541 +     */
542      public boolean tryTransfer(E e) {
543          if (e == null) throw new NullPointerException();
544          return fulfill(e) != null;
545      }
546  
547 +    /**
548 +     * @throws InterruptedException {@inheritDoc}
549 +     */
550      public E take() throws InterruptedException {
551          Object e = xfer(null, WAIT, 0);
552          if (e != null)
553 <            return (E)e;
554 <        Thread.interrupted();
553 >            return (E) e;
554 >        Thread.interrupted();
555          throw new InterruptedException();
556      }
557  
558 +    /**
559 +     * @throws InterruptedException {@inheritDoc}
560 +     */
561      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
562          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
563          if (e != null || !Thread.interrupted())
564 <            return (E)e;
564 >            return (E) e;
565          throw new InterruptedException();
566      }
567  
568      public E poll() {
569 <        return (E)fulfill(null);
569 >        return fulfill(null);
570      }
571  
572 +    /**
573 +     * @throws NullPointerException     {@inheritDoc}
574 +     * @throws IllegalArgumentException {@inheritDoc}
575 +     */
576      public int drainTo(Collection<? super E> c) {
577          if (c == null)
578              throw new NullPointerException();
# Line 460 | Line 587 | public class LinkedTransferQueue<E> exte
587          return n;
588      }
589  
590 +    /**
591 +     * @throws NullPointerException     {@inheritDoc}
592 +     * @throws IllegalArgumentException {@inheritDoc}
593 +     */
594      public int drainTo(Collection<? super E> c, int maxElements) {
595          if (c == null)
596              throw new NullPointerException();
# Line 477 | Line 608 | public class LinkedTransferQueue<E> exte
608      // Traversal-based methods
609  
610      /**
611 <     * Return head after performing any outstanding helping steps
611 >     * Returns head after performing any outstanding helping steps.
612       */
613 <    private QNode traversalHead() {
613 >    private Node<E> traversalHead() {
614          for (;;) {
615 <            QNode t = tail.get();
616 <            QNode h = head.get();
615 >            Node<E> t = tail.get();
616 >            Node<E> h = head.get();
617              if (h != null && t != null) {
618 <                QNode last = t.next;
619 <                QNode first = h.next;
618 >                Node<E> last = t.next;
619 >                Node<E> first = h.next;
620                  if (t == tail.get()) {
621 <                    if (last != null)
621 >                    if (last != null)
622                          tail.compareAndSet(t, last);
623                      else if (first != null) {
624                          Object x = first.get();
625 <                        if (x == first)
626 <                            advanceHead(h, first);    
625 >                        if (x == first)
626 >                            advanceHead(h, first);
627                          else
628                              return h;
629                      }
# Line 500 | Line 631 | public class LinkedTransferQueue<E> exte
631                          return h;
632                  }
633              }
634 +            reclean();
635          }
636      }
637  
506
638      public Iterator<E> iterator() {
639          return new Itr();
640      }
641  
642      /**
643 <     * Iterators. Basic strategy os to travers list, treating
643 >     * Iterators. Basic strategy is to traverse list, treating
644       * non-data (i.e., request) nodes as terminating list.
645       * Once a valid data node is found, the item is cached
646       * so that the next call to next() will return it even
647       * if subsequently removed.
648       */
649      class Itr implements Iterator<E> {
650 <        QNode nextNode;    // Next node to return next
651 <        QNode currentNode; // last returned node, for remove()
652 <        QNode prevNode;    // predecessor of last returned node
653 <        E nextItem;        // Cache of next item, once commited to in next
654 <        
650 >        Node<E> next;        // node to return next
651 >        Node<E> pnext;       // predecessor of next
652 >        Node<E> curr;        // last returned node, for remove()
653 >        Node<E> pcurr;       // predecessor of curr, for remove()
654 >        E nextItem;          // Cache of next item, once committed to in next
655 >
656          Itr() {
525            nextNode = traversalHead();
657              advance();
658          }
659 <        
660 <        E advance() {
661 <            prevNode = currentNode;
662 <            currentNode = nextNode;
663 <            E x = nextItem;
664 <            
665 <            QNode p = nextNode.next;
659 >
660 >        /**
661 >         * Moves to next valid node and returns item to return for
662 >         * next(), or null if no such.
663 >         */
664 >        private E advance() {
665 >            pcurr = pnext;
666 >            curr = next;
667 >            E item = nextItem;
668 >
669              for (;;) {
670 <                if (p == null || !p.isData) {
671 <                    nextNode = null;
672 <                    nextItem = null;
673 <                    return x;
674 <                }
675 <                Object item = p.get();
676 <                if (item != p && item != null) {
677 <                    nextNode = p;
678 <                    nextItem = (E)item;
679 <                    return x;
680 <                }
681 <                prevNode = p;
682 <                p = p.next;
670 >                pnext = next == null ? traversalHead() : next;
671 >                next = pnext.next;
672 >                if (next == pnext) {
673 >                    next = null;
674 >                    continue;  // restart
675 >                }
676 >                if (next == null)
677 >                    break;
678 >                Object x = next.get();
679 >                if (x != null && x != next) {
680 >                    nextItem = (E) x;
681 >                    break;
682 >                }
683              }
684 +            return item;
685          }
686 <        
686 >
687          public boolean hasNext() {
688 <            return nextNode != null;
688 >            return next != null;
689          }
690 <        
690 >
691          public E next() {
692 <            if (nextNode == null) throw new NoSuchElementException();
692 >            if (next == null)
693 >                throw new NoSuchElementException();
694              return advance();
695          }
696 <        
696 >
697          public void remove() {
698 <            QNode p = currentNode;
699 <            QNode prev = prevNode;
564 <            if (prev == null || p == null)
698 >            Node<E> p = curr;
699 >            if (p == null)
700                  throw new IllegalStateException();
701              Object x = p.get();
702              if (x != null && x != p && p.compareAndSet(x, p))
703 <                clean(prev, p);
703 >                clean(pcurr, p);
704          }
705      }
706  
707      public E peek() {
708          for (;;) {
709 <            QNode h = traversalHead();
710 <            QNode p = h.next;
709 >            Node<E> h = traversalHead();
710 >            Node<E> p = h.next;
711              if (p == null)
712                  return null;
713              Object x = p.get();
# Line 580 | Line 715 | public class LinkedTransferQueue<E> exte
715                  if (!p.isData)
716                      return null;
717                  if (x != null)
718 <                    return (E)x;
718 >                    return (E) x;
719 >            }
720 >        }
721 >    }
722 >
723 >    public boolean isEmpty() {
724 >        for (;;) {
725 >            Node<E> h = traversalHead();
726 >            Node<E> p = h.next;
727 >            if (p == null)
728 >                return true;
729 >            Object x = p.get();
730 >            if (p != x) {
731 >                if (!p.isData)
732 >                    return true;
733 >                if (x != null)
734 >                    return false;
735              }
736          }
737      }
738  
739      public boolean hasWaitingConsumer() {
740          for (;;) {
741 <            QNode h = traversalHead();
742 <            QNode p = h.next;
741 >            Node<E> h = traversalHead();
742 >            Node<E> p = h.next;
743              if (p == null)
744                  return false;
745              Object x = p.get();
746 <            if (p != x)
746 >            if (p != x)
747                  return !p.isData;
748          }
749      }
750 <    
750 >
751      /**
752       * Returns the number of elements in this queue.  If this queue
753 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
754 <     * <tt>Integer.MAX_VALUE</tt>.
753 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
754 >     * {@code Integer.MAX_VALUE}.
755       *
756       * <p>Beware that, unlike in most collections, this method is
757       * <em>NOT</em> a constant-time operation. Because of the
# Line 610 | Line 761 | public class LinkedTransferQueue<E> exte
761       * @return the number of elements in this queue
762       */
763      public int size() {
764 <        int count = 0;
765 <        QNode h = traversalHead();
766 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
767 <            Object x = p.get();
768 <            if (x != null && x != p) {
769 <                if (++count == Integer.MAX_VALUE) // saturated
764 >        for (;;) {
765 >            int count = 0;
766 >            Node<E> pred = traversalHead();
767 >            for (;;) {
768 >                Node<E> q = pred.next;
769 >                if (q == pred) // restart
770                      break;
771 +                if (q == null || !q.isData)
772 +                    return count;
773 +                Object x = q.get();
774 +                if (x != null && x != q) {
775 +                    if (++count == Integer.MAX_VALUE) // saturated
776 +                        return count;
777 +                }
778 +                pred = q;
779              }
780          }
622        return count;
781      }
782  
783      public int getWaitingConsumerCount() {
784 <        int count = 0;
785 <        QNode h = traversalHead();
786 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
787 <            if (p.get() == null) {
788 <                if (++count == Integer.MAX_VALUE)
784 >        // converse of size -- count valid non-data nodes
785 >        for (;;) {
786 >            int count = 0;
787 >            Node<E> pred = traversalHead();
788 >            for (;;) {
789 >                Node<E> q = pred.next;
790 >                if (q == pred) // restart
791                      break;
792 +                if (q == null || q.isData)
793 +                    return count;
794 +                Object x = q.get();
795 +                if (x == null) {
796 +                    if (++count == Integer.MAX_VALUE) // saturated
797 +                        return count;
798 +                }
799 +                pred = q;
800 +            }
801 +        }
802 +    }
803 +
804 +    public boolean remove(Object o) {
805 +        if (o == null)
806 +            return false;
807 +        for (;;) {
808 +            Node<E> pred = traversalHead();
809 +            for (;;) {
810 +                Node<E> q = pred.next;
811 +                if (q == pred) // restart
812 +                    break;
813 +                if (q == null || !q.isData)
814 +                    return false;
815 +                Object x = q.get();
816 +                if (x != null && x != q && o.equals(x) &&
817 +                    q.compareAndSet(x, q)) {
818 +                    clean(pred, q);
819 +                    return true;
820 +                }
821 +                pred = q;
822              }
823          }
634        return count;
824      }
825  
826      public int remainingCapacity() {
# Line 641 | Line 830 | public class LinkedTransferQueue<E> exte
830      /**
831       * Save the state to a stream (that is, serialize it).
832       *
833 <     * @serialData All of the elements (each an <tt>E</tt>) in
833 >     * @serialData All of the elements (each an {@code E}) in
834       * the proper order, followed by a null
835       * @param s the stream
836       */
837      private void writeObject(java.io.ObjectOutputStream s)
838          throws java.io.IOException {
839          s.defaultWriteObject();
840 <        for (Iterator<E> it = iterator(); it.hasNext(); )
841 <            s.writeObject(it.next());
840 >        for (E e : this)
841 >            s.writeObject(e);
842          // Use trailing null as sentinel
843          s.writeObject(null);
844      }
# Line 657 | Line 846 | public class LinkedTransferQueue<E> exte
846      /**
847       * Reconstitute the Queue instance from a stream (that is,
848       * deserialize it).
849 +     *
850       * @param s the stream
851       */
852      private void readObject(java.io.ObjectInputStream s)
853          throws java.io.IOException, ClassNotFoundException {
854          s.defaultReadObject();
855 +        resetHeadAndTail();
856          for (;;) {
857 <            E item = (E)s.readObject();
857 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
858              if (item == null)
859                  break;
860              else
861                  offer(item);
862          }
863      }
864 +
865 +    // Support for resetting head/tail while deserializing
866 +    private void resetHeadAndTail() {
867 +        Node<E> dummy = new Node<E>(null, false);
868 +        UNSAFE.putObjectVolatile(this, headOffset,
869 +                                 new PaddedAtomicReference<Node<E>>(dummy));
870 +        UNSAFE.putObjectVolatile(this, tailOffset,
871 +                                 new PaddedAtomicReference<Node<E>>(dummy));
872 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
873 +                                 new PaddedAtomicReference<Node<E>>(null));
874 +    }
875 +
876 +    // Unsafe mechanics
877 +
878 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
879 +    private static final long headOffset =
880 +        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
881 +    private static final long tailOffset =
882 +        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
883 +    private static final long cleanMeOffset =
884 +        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
885 +
886 +
887 +    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
888 +                                  String field, Class<?> klazz) {
889 +        try {
890 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
891 +        } catch (NoSuchFieldException e) {
892 +            // Convert Exception to corresponding Error
893 +            NoSuchFieldError error = new NoSuchFieldError(field);
894 +            error.initCause(e);
895 +            throw error;
896 +        }
897 +    }
898 +
899 +    /**
900 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
901 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
902 +     * into a jdk.
903 +     *
904 +     * @return a sun.misc.Unsafe
905 +     */
906 +    private static sun.misc.Unsafe getUnsafe() {
907 +        try {
908 +            return sun.misc.Unsafe.getUnsafe();
909 +        } catch (SecurityException se) {
910 +            try {
911 +                return java.security.AccessController.doPrivileged
912 +                    (new java.security
913 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
914 +                        public sun.misc.Unsafe run() throws Exception {
915 +                            java.lang.reflect.Field f = sun.misc
916 +                                .Unsafe.class.getDeclaredField("theUnsafe");
917 +                            f.setAccessible(true);
918 +                            return (sun.misc.Unsafe) f.get(null);
919 +                        }});
920 +            } catch (java.security.PrivilegedActionException e) {
921 +                throw new RuntimeException("Could not initialize intrinsics",
922 +                                           e.getCause());
923 +            }
924 +        }
925 +    }
926   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines