ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.8 by dl, Fri Oct 3 00:39:48 2008 UTC vs.
Revision 1.9 by dl, Sun Nov 16 20:24:54 2008 UTC

# Line 51 | Line 51 | public class LinkedTransferQueue<E> exte
51      private static final long serialVersionUID = -3223113410248163686L;
52  
53      /*
54     * This is still a work in progress...
55     *
54       * This class extends the approach used in FIFO-mode
55       * SynchronousQueues. See the internal documentation, as well as
56       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
57       * Lea & Scott
58       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
59       *
60 <     * The main extension is to provide different Wait modes
61 <     * for the main "xfer" method that puts or takes items.
62 <     * These don't impact the basic dual-queue logic, but instead
63 <     * control whether or how threads block upon insertion
64 <     * of request or data nodes into the dual queue.
60 >     * The main extension is to provide different Wait modes for the
61 >     * main "xfer" method that puts or takes items.  These don't
62 >     * impact the basic dual-queue logic, but instead control whether
63 >     * or how threads block upon insertion of request or data nodes
64 >     * into the dual queue. It also uses slightly different
65 >     * conventions for tracking whether nodes are off-list or
66 >     * cancelled.
67       */
68  
69      // Wait modes for xfer method
# Line 97 | Line 97 | public class LinkedTransferQueue<E> exte
97      static final long spinForTimeoutThreshold = 1000L;
98  
99      /**
100 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
101 <     * AtomicReference to represent item. Uses Object, not E, to allow
102 <     * setting item to "this" after use, to avoid garbage
103 <     * retention. Similarly, setting the next field to this is used as
104 <     * sentinel that node is off list.
100 >     * Node class for LinkedTransferQueue. Opportunistically
101 >     * subclasses from AtomicReference to represent item. Uses Object,
102 >     * not E, to allow setting item to "this" after use, to avoid
103 >     * garbage retention. Similarly, setting the next field to this is
104 >     * used as sentinel that node is off list.
105       */
106      static final class QNode extends AtomicReference<Object> {
107          volatile QNode next;
# Line 159 | Line 159 | public class LinkedTransferQueue<E> exte
159  
160      /**
161       * Puts or takes an item. Used for most queue operations (except
162 <     * poll() and tryTransfer())
162 >     * poll() and tryTransfer()). See the similar code in
163 >     * SynchronousQueue for detailed explanation.
164       * @param e the item or if null, signifies that this is a take
165       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
166       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 266 | Line 267 | public class LinkedTransferQueue<E> exte
267              Object x = s.get();
268              if (x != e) {                 // Node was matched or cancelled
269                  advanceHead(pred, s);     // unlink if head
270 <                if (x == s)               // was cancelled
271 <                    return clean(pred, s);
270 >                if (x == s) {              // was cancelled
271 >                    clean(pred, s);
272 >                    return null;
273 >                }
274                  else if (x != null) {
275                      s.set(s);             // avoid garbage retention
276                      return x;
# Line 275 | Line 278 | public class LinkedTransferQueue<E> exte
278                  else
279                      return e;
280              }
278
281              if (mode == TIMEOUT) {
282                  long now = System.nanoTime();
283                  nanos -= now - lastTime;
# Line 311 | Line 313 | public class LinkedTransferQueue<E> exte
313      }
314  
315      /**
316 +     * Returns validated tail for use in cleaning methods
317 +     */
318 +    private QNode getValidatedTail() {
319 +        for (;;) {
320 +            QNode h = head.get();
321 +            QNode first = h.next;
322 +            if (first != null && first.next == first) { // help advance
323 +                advanceHead(h, first);
324 +                continue;
325 +            }
326 +            QNode t = tail.get();
327 +            QNode last = t.next;
328 +            if (t == tail.get()) {
329 +                if (last != null)
330 +                    tail.compareAndSet(t, last); // help advance
331 +                else
332 +                    return t;
333 +            }
334 +        }
335 +    }    
336 +
337 +    /**
338       * Gets rid of cancelled node s with original predecessor pred.
339 <     * @return null (to simplify use by callers)
339 >     * @param pred predecessor of cancelled node
340 >     * @param s the cancelled node
341       */
342 <    private Object clean(QNode pred, QNode s) {
342 >    private void clean(QNode pred, QNode s) {
343          Thread w = s.waiter;
344          if (w != null) {             // Wake up thread
345              s.waiter = null;
346              if (w != Thread.currentThread())
347                  LockSupport.unpark(w);
348          }
349 +        /*
350 +         * At any given time, exactly one node on list cannot be
351 +         * deleted -- the last inserted node. To accommodate this, if
352 +         * we cannot delete s, we save its predecessor as "cleanMe",
353 +         * processing the previously saved version first. At least one
354 +         * of node s or the node previously saved can always be
355 +         * processed, so this always terminates.
356 +         */
357 +        while (pred.next == s) {
358 +            QNode oldpred = reclean();  // First, help get rid of cleanMe
359 +            QNode t = getValidatedTail();
360 +            if (s != t) {               // If not tail, try to unsplice
361 +                QNode sn = s.next;      // s.next == s means s already off list
362 +                if (sn == s || pred.casNext(s, sn))
363 +                    break;
364 +            }
365 +            else if (oldpred == pred || // Already saved
366 +                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
367 +                break;                  // Postpone cleaning
368 +        }
369 +    }
370  
371 <        for (;;) {
372 <            if (pred.next != s) // already cleaned
373 <                return null;
374 <            QNode h = head.get();
375 <            QNode hn = h.next;   // Absorb cancelled first node as head
376 <            if (hn != null && hn.next == hn) {
377 <                advanceHead(h, hn);
378 <                continue;
379 <            }
380 <            QNode t = tail.get();      // Ensure consistent read for tail
381 <            if (t == h)
382 <                return null;
383 <            QNode tn = t.next;
384 <            if (t != tail.get())
385 <                continue;
386 <            if (tn != null) {          // Help advance tail
387 <                tail.compareAndSet(t, tn);
388 <                continue;
389 <            }
390 <            if (s != t) {             // If not tail, try to unsplice
391 <                QNode sn = s.next;
392 <                if (sn == s || pred.casNext(s, sn))
393 <                    return null;
394 <            }
395 <            QNode dp = cleanMe.get();
396 <            if (dp != null) {    // Try unlinking previous cancelled node
351 <                QNode d = dp.next;
352 <                QNode dn;
353 <                if (d == null ||               // d is gone or
354 <                    d == dp ||                 // d is off list or
355 <                    d.get() != d ||            // d not cancelled or
356 <                    (d != t &&                 // d not tail and
357 <                     (dn = d.next) != null &&  //   has successor
358 <                     dn != d &&                //   that is on list
359 <                     dp.casNext(d, dn)))       // d unspliced
360 <                    cleanMe.compareAndSet(dp, null);
361 <                if (dp == pred)
362 <                    return null;      // s is already saved node
371 >    /**
372 >     * Tries to unsplice the cancelled node held in cleanMe that was
373 >     * previously uncleanable because it was at tail.
374 >     * @return current cleanMe node (or null)
375 >     */
376 >    private QNode reclean() {
377 >        /*
378 >         * cleanMe is, or at one time was, predecessor of cancelled
379 >         * node s that was the tail so could not be unspliced.  If s
380 >         * is no longer the tail, try to unsplice if necessary and
381 >         * make cleanMe slot available.  This differs from similar
382 >         * code in clean() because we must check that pred still
383 >         * points to a cancelled node that must be unspliced -- if
384 >         * not, we can (must) clear cleanMe without unsplicing.
385 >         * This can loop only due to contention on casNext or
386 >         * clearing cleanMe.
387 >         */
388 >        QNode pred;
389 >        while ((pred = cleanMe.get()) != null) {
390 >            QNode t = getValidatedTail();
391 >            QNode s = pred.next;
392 >            if (s != t) {
393 >                QNode sn;
394 >                if (s == null || s == pred || s.get() != s ||
395 >                    (sn = s.next) == s || pred.casNext(s, sn))
396 >                    cleanMe.compareAndSet(pred, null);
397              }
398 <            else if (cleanMe.compareAndSet(null, pred))
399 <                return null;          // Postpone cleaning s
398 >            else // s is still tail; cannot clean
399 >                break;
400          }
401 +        return pred;
402      }
403  
404      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines