ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.7 by dl, Wed Sep 24 10:48:43 2008 UTC vs.
Revision 1.36 by jsr166, Fri Jul 31 07:30:29 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
14 < import sun.misc.Unsafe;
15 < import java.lang.reflect.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.ConcurrentModificationException;
14 > import java.util.Iterator;
15 > import java.util.NoSuchElementException;
16 > import java.util.Queue;
17 > import java.util.concurrent.locks.LockSupport;
18 > import java.util.concurrent.atomic.AtomicReference;
19  
20   /**
21   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 21 | Line 25 | import java.lang.reflect.*;
25   * producer.  The <em>tail</em> of the queue is that element that has
26   * been on the queue the shortest time for some producer.
27   *
28 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
28 > * <p>Beware that, unlike in most collections, the {@code size}
29   * method is <em>NOT</em> a constant-time operation. Because of the
30   * asynchronous nature of these queues, determining the current number
31   * of elements requires a traversal of the elements.
# Line 44 | Line 48 | import java.lang.reflect.*;
48   * @since 1.7
49   * @author Doug Lea
50   * @param <E> the type of elements held in this collection
47 *
51   */
52   public class LinkedTransferQueue<E> extends AbstractQueue<E>
53      implements TransferQueue<E>, java.io.Serializable {
54      private static final long serialVersionUID = -3223113410248163686L;
55  
56      /*
54     * This is still a work in progress...
55     *
57       * This class extends the approach used in FIFO-mode
58       * SynchronousQueues. See the internal documentation, as well as
59       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
60       * Lea & Scott
61       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
62       *
63 <     * The main extension is to provide different Wait modes
64 <     * for the main "xfer" method that puts or takes items.
65 <     * These don't impact the basic dual-queue logic, but instead
66 <     * control whether or how threads block upon insertion
67 <     * of request or data nodes into the dual queue.
63 >     * The main extension is to provide different Wait modes for the
64 >     * main "xfer" method that puts or takes items.  These don't
65 >     * impact the basic dual-queue logic, but instead control whether
66 >     * or how threads block upon insertion of request or data nodes
67 >     * into the dual queue. It also uses slightly different
68 >     * conventions for tracking whether nodes are off-list or
69 >     * cancelled.
70       */
71  
72      // Wait modes for xfer method
# Line 81 | Line 84 | public class LinkedTransferQueue<E> exte
84       * seems not to vary with number of CPUs (beyond 2) so is just
85       * a constant.
86       */
87 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
87 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
88  
89      /**
90       * The number of times to spin before blocking in untimed waits.
# Line 97 | Line 100 | public class LinkedTransferQueue<E> exte
100      static final long spinForTimeoutThreshold = 1000L;
101  
102      /**
103 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
104 <     * AtomicReference to represent item. Uses Object, not E, to allow
105 <     * setting item to "this" after use, to avoid garbage
106 <     * retention. Similarly, setting the next field to this is used as
107 <     * sentinel that node is off list.
103 >     * Node class for LinkedTransferQueue. Opportunistically
104 >     * subclasses from AtomicReference to represent item. Uses Object,
105 >     * not E, to allow setting item to "this" after use, to avoid
106 >     * garbage retention. Similarly, setting the next field to this is
107 >     * used as sentinel that node is off list.
108       */
109 <    static final class QNode extends AtomicReference<Object> {
110 <        volatile QNode next;
109 >    static final class Node<E> extends AtomicReference<Object> {
110 >        volatile Node<E> next;
111          volatile Thread waiter;       // to control park/unpark
112          final boolean isData;
113 <        QNode(Object item, boolean isData) {
113 >
114 >        Node(E item, boolean isData) {
115              super(item);
116              this.isData = isData;
117          }
118  
119 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
116 <            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
117 <            (QNode.class, QNode.class, "next");
119 >        // Unsafe mechanics
120  
121 <        boolean casNext(QNode cmp, QNode val) {
122 <            return nextUpdater.compareAndSet(this, cmp, val);
121 >        private static final sun.misc.Unsafe UNSAFE = getUnsafe();
122 >        private static final long nextOffset =
123 >            objectFieldOffset(UNSAFE, "next", Node.class);
124 >
125 >        final boolean casNext(Node<E> cmp, Node<E> val) {
126 >            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
127 >        }
128 >
129 >        final void clearNext() {
130 >            UNSAFE.putOrderedObject(this, nextOffset, this);
131 >        }
132 >
133 >        /**
134 >         * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
135 >         * Replace with a simple call to Unsafe.getUnsafe when integrating
136 >         * into a jdk.
137 >         *
138 >         * @return a sun.misc.Unsafe
139 >         */
140 >        private static sun.misc.Unsafe getUnsafe() {
141 >            try {
142 >                return sun.misc.Unsafe.getUnsafe();
143 >            } catch (SecurityException se) {
144 >                try {
145 >                    return java.security.AccessController.doPrivileged
146 >                        (new java.security
147 >                         .PrivilegedExceptionAction<sun.misc.Unsafe>() {
148 >                            public sun.misc.Unsafe run() throws Exception {
149 >                                java.lang.reflect.Field f = sun.misc
150 >                                    .Unsafe.class.getDeclaredField("theUnsafe");
151 >                                f.setAccessible(true);
152 >                                return (sun.misc.Unsafe) f.get(null);
153 >                            }});
154 >                } catch (java.security.PrivilegedActionException e) {
155 >                    throw new RuntimeException("Could not initialize intrinsics",
156 >                                               e.getCause());
157 >                }
158 >            }
159          }
160 +
161 +        private static final long serialVersionUID = -3375979862319811754L;
162      }
163  
164      /**
# Line 130 | Line 170 | public class LinkedTransferQueue<E> exte
170          // enough padding for 64bytes with 4byte refs
171          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
172          PaddedAtomicReference(T r) { super(r); }
173 +        private static final long serialVersionUID = 8170090609809740854L;
174      }
175  
176  
177      /** head of the queue */
178 <    private transient final PaddedAtomicReference<QNode> head;
178 >    private transient final PaddedAtomicReference<Node<E>> head;
179 >
180      /** tail of the queue */
181 <    private transient final PaddedAtomicReference<QNode> tail;
181 >    private transient final PaddedAtomicReference<Node<E>> tail;
182  
183      /**
184       * Reference to a cancelled node that might not yet have been
185       * unlinked from queue because it was the last inserted node
186       * when it cancelled.
187       */
188 <    private transient final PaddedAtomicReference<QNode> cleanMe;
188 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
189  
190      /**
191       * Tries to cas nh as new head; if successful, unlink
192       * old head's next node to avoid garbage retention.
193       */
194 <    private boolean advanceHead(QNode h, QNode nh) {
194 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
195          if (h == head.get() && head.compareAndSet(h, nh)) {
196 <            h.next = h; // forget old next
196 >            h.clearNext(); // forget old next
197              return true;
198          }
199          return false;
# Line 159 | Line 201 | public class LinkedTransferQueue<E> exte
201  
202      /**
203       * Puts or takes an item. Used for most queue operations (except
204 <     * poll() and tryTransfer())
204 >     * poll() and tryTransfer()). See the similar code in
205 >     * SynchronousQueue for detailed explanation.
206 >     *
207       * @param e the item or if null, signifies that this is a take
208       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
209       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
210       * @return an item, or null on failure
211       */
212 <    private Object xfer(Object e, int mode, long nanos) {
212 >    private E xfer(E e, int mode, long nanos) {
213          boolean isData = (e != null);
214 <        QNode s = null;
215 <        final PaddedAtomicReference<QNode> head = this.head;
216 <        final PaddedAtomicReference<QNode> tail = this.tail;
214 >        Node<E> s = null;
215 >        final PaddedAtomicReference<Node<E>> head = this.head;
216 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
217  
218          for (;;) {
219 <            QNode t = tail.get();
220 <            QNode h = head.get();
219 >            Node<E> t = tail.get();
220 >            Node<E> h = head.get();
221  
222              if (t != null && (t == h || t.isData == isData)) {
223                  if (s == null)
224 <                    s = new QNode(e, isData);
225 <                QNode last = t.next;
224 >                    s = new Node<E>(e, isData);
225 >                Node<E> last = t.next;
226                  if (last != null) {
227                      if (t == tail.get())
228                          tail.compareAndSet(t, last);
# Line 190 | Line 234 | public class LinkedTransferQueue<E> exte
234              }
235  
236              else if (h != null) {
237 <                QNode first = h.next;
237 >                Node<E> first = h.next;
238                  if (t == tail.get() && first != null &&
239                      advanceHead(h, first)) {
240                      Object x = first.get();
241                      if (x != first && first.compareAndSet(x, e)) {
242                          LockSupport.unpark(first.waiter);
243 <                        return isData? e : x;
243 >                        return isData ? e : (E) x;
244                      }
245                  }
246              }
# Line 206 | Line 250 | public class LinkedTransferQueue<E> exte
250  
251      /**
252       * Version of xfer for poll() and tryTransfer, which
253 <     * simplifies control paths both here and in xfer
253 >     * simplifies control paths both here and in xfer.
254       */
255 <    private Object fulfill(Object e) {
255 >    private E fulfill(E e) {
256          boolean isData = (e != null);
257 <        final PaddedAtomicReference<QNode> head = this.head;
258 <        final PaddedAtomicReference<QNode> tail = this.tail;
257 >        final PaddedAtomicReference<Node<E>> head = this.head;
258 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
259  
260          for (;;) {
261 <            QNode t = tail.get();
262 <            QNode h = head.get();
261 >            Node<E> t = tail.get();
262 >            Node<E> h = head.get();
263  
264              if (t != null && (t == h || t.isData == isData)) {
265 <                QNode last = t.next;
265 >                Node<E> last = t.next;
266                  if (t == tail.get()) {
267                      if (last != null)
268                          tail.compareAndSet(t, last);
# Line 227 | Line 271 | public class LinkedTransferQueue<E> exte
271                  }
272              }
273              else if (h != null) {
274 <                QNode first = h.next;
274 >                Node<E> first = h.next;
275                  if (t == tail.get() &&
276                      first != null &&
277                      advanceHead(h, first)) {
278                      Object x = first.get();
279                      if (x != first && first.compareAndSet(x, e)) {
280                          LockSupport.unpark(first.waiter);
281 <                        return isData? e : x;
281 >                        return isData ? e : (E) x;
282                      }
283                  }
284              }
# Line 252 | Line 296 | public class LinkedTransferQueue<E> exte
296       * @param nanos timeout value
297       * @return matched item, or s if cancelled
298       */
299 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
300 <                                int mode, long nanos) {
299 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
300 >                           int mode, long nanos) {
301          if (mode == NOWAIT)
302              return null;
303  
304 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
304 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
305          Thread w = Thread.currentThread();
306          int spins = -1; // set to desired spin count below
307          for (;;) {
# Line 266 | Line 310 | public class LinkedTransferQueue<E> exte
310              Object x = s.get();
311              if (x != e) {                 // Node was matched or cancelled
312                  advanceHead(pred, s);     // unlink if head
313 <                if (x == s)               // was cancelled
314 <                    return clean(pred, s);
313 >                if (x == s) {             // was cancelled
314 >                    clean(pred, s);
315 >                    return null;
316 >                }
317                  else if (x != null) {
318                      s.set(s);             // avoid garbage retention
319 <                    return x;
319 >                    return (E) x;
320                  }
321                  else
322                      return e;
323              }
278
324              if (mode == TIMEOUT) {
325                  long now = System.nanoTime();
326                  nanos -= now - lastTime;
# Line 286 | Line 331 | public class LinkedTransferQueue<E> exte
331                  }
332              }
333              if (spins < 0) {
334 <                QNode h = head.get(); // only spin if at head
334 >                Node<E> h = head.get(); // only spin if at head
335                  spins = ((h != null && h.next == s) ?
336 <                         (mode == TIMEOUT?
336 >                         ((mode == TIMEOUT) ?
337                            maxTimedSpins : maxUntimedSpins) : 0);
338              }
339              if (spins > 0)
# Line 296 | Line 341 | public class LinkedTransferQueue<E> exte
341              else if (s.waiter == null)
342                  s.waiter = w;
343              else if (mode != TIMEOUT) {
344 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
344 >                LockSupport.park(this);
345                  s.waiter = null;
346                  spins = -1;
347              }
348              else if (nanos > spinForTimeoutThreshold) {
349 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
349 >                LockSupport.parkNanos(this, nanos);
350                  s.waiter = null;
351                  spins = -1;
352              }
# Line 311 | Line 354 | public class LinkedTransferQueue<E> exte
354      }
355  
356      /**
357 +     * Returns validated tail for use in cleaning methods.
358 +     */
359 +    private Node<E> getValidatedTail() {
360 +        for (;;) {
361 +            Node<E> h = head.get();
362 +            Node<E> first = h.next;
363 +            if (first != null && first.next == first) { // help advance
364 +                advanceHead(h, first);
365 +                continue;
366 +            }
367 +            Node<E> t = tail.get();
368 +            Node<E> last = t.next;
369 +            if (t == tail.get()) {
370 +                if (last != null)
371 +                    tail.compareAndSet(t, last); // help advance
372 +                else
373 +                    return t;
374 +            }
375 +        }
376 +    }
377 +
378 +    /**
379       * Gets rid of cancelled node s with original predecessor pred.
380 <     * @return null (to simplify use by callers)
380 >     *
381 >     * @param pred predecessor of cancelled node
382 >     * @param s the cancelled node
383       */
384 <    private Object clean(QNode pred, QNode s) {
384 >    private void clean(Node<E> pred, Node<E> s) {
385          Thread w = s.waiter;
386          if (w != null) {             // Wake up thread
387              s.waiter = null;
# Line 322 | Line 389 | public class LinkedTransferQueue<E> exte
389                  LockSupport.unpark(w);
390          }
391  
392 <        for (;;) {
393 <            if (pred.next != s) // already cleaned
394 <                return null;
395 <            QNode h = head.get();
396 <            QNode hn = h.next;   // Absorb cancelled first node as head
397 <            if (hn != null && hn.next == hn) {
398 <                advanceHead(h, hn);
399 <                continue;
400 <            }
401 <            QNode t = tail.get();      // Ensure consistent read for tail
402 <            if (t == h)
403 <                return null;
404 <            QNode tn = t.next;
405 <            if (t != tail.get())
406 <                continue;
407 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
392 >        if (pred == null)
393 >            return;
394 >
395 >        /*
396 >         * At any given time, exactly one node on list cannot be
397 >         * deleted -- the last inserted node. To accommodate this, if
398 >         * we cannot delete s, we save its predecessor as "cleanMe",
399 >         * processing the previously saved version first. At least one
400 >         * of node s or the node previously saved can always be
401 >         * processed, so this always terminates.
402 >         */
403 >        while (pred.next == s) {
404 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
405 >            Node<E> t = getValidatedTail();
406 >            if (s != t) {               // If not tail, try to unsplice
407 >                Node<E> sn = s.next;      // s.next == s means s already off list
408                  if (sn == s || pred.casNext(s, sn))
409 <                    return null;
409 >                    break;
410              }
411 <            QNode dp = cleanMe.get();
412 <            if (dp != null) {    // Try unlinking previous cancelled node
413 <                QNode d = dp.next;
414 <                QNode dn;
415 <                if (d == null ||               // d is gone or
416 <                    d == dp ||                 // d is off list or
417 <                    d.get() != d ||            // d not cancelled or
418 <                    (d != t &&                 // d not tail and
419 <                     (dn = d.next) != null &&  //   has successor
420 <                     dn != d &&                //   that is on list
421 <                     dp.casNext(d, dn)))       // d unspliced
422 <                    cleanMe.compareAndSet(dp, null);
423 <                if (dp == pred)
424 <                    return null;      // s is already saved node
411 >            else if (oldpred == pred || // Already saved
412 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
413 >                break;                  // Postpone cleaning
414 >        }
415 >    }
416 >
417 >    /**
418 >     * Tries to unsplice the cancelled node held in cleanMe that was
419 >     * previously uncleanable because it was at tail.
420 >     *
421 >     * @return current cleanMe node (or null)
422 >     */
423 >    private Node<E> reclean() {
424 >        /*
425 >         * cleanMe is, or at one time was, predecessor of cancelled
426 >         * node s that was the tail so could not be unspliced.  If s
427 >         * is no longer the tail, try to unsplice if necessary and
428 >         * make cleanMe slot available.  This differs from similar
429 >         * code in clean() because we must check that pred still
430 >         * points to a cancelled node that must be unspliced -- if
431 >         * not, we can (must) clear cleanMe without unsplicing.
432 >         * This can loop only due to contention on casNext or
433 >         * clearing cleanMe.
434 >         */
435 >        Node<E> pred;
436 >        while ((pred = cleanMe.get()) != null) {
437 >            Node<E> t = getValidatedTail();
438 >            Node<E> s = pred.next;
439 >            if (s != t) {
440 >                Node<E> sn;
441 >                if (s == null || s == pred || s.get() != s ||
442 >                    (sn = s.next) == s || pred.casNext(s, sn))
443 >                    cleanMe.compareAndSet(pred, null);
444              }
445 <            else if (cleanMe.compareAndSet(null, pred))
446 <                return null;          // Postpone cleaning s
445 >            else // s is still tail; cannot clean
446 >                break;
447          }
448 +        return pred;
449      }
450  
451      /**
452 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
452 >     * Creates an initially empty {@code LinkedTransferQueue}.
453       */
454      public LinkedTransferQueue() {
455 <        QNode dummy = new QNode(null, false);
456 <        head = new PaddedAtomicReference<QNode>(dummy);
457 <        tail = new PaddedAtomicReference<QNode>(dummy);
458 <        cleanMe = new PaddedAtomicReference<QNode>(null);
455 >        Node<E> dummy = new Node<E>(null, false);
456 >        head = new PaddedAtomicReference<Node<E>>(dummy);
457 >        tail = new PaddedAtomicReference<Node<E>>(dummy);
458 >        cleanMe = new PaddedAtomicReference<Node<E>>(null);
459      }
460  
461      /**
462 <     * Creates a <tt>LinkedTransferQueue</tt>
462 >     * Creates a {@code LinkedTransferQueue}
463       * initially containing the elements of the given collection,
464       * added in traversal order of the collection's iterator.
465 +     *
466       * @param c the collection of elements to initially contain
467       * @throws NullPointerException if the specified collection or any
468       *         of its elements are null
# Line 389 | Line 472 | public class LinkedTransferQueue<E> exte
472          addAll(c);
473      }
474  
475 <    public void put(E e) throws InterruptedException {
476 <        if (e == null) throw new NullPointerException();
477 <        if (Thread.interrupted()) throw new InterruptedException();
478 <        xfer(e, NOWAIT, 0);
475 >    /**
476 >     * Inserts the specified element at the tail of this queue.
477 >     * As the queue is unbounded, this method will never block.
478 >     *
479 >     * @throws NullPointerException if the specified element is null
480 >     */
481 >    public void put(E e) {
482 >        offer(e);
483      }
484  
485 <    public boolean offer(E e, long timeout, TimeUnit unit)
486 <        throws InterruptedException {
487 <        if (e == null) throw new NullPointerException();
488 <        if (Thread.interrupted()) throw new InterruptedException();
489 <        xfer(e, NOWAIT, 0);
490 <        return true;
485 >    /**
486 >     * Inserts the specified element at the tail of this queue.
487 >     * As the queue is unbounded, this method will never block or
488 >     * return {@code false}.
489 >     *
490 >     * @return {@code true} (as specified by
491 >     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
492 >     * @throws NullPointerException if the specified element is null
493 >     */
494 >    public boolean offer(E e, long timeout, TimeUnit unit) {
495 >        return offer(e);
496      }
497  
498 +    /**
499 +     * Inserts the specified element at the tail of this queue.
500 +     * As the queue is unbounded, this method will never return {@code false}.
501 +     *
502 +     * @return {@code true} (as specified by
503 +     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
504 +     * @throws NullPointerException if the specified element is null
505 +     */
506      public boolean offer(E e) {
507          if (e == null) throw new NullPointerException();
508          xfer(e, NOWAIT, 0);
509          return true;
510      }
511  
512 +    /**
513 +     * Inserts the specified element at the tail of this queue.
514 +     * As the queue is unbounded this method will never throw
515 +     * {@link IllegalStateException} or return {@code false}.
516 +     *
517 +     * @return {@code true} (as specified by {@link Collection#add})
518 +     * @throws NullPointerException if the specified element is null
519 +     */
520 +    public boolean add(E e) {
521 +        return offer(e);
522 +    }
523 +
524 +    /**
525 +     * Transfers the specified element immediately if there exists a
526 +     * consumer already waiting to receive it (in {@link #take} or
527 +     * timed {@link #poll(long,TimeUnit) poll}), otherwise
528 +     * returning {@code false} without enqueuing the element.
529 +     *
530 +     * @throws NullPointerException if the specified element is null
531 +     */
532 +    public boolean tryTransfer(E e) {
533 +        if (e == null) throw new NullPointerException();
534 +        return fulfill(e) != null;
535 +    }
536 +
537 +    /**
538 +     * Inserts the specified element at the tail of this queue,
539 +     * waiting if necessary for the element to be received by a
540 +     * consumer invoking {@code take} or {@code poll}.
541 +     *
542 +     * @throws NullPointerException if the specified element is null
543 +     */
544      public void transfer(E e) throws InterruptedException {
545          if (e == null) throw new NullPointerException();
546          if (xfer(e, WAIT, 0) == null) {
# Line 417 | Line 549 | public class LinkedTransferQueue<E> exte
549          }
550      }
551  
552 +    /**
553 +     * Inserts the specified element at the tail of this queue,
554 +     * waiting up to the specified wait time for the element to be
555 +     * received by a consumer invoking {@code take} or {@code poll}.
556 +     *
557 +     * @throws NullPointerException if the specified element is null
558 +     */
559      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
560          throws InterruptedException {
561          if (e == null) throw new NullPointerException();
# Line 427 | Line 566 | public class LinkedTransferQueue<E> exte
566          throw new InterruptedException();
567      }
568  
430    public boolean tryTransfer(E e) {
431        if (e == null) throw new NullPointerException();
432        return fulfill(e) != null;
433    }
434
569      public E take() throws InterruptedException {
570 <        Object e = xfer(null, WAIT, 0);
570 >        E e = xfer(null, WAIT, 0);
571          if (e != null)
572 <            return (E)e;
572 >            return e;
573          Thread.interrupted();
574          throw new InterruptedException();
575      }
576  
577      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
578 <        Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
578 >        E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
579          if (e != null || !Thread.interrupted())
580 <            return (E)e;
580 >            return e;
581          throw new InterruptedException();
582      }
583  
584      public E poll() {
585 <        return (E)fulfill(null);
585 >        return fulfill(null);
586      }
587  
588 +    /**
589 +     * @throws NullPointerException     {@inheritDoc}
590 +     * @throws IllegalArgumentException {@inheritDoc}
591 +     */
592      public int drainTo(Collection<? super E> c) {
593          if (c == null)
594              throw new NullPointerException();
# Line 465 | Line 603 | public class LinkedTransferQueue<E> exte
603          return n;
604      }
605  
606 +    /**
607 +     * @throws NullPointerException     {@inheritDoc}
608 +     * @throws IllegalArgumentException {@inheritDoc}
609 +     */
610      public int drainTo(Collection<? super E> c, int maxElements) {
611          if (c == null)
612              throw new NullPointerException();
# Line 482 | Line 624 | public class LinkedTransferQueue<E> exte
624      // Traversal-based methods
625  
626      /**
627 <     * Return head after performing any outstanding helping steps
627 >     * Returns head after performing any outstanding helping steps.
628       */
629 <    private QNode traversalHead() {
629 >    private Node<E> traversalHead() {
630          for (;;) {
631 <            QNode t = tail.get();
632 <            QNode h = head.get();
631 >            Node<E> t = tail.get();
632 >            Node<E> h = head.get();
633              if (h != null && t != null) {
634 <                QNode last = t.next;
635 <                QNode first = h.next;
634 >                Node<E> last = t.next;
635 >                Node<E> first = h.next;
636                  if (t == tail.get()) {
637                      if (last != null)
638                          tail.compareAndSet(t, last);
# Line 505 | Line 647 | public class LinkedTransferQueue<E> exte
647                          return h;
648                  }
649              }
650 +            reclean();
651          }
652      }
653  
654 <
654 >    /**
655 >     * Returns an iterator over the elements in this queue in proper
656 >     * sequence, from head to tail.
657 >     *
658 >     * <p>The returned iterator is a "weakly consistent" iterator that
659 >     * will never throw
660 >     * {@link ConcurrentModificationException ConcurrentModificationException},
661 >     * and guarantees to traverse elements as they existed upon
662 >     * construction of the iterator, and may (but is not guaranteed
663 >     * to) reflect any modifications subsequent to construction.
664 >     *
665 >     * @return an iterator over the elements in this queue in proper sequence
666 >     */
667      public Iterator<E> iterator() {
668          return new Itr();
669      }
# Line 521 | Line 676 | public class LinkedTransferQueue<E> exte
676       * if subsequently removed.
677       */
678      class Itr implements Iterator<E> {
679 <        QNode nextNode;    // Next node to return next
680 <        QNode currentNode; // last returned node, for remove()
681 <        QNode prevNode;    // predecessor of last returned node
682 <        E nextItem;        // Cache of next item, once commited to in next
679 >        Node<E> next;        // node to return next
680 >        Node<E> pnext;       // predecessor of next
681 >        Node<E> curr;        // last returned node, for remove()
682 >        Node<E> pcurr;       // predecessor of curr, for remove()
683 >        E nextItem;          // Cache of next item, once committed to in next
684  
685          Itr() {
530            nextNode = traversalHead();
686              advance();
687          }
688  
689 <        E advance() {
690 <            prevNode = currentNode;
691 <            currentNode = nextNode;
692 <            E x = nextItem;
689 >        /**
690 >         * Moves to next valid node and returns item to return for
691 >         * next(), or null if no such.
692 >         */
693 >        private E advance() {
694 >            pcurr = pnext;
695 >            curr = next;
696 >            E item = nextItem;
697  
539            QNode p = nextNode.next;
698              for (;;) {
699 <                if (p == null || !p.isData) {
700 <                    nextNode = null;
701 <                    nextItem = null;
702 <                    return x;
703 <                }
704 <                Object item = p.get();
705 <                if (item != p && item != null) {
706 <                    nextNode = p;
707 <                    nextItem = (E)item;
708 <                    return x;
699 >                pnext = (next == null) ? traversalHead() : next;
700 >                next = pnext.next;
701 >                if (next == pnext) {
702 >                    next = null;
703 >                    continue;  // restart
704 >                }
705 >                if (next == null)
706 >                    break;
707 >                Object x = next.get();
708 >                if (x != null && x != next) {
709 >                    nextItem = (E) x;
710 >                    break;
711                  }
552                prevNode = p;
553                p = p.next;
712              }
713 +            return item;
714          }
715  
716          public boolean hasNext() {
717 <            return nextNode != null;
717 >            return next != null;
718          }
719  
720          public E next() {
721 <            if (nextNode == null) throw new NoSuchElementException();
721 >            if (next == null)
722 >                throw new NoSuchElementException();
723              return advance();
724          }
725  
726          public void remove() {
727 <            QNode p = currentNode;
728 <            QNode prev = prevNode;
569 <            if (prev == null || p == null)
727 >            Node<E> p = curr;
728 >            if (p == null)
729                  throw new IllegalStateException();
730              Object x = p.get();
731              if (x != null && x != p && p.compareAndSet(x, p))
732 <                clean(prev, p);
732 >                clean(pcurr, p);
733          }
734      }
735  
736      public E peek() {
737          for (;;) {
738 <            QNode h = traversalHead();
739 <            QNode p = h.next;
738 >            Node<E> h = traversalHead();
739 >            Node<E> p = h.next;
740              if (p == null)
741                  return null;
742              Object x = p.get();
# Line 585 | Line 744 | public class LinkedTransferQueue<E> exte
744                  if (!p.isData)
745                      return null;
746                  if (x != null)
747 <                    return (E)x;
747 >                    return (E) x;
748              }
749          }
750      }
751  
752      public boolean isEmpty() {
753          for (;;) {
754 <            QNode h = traversalHead();
755 <            QNode p = h.next;
754 >            Node<E> h = traversalHead();
755 >            Node<E> p = h.next;
756              if (p == null)
757                  return true;
758              Object x = p.get();
# Line 608 | Line 767 | public class LinkedTransferQueue<E> exte
767  
768      public boolean hasWaitingConsumer() {
769          for (;;) {
770 <            QNode h = traversalHead();
771 <            QNode p = h.next;
770 >            Node<E> h = traversalHead();
771 >            Node<E> p = h.next;
772              if (p == null)
773                  return false;
774              Object x = p.get();
# Line 620 | Line 779 | public class LinkedTransferQueue<E> exte
779  
780      /**
781       * Returns the number of elements in this queue.  If this queue
782 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
783 <     * <tt>Integer.MAX_VALUE</tt>.
782 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
783 >     * {@code Integer.MAX_VALUE}.
784       *
785       * <p>Beware that, unlike in most collections, this method is
786       * <em>NOT</em> a constant-time operation. Because of the
# Line 631 | Line 790 | public class LinkedTransferQueue<E> exte
790       * @return the number of elements in this queue
791       */
792      public int size() {
793 <        int count = 0;
794 <        QNode h = traversalHead();
795 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
796 <            Object x = p.get();
797 <            if (x != null && x != p) {
798 <                if (++count == Integer.MAX_VALUE) // saturated
793 >        for (;;) {
794 >            int count = 0;
795 >            Node<E> pred = traversalHead();
796 >            for (;;) {
797 >                Node<E> q = pred.next;
798 >                if (q == pred) // restart
799                      break;
800 +                if (q == null || !q.isData)
801 +                    return count;
802 +                Object x = q.get();
803 +                if (x != null && x != q) {
804 +                    if (++count == Integer.MAX_VALUE) // saturated
805 +                        return count;
806 +                }
807 +                pred = q;
808              }
809          }
643        return count;
810      }
811  
812      public int getWaitingConsumerCount() {
813 <        int count = 0;
814 <        QNode h = traversalHead();
815 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
816 <            if (p.get() == null) {
817 <                if (++count == Integer.MAX_VALUE)
813 >        // converse of size -- count valid non-data nodes
814 >        for (;;) {
815 >            int count = 0;
816 >            Node<E> pred = traversalHead();
817 >            for (;;) {
818 >                Node<E> q = pred.next;
819 >                if (q == pred) // restart
820 >                    break;
821 >                if (q == null || q.isData)
822 >                    return count;
823 >                Object x = q.get();
824 >                if (x == null) {
825 >                    if (++count == Integer.MAX_VALUE) // saturated
826 >                        return count;
827 >                }
828 >                pred = q;
829 >            }
830 >        }
831 >    }
832 >
833 >    public boolean remove(Object o) {
834 >        if (o == null)
835 >            return false;
836 >        for (;;) {
837 >            Node<E> pred = traversalHead();
838 >            for (;;) {
839 >                Node<E> q = pred.next;
840 >                if (q == pred) // restart
841                      break;
842 +                if (q == null || !q.isData)
843 +                    return false;
844 +                Object x = q.get();
845 +                if (x != null && x != q && o.equals(x) &&
846 +                    q.compareAndSet(x, q)) {
847 +                    clean(pred, q);
848 +                    return true;
849 +                }
850 +                pred = q;
851              }
852          }
655        return count;
853      }
854  
855 +    /**
856 +     * Always returns {@code Integer.MAX_VALUE} because a
857 +     * {@code LinkedTransferQueue} is not capacity constrained.
858 +     *
859 +     * @return {@code Integer.MAX_VALUE} (as specified by
860 +     *         {@link BlockingQueue#remainingCapacity()})
861 +     */
862      public int remainingCapacity() {
863          return Integer.MAX_VALUE;
864      }
# Line 662 | Line 866 | public class LinkedTransferQueue<E> exte
866      /**
867       * Save the state to a stream (that is, serialize it).
868       *
869 <     * @serialData All of the elements (each an <tt>E</tt>) in
869 >     * @serialData All of the elements (each an {@code E}) in
870       * the proper order, followed by a null
871       * @param s the stream
872       */
873      private void writeObject(java.io.ObjectOutputStream s)
874          throws java.io.IOException {
875          s.defaultWriteObject();
876 <        for (Iterator<E> it = iterator(); it.hasNext(); )
877 <            s.writeObject(it.next());
876 >        for (E e : this)
877 >            s.writeObject(e);
878          // Use trailing null as sentinel
879          s.writeObject(null);
880      }
# Line 678 | Line 882 | public class LinkedTransferQueue<E> exte
882      /**
883       * Reconstitute the Queue instance from a stream (that is,
884       * deserialize it).
885 +     *
886       * @param s the stream
887       */
888      private void readObject(java.io.ObjectInputStream s)
# Line 685 | Line 890 | public class LinkedTransferQueue<E> exte
890          s.defaultReadObject();
891          resetHeadAndTail();
892          for (;;) {
893 <            E item = (E)s.readObject();
893 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
894              if (item == null)
895                  break;
896              else
# Line 693 | Line 898 | public class LinkedTransferQueue<E> exte
898          }
899      }
900  
696
901      // Support for resetting head/tail while deserializing
902 +    private void resetHeadAndTail() {
903 +        Node<E> dummy = new Node<E>(null, false);
904 +        UNSAFE.putObjectVolatile(this, headOffset,
905 +                                 new PaddedAtomicReference<Node<E>>(dummy));
906 +        UNSAFE.putObjectVolatile(this, tailOffset,
907 +                                 new PaddedAtomicReference<Node<E>>(dummy));
908 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
909 +                                 new PaddedAtomicReference<Node<E>>(null));
910 +    }
911 +
912 +    // Unsafe mechanics
913 +
914 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
915 +    private static final long headOffset =
916 +        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
917 +    private static final long tailOffset =
918 +        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
919 +    private static final long cleanMeOffset =
920 +        objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class);
921  
922 <    // Temporary Unsafe mechanics for preliminary release
923 <    private static final Unsafe _unsafe;
924 <    private static final long headOffset;
702 <    private static final long tailOffset;
703 <    private static final long cleanMeOffset;
704 <    static {
922 >
923 >    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
924 >                                  String field, Class<?> klazz) {
925          try {
926 <            if (LinkedTransferQueue.class.getClassLoader() != null) {
927 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
928 <                f.setAccessible(true);
929 <                _unsafe = (Unsafe)f.get(null);
930 <            }
931 <            else
712 <                _unsafe = Unsafe.getUnsafe();
713 <            headOffset = _unsafe.objectFieldOffset
714 <                (LinkedTransferQueue.class.getDeclaredField("head"));
715 <            tailOffset = _unsafe.objectFieldOffset
716 <                (LinkedTransferQueue.class.getDeclaredField("tail"));
717 <            cleanMeOffset = _unsafe.objectFieldOffset
718 <                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
719 <        } catch (Exception e) {
720 <            throw new RuntimeException("Could not initialize intrinsics", e);
926 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
927 >        } catch (NoSuchFieldException e) {
928 >            // Convert Exception to corresponding Error
929 >            NoSuchFieldError error = new NoSuchFieldError(field);
930 >            error.initCause(e);
931 >            throw error;
932          }
933      }
934  
935 <    private void resetHeadAndTail() {
936 <        QNode dummy = new QNode(null, false);
937 <        _unsafe.putObjectVolatile(this, headOffset, dummy);
938 <        _unsafe.putObjectVolatile(this, tailOffset, dummy);
939 <        _unsafe.putObjectVolatile(this, cleanMeOffset,
940 <                                  new PaddedAtomicReference<QNode>(null));
941 <
935 >    /**
936 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
937 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
938 >     * into a jdk.
939 >     *
940 >     * @return a sun.misc.Unsafe
941 >     */
942 >    private static sun.misc.Unsafe getUnsafe() {
943 >        try {
944 >            return sun.misc.Unsafe.getUnsafe();
945 >        } catch (SecurityException se) {
946 >            try {
947 >                return java.security.AccessController.doPrivileged
948 >                    (new java.security
949 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
950 >                        public sun.misc.Unsafe run() throws Exception {
951 >                            java.lang.reflect.Field f = sun.misc
952 >                                .Unsafe.class.getDeclaredField("theUnsafe");
953 >                            f.setAccessible(true);
954 >                            return (sun.misc.Unsafe) f.get(null);
955 >                        }});
956 >            } catch (java.security.PrivilegedActionException e) {
957 >                throw new RuntimeException("Could not initialize intrinsics",
958 >                                           e.getCause());
959 >            }
960 >        }
961      }
732
962   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines