ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.57 by jsr166, Thu Aug 18 18:34:54 2005 UTC vs.
Revision 1.58 by jsr166, Thu Aug 18 20:44:14 2005 UTC

# Line 36 | Line 36 | import java.util.*;
36   * <p> This class supports an optional fairness policy for ordering
37   * waiting producer and consumer threads.  By default, this ordering
38   * is not guaranteed. However, a queue constructed with fairness set
39 < * to <tt>true</tt> grants threads access in FIFO order.
39 > * to <tt>true</tt> grants threads access in FIFO order.
40   *
41   * <p>This class and its iterator implement all of the
42   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 98 | Line 98 | public class SynchronousQueue<E> extends
98       *     of further adaptations.
99       *  2. SynchronousQueues must block threads waiting to become
100       *     fulfilled.
101 <     *  3. Support for cancellation via timeout and interrupts,
102 <     *     including cleaning out cancelled nodes/threads
101 >     *  3. Support for cancellation via timeout and interrupts,
102 >     *     including cleaning out cancelled nodes/threads
103       *     from lists to avoid garbage retention and memory depletion.
104       *
105       * Blocking is mainly accomplished using LockSupport park/unpark,
# Line 247 | Line 247 | public class SynchronousQueue<E> extends
247              }
248  
249              /**
250 <             * Try to cancel a wait by matching node to itself.
250 >             * Try to cancel a wait by matching node to itself.
251               */
252              void tryCancel() {
253                  matchUpdater.compareAndSet(this, null, this);
# Line 315 | Line 315 | public class SynchronousQueue<E> extends
315                  SNode h = head;
316                  if (h == null || h.mode == mode) {  // empty or same-mode
317                      if (timed && nanos <= 0) {      // can't wait
318 <                        if (h != null && h.isCancelled())
318 >                        if (h != null && h.isCancelled())
319                              casHead(h, h.next);     // pop cancelled node
320                          else
321 <                            return null;
321 >                            return null;
322                      } else if (casHead(h, s = snode(s, e, h, mode))) {
323                          SNode m = awaitFulfill(s, timed, nanos);
324                          if (m == s) {               // wait was cancelled
# Line 406 | Line 406 | public class SynchronousQueue<E> extends
406                      return m;
407                  if (timed) {
408                      long now = System.nanoTime();
409 <                    nanos -= now - lastTime;                    
409 >                    nanos -= now - lastTime;
410                      lastTime = now;
411                      if (nanos <= 0) {
412                          s.tryCancel();
# Line 437 | Line 437 | public class SynchronousQueue<E> extends
437           * Unlinks s from the stack.
438           */
439          void clean(SNode s) {
440 <            s.item = null;   // forget item
440 >            s.item = null;   // forget item
441              s.waiter = null; // forget thread
442  
443              /*
# Line 487 | Line 487 | public class SynchronousQueue<E> extends
487              volatile QNode next;          // next node in queue
488              volatile Object item;         // CAS'ed to or from null
489              volatile Thread waiter;       // to control park/unpark
490 <            final boolean isData;
490 >            final boolean isData;
491  
492              QNode(Object item, boolean isData) {
493                  this.item = item;
# Line 513 | Line 513 | public class SynchronousQueue<E> extends
513              }
514  
515              /**
516 <             * Try to cancel by CAS'ing ref to this as item.  
516 >             * Try to cancel by CAS'ing ref to this as item.
517               */
518              void tryCancel(Object cmp) {
519                  itemUpdater.compareAndSet(this, cmp, this);
# Line 523 | Line 523 | public class SynchronousQueue<E> extends
523                  return item == this;
524              }
525  
526 <            /**
526 >            /**
527               * Returns true if this node is known to be off the queue
528               * because its next pointer has been forgotten due to
529               * an advanceHead operation.
# Line 591 | Line 591 | public class SynchronousQueue<E> extends
591           * Puts or takes an item.
592           */
593          Object transfer(Object e, boolean timed, long nanos) {
594 <            /* Basic algorithm is to loop trying to take either of
594 >            /* Basic algorithm is to loop trying to take either of
595               * two actions:
596               *
597 <             * 1. If queue apparently empty or holding same-mode nodes,
597 >             * 1. If queue apparently empty or holding same-mode nodes,
598               *    try to add node to queue of waiters, wait to be
599               *    fulfilled (or cancelled) and return matching item.
600               *
# Line 697 | Line 697 | public class SynchronousQueue<E> extends
697                      return x;
698                  if (timed) {
699                      long now = System.nanoTime();
700 <                    nanos -= now - lastTime;                    
700 >                    nanos -= now - lastTime;
701                      lastTime = now;
702                      if (nanos <= 0) {
703                          s.tryCancel(e);
# Line 761 | Line 761 | public class SynchronousQueue<E> extends
761                           (dn = d.next) != null &&  //   has successor
762                           dn != d &&                //   that is on list
763                           dp.casNext(d, dn)))       // d unspliced
764 <                        casCleanMe(dp, null);
765 <                    if (dp == pred)                
764 >                        casCleanMe(dp, null);
765 >                    if (dp == pred)
766                          return;      // s is already saved node
767 <                } else if (casCleanMe(null, pred))
767 >                } else if (casCleanMe(null, pred))
768                      return;          // Postpone cleaning s
769              }
770          }
# Line 817 | Line 817 | public class SynchronousQueue<E> extends
817       * @throws InterruptedException {@inheritDoc}
818       * @throws NullPointerException {@inheritDoc}
819       */
820 <    public boolean offer(E o, long timeout, TimeUnit unit)
820 >    public boolean offer(E o, long timeout, TimeUnit unit)
821          throws InterruptedException {
822          if (o == null) throw new NullPointerException();
823          if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines