[cvs] / jsr166 / src / main / java / util / concurrent / SynchronousQueue.java Repository:
ViewVC logotype

Diff of /jsr166/src/main/java/util/concurrent/SynchronousQueue.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.54, Sun Jun 19 23:13:42 2005 UTC revision 1.55, Mon Aug 1 15:26:40 2005 UTC
# Line 1  Line 1 
1  /*  /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166   * Written by Doug Lea, Bill Scherer, and Michael Scott with
3   * Expert Group and released to the public domain, as explained at   * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5   * http://creativecommons.org/licenses/publicdomain   * http://creativecommons.org/licenses/publicdomain
6   */   */
7    
8  package java.util.concurrent;  package java.util.concurrent;
9  import java.util.concurrent.locks.*;  import java.util.concurrent.locks.*;
10    import java.util.concurrent.atomic.*;
11  import java.util.*;  import java.util.*;
12    
13  /**  /**
# Line 34  Line 36 
36   * <p> This class supports an optional fairness policy for ordering   * <p> This class supports an optional fairness policy for ordering
37   * waiting producer and consumer threads.  By default, this ordering   * waiting producer and consumer threads.  By default, this ordering
38   * is not guaranteed. However, a queue constructed with fairness set   * is not guaranteed. However, a queue constructed with fairness set
39   * to <tt>true</tt> grants threads access in FIFO order. Fairness   * to <tt>true</tt> grants threads access in FIFO order.
  * generally decreases throughput but reduces variability and avoids  
  * starvation.  
40   *   *
41   * <p>This class and its iterator implement all of the   * <p>This class and its iterator implement all of the
42   * <em>optional</em> methods of the {@link Collection} and {@link   * <em>optional</em> methods of the {@link Collection} and {@link
# Line 55  Line 55 
55      private static final long serialVersionUID = -3223113410248163686L;      private static final long serialVersionUID = -3223113410248163686L;
56    
57      /*      /*
58        This implementation divides actions into two cases for puts:       * This class implements extensions of the dual stack and dual
59         * queue algorithms described in "Nonblocking Concurrent Objects
60        * An arriving producer that does not already have a waiting consumer       * with Condition Synchronization", by W. N. Scherer III and
61          creates a node holding item, and then waits for a consumer to take it.       * M. L. Scott.  18th Annual Conf. on Distributed Computing,
62        * An arriving producer that does already have a waiting consumer fills       * Oct. 2004 (see also
63          the slot node created by the consumer, and notifies it to continue.       * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
64         * The (Lifo) stack is used for non-fair mode, and the (Fifo)
65        And symmetrically, two for takes:       * queue for fair mode. The performance of the two is generally
66         * similar. Fifo usually supports higher throughput under
67        * An arriving consumer that does not already have a waiting producer       * contention but Lifo maintains higher thread locality in common
68          creates an empty slot node, and then waits for a producer to fill it.       * applications.
69        * An arriving consumer that does already have a waiting producer takes       *
70          item from the node created by the producer, and notifies it to continue.       * A dual queue (and similarly stack) is one that at any given
71         * time either holds "data" -- items provided by put operations,
72        When a put or take waiting for the actions of its counterpart       * or "requests" -- slots representing take operations, or is
73        aborts due to interruption or timeout, it marks the node       * empty. A call to "fulfill" (i.e., a call requesting an item
74        it created as "CANCELLED", which causes its counterpart to retry       * from a queue holding data or vice versa) dequeues a
75        the entire put or take sequence.       * complementary node.  The most interesting feature of these
76         * queues is that any operation can figure out which mode the
77        This requires keeping two simple queues, waitingProducers and       * queue is in, and act accordingly without needing locks.
78        waitingConsumers. Each of these can be FIFO (preserves fairness)       *
79        or LIFO (improves throughput).       * Both the queue and stack extend abstract class Transferer
80      */       * defining the single method transfer that does a put or a
81         * take. These are unified into a single method because in dual
82      /** Lock protecting both wait queues */       * data structures, the put and take operations are symmetrical,
83      private final ReentrantLock qlock;       * so nearly all code can be combined. The resulting transfer
84      /** Queue holding waiting puts */       * methods are on the long side, but are easier to follow than
85      private final WaitQueue waitingProducers;       * they would be if broken up into nearly-duplicated parts.
86      /** Queue holding waiting takes */       *
87      private final WaitQueue waitingConsumers;       * The queue and stack data structures share many conceptual
88         * similarities but very few concrete details. For simplicity,
89         * they are kept distinct so that they can later evolve
90         * separately.
91         *
92         * The algorithms here differ from the versions in the above paper
93         * in extending them for use in synchronous queues, as well as
94         * dealing with cancellation. The main differences include:
95         *
96         *  1. The orginal algorithms used bit-marked pointers, but
97         *     the ones here use mode bits in nodes, leading to a number
98         *     of further adaptations.
99         *  2. SynchronousQueues must block threads waiting to become
100         *     fulfilled.
101         *  3. Nodes/threads that have been cancelled due to timeouts
102         *     or interruptions are cleaned out of the lists to
103         *     avoid garbage retention and memory depletion.
104         *
105         * Blocking is mainly accomplished using LockSupport park/unpark,
106         * except that nodes that appear to be the next ones to become
107         * fulfilled first spin a bit (on multiprocessors only). On very
108         * busy synchronous queues, spinning can dramatically improve
109         * throughput. And on less busy ones, the amount of spinning is
110         * small enough not to be noticeable.
111         *
112         * Cleaning is done in different ways in queues vs stacks.  For
113         * queues, we can almost always remove a node immediately in O(1)
114         * time (modulo retries for consistency checks) when it is
115         * cancelled. But if it may be pinned as the current tail, it must
116         * wait until some subsequent cancellation. For stacks, we need a
117         * potentially O(n) traversal to be sure that we can remove the
118         * node, but this can run concurrently with other threads
119         * accessing the stack.
120         *
121         * While garbage collection takes care of most node reclamation
122         * issues that otherwise complicate nonblocking algorithms, care
123         * is made to "forget" references to data, other nodes, and
124         * threads that might be held on to long-term by blocked
125         * threads. In cases where setting to null would otherwise
126         * conflict with main algorithms, this is done by changing a
127         * node's link to now point to the node itself. This doesn't arise
128         * much for Stack nodes (because blocked threads do not hang on to
129         * old head pointers), but references in Queue nodes must be
130         * agressively forgotten to avoid reachability of everything any
131         * node has ever referred to since arrival.
132         */
133    
134        /**
135         * Shared internal API for dual stacks and queues.
136         */
137        static abstract class Transferer {
138            /**
139             * Perform a put or take.
140             * @param e if non-null, the item to be handed to a consumer;
141             * if null, requests that transfer return an item offered by
142             * producer.
143             * @param timed if this operation should timeout
144             * @param nanos the timeout, in nanoseconds
145             * @return if nonnull, the item provided or received; if null,
146             * the operation failed due to timeout or interrupt -- the
147             * caller can distinguish which of these occurred by checking
148             * Thread.interrupted.
149             */
150            abstract Object transfer(Object e, boolean timed, long nanos);
151        }
152    
153        /** The number of CPUs, for spin control */
154        static final int NCPUS = Runtime.getRuntime().availableProcessors();
155    
156        /**
157         * The number of times to spin before blocking in timed waits.
158         * The value is empirically derived -- it works well across a
159         * variety of processors and OSes. Emprically, the best value
160         * seems not to vary with number of CPUs (beyond 2) so is just
161         * a constant.
162         */
163        static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
164    
165        /**
166         * The number of times to spin before blocking in untimed
167         * waits.  This is greater than timed value because untimed
168         * waits spin faster since they don't need to check times on
169         * each spin.
170         */
171        static final int maxUntimedSpins = maxTimedSpins * 16;
172    
173      /**      /**
174       * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.       * The number of nanoseconds for which it is faster to spin
175         * rather than to use timed park. A rough estimate suffices.
176       */       */
177      public SynchronousQueue() {      static final long spinForTimeoutThreshold = 1000L;
178          this(false);  
179        /** Dual stack  */
180        static final class TransferStack extends Transferer {
181            /*
182             * This extends Scherer-Scott dual stack algorithm, differing,
183             * among other ways, by using "covering" nodes rather than
184             * bit-marked pointers: Fulfilling operations push on marker
185             * nodes (with FULFILLING bit set in mode) to reserve a spot
186             * to match a waiting node.
187             */
188    
189            /* Modes for SNodes, ORed together in node fields */
190            /** Node represents an unfulfilled consumer */
191            static final int REQUEST    = 0;
192            /** Node represents an unfulfilled producer */
193            static final int DATA       = 1;
194            /** Node is fulfilling another unfulfilled DATA or REQUEST */
195            static final int FULFILLING = 2;
196    
197            /** Return true if m has fulfilling bit set */
198            static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
199    
200            /** Node class for TransferStacks. */
201            static final class SNode {
202                volatile SNode next;        // next node in stack
203                volatile SNode match;       // the node matched to this
204                volatile Thread waiter;     // to control park/unpark
205                Object item;                // data; or null for REQUESTs
206                int mode;
207                // Note: item and mode fields don't need to be volatile
208                // since they are always written before, and read after,
209                // other volatile/atomic operations.
210    
211                SNode(Object item) {
212                    this.item = item;
213                }
214    
215                static final AtomicReferenceFieldUpdater<SNode, SNode>
216                    nextUpdater = AtomicReferenceFieldUpdater.newUpdater
217                    (SNode.class, SNode.class, "next");
218    
219                boolean casNext(SNode cmp, SNode val) {
220                    return (cmp == next &&
221                            nextUpdater.compareAndSet(this, cmp, val));
222                }
223    
224                static final AtomicReferenceFieldUpdater<SNode, SNode>
225                    matchUpdater = AtomicReferenceFieldUpdater.newUpdater
226                    (SNode.class, SNode.class, "match");
227    
228                /**
229                 * Try to match node s to this node, if so, waking up
230                 * thread. Fulfillers call tryMatch to identify their
231                 * waiters. Waiters block until they have been
232                 * matched.
233                 * @param s the node to match
234                 * @return true if successfully matched to s
235                 */
236                boolean tryMatch(SNode s) {
237                    if (match == null &&
238                        matchUpdater.compareAndSet(this, null, s)) {
239                        Thread w = waiter;
240                        if (w != null) {    // waiters need at most one unpark
241                            waiter = null;
242                            LockSupport.unpark(w);
243                        }
244                        return true;
245                    }
246                    return match == s;
247      }      }
248    
249      /**      /**
250       * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.               * Try to cancel a wait by matching node to itself.
      * @param fair if true, threads contend in FIFO order for access;  
      * otherwise the order is unspecified.  
251       */       */
252      public SynchronousQueue(boolean fair) {              void tryCancel() {
253          if (fair) {                  matchUpdater.compareAndSet(this, null, this);
254              qlock = new ReentrantLock(true);              }
255              waitingProducers = new FifoWaitQueue();  
256              waitingConsumers = new FifoWaitQueue();              boolean isCancelled() {
257                    return match == this;
258          }          }
         else {  
             qlock = new ReentrantLock();  
             waitingProducers = new LifoWaitQueue();  
             waitingConsumers = new LifoWaitQueue();  
259          }          }
260    
261            /** The head (top) of the stack */
262            volatile SNode head;
263    
264            static final AtomicReferenceFieldUpdater<TransferStack, SNode>
265                headUpdater = AtomicReferenceFieldUpdater.newUpdater
266                (TransferStack.class,  SNode.class, "head");
267    
268            boolean casHead(SNode h, SNode nh) {
269                return h == head && headUpdater.compareAndSet(this, h, nh);
270      }      }
271    
272      /**      /**
273       * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.           * Create or reset fields of a node. Called only from transfer
274       * These queues have all transient fields, but are serializable           * where the node to push on stack is lazily created and
275       * in order to recover fairness settings when deserialized.           * reused when possible to help reduce intervals between reads
276             * and CASes of head and to avoid surges of garbage when CASes
277             * to push nodes fail due to contention.
278       */       */
279      static abstract class WaitQueue implements java.io.Serializable {          static SNode snode(SNode s, Object e, SNode next, int mode) {
280          /** Creates, adds, and returns node for x. */              if (s == null) s = new SNode(e);
281          abstract Node enq(Object x);              s.mode = mode;
282          /** Removes and returns node, or null if empty. */              s.next = next;
283          abstract Node deq();              return s;
         /** Removes a cancelled node to avoid garbage retention. */  
         abstract void unlink(Node node);  
         /** Returns true if a cancelled node might be on queue. */  
         abstract boolean shouldUnlink(Node node);  
284      }      }
285    
286      /**      /**
287       * FIFO queue to hold waiting puts/takes.           * Put or take an item.
288             */
289            Object transfer(Object e, boolean timed, long nanos) {
290                /*
291                 * Basic algorithm is to loop trying one of three actions:
292                 *
293                 * 1. If apparently empty or already containing nodes of same
294                 *    mode, try to push node on stack and wait for a match,
295                 *    returning it, or null if cancelled.
296                 *
297                 * 2. If apparently containing node of complementary mode,
298                 *    try to push a fulfilling node on to stack, match
299                 *    with corresponding waiting node, pop both from
300                 *    stack, and return matched item. The matching or
301                 *    unlinking might not actually be necessary because of
302                 *    another threads performing action 3:
303                 *
304                 * 3. If top of stack already holds another fulfilling node,
305                 *    help it out by doing its match and/or pop
306                 *    operations, and then continue. The code for helping
307                 *    is essentially the same as for fulfilling, except
308                 *    that it doesn't return the item.
309       */       */
     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {  
         private static final long serialVersionUID = -3623113410248163686L;  
         private transient Node head;  
         private transient Node last;  
310    
311          Node enq(Object x) {              SNode s = null; // constructed/reused as needed
312              Node p = new Node(x);              int mode = (e == null)? REQUEST : DATA;
313              if (last == null)  
314                  last = head = p;              for (;;) {
315                    SNode h = head;
316                    if (h == null || h.mode == mode) {  // empty or same-mode
317                        if (timed && nanos <= 0) {      // can't wait
318                            if (h != null && h.isCancelled())
319                                casHead(h, h.next);     // pop cancelled node
320              else              else
321                  last = last.next = p;                              return null;
322              return p;                      } else if (casHead(h, s = snode(s, e, h, mode))) {
323                            SNode m = awaitFulfill(s, timed, nanos);
324                            if (m == s) {               // wait was cancelled
325                                clean(s);
326                                return null;
327                            }
328                            if ((h = head) != null && h.next == s)
329                                casHead(h, s.next);     // help s's fulfiller
330                            return mode == REQUEST? m.item : s.item;
331                        }
332                    } else if (!isFulfilling(h.mode)) { // try to fulfill
333                        if (h.isCancelled())            // already cancelled
334                            casHead(h, h.next);         // pop and retry
335                        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
336                            for (;;) { // loop until matched or waiters disappear
337                                SNode m = s.next;       // m is s's match
338                                if (m == null) {        // all waiters are gone
339                                    casHead(s, null);   // pop fulfill node
340                                    s = null;           // use new node next time
341                                    break;              // restart main loop
342                                }
343                                SNode mn = m.next;
344                                if (m.tryMatch(s)) {
345                                    casHead(s, mn);     // pop both s and m
346                                    return (mode == REQUEST)? m.item : s.item;
347                                } else                  // lost match
348                                    s.casNext(m, mn);   // help unlink
349                            }
350                        }
351                    } else {                            // help a fulfiller
352                        SNode m = h.next;               // m is h's match
353                        if (m == null)                  // waiter is gone
354                            casHead(h, null);           // pop fulfilling node
355                        else {
356                            SNode mn = m.next;
357                            if (m.tryMatch(h))          // help match
358                                casHead(h, mn);         // pop both h and m
359                            else                        // lost match
360                                h.casNext(m, mn);       // help unlink
361          }          }
   
         Node deq() {  
             Node p = head;  
             if (p != null) {  
                 if ((head = p.next) == null)  
                     last = null;  
                 p.next = null;  
362              }              }
             return p;  
363          }          }
   
         boolean shouldUnlink(Node node) {  
             return (node == last || node.next != null);  
364          }          }
365    
366          void unlink(Node node) {          /**
367              Node p = head;           * Spin/block until node s is matched by a fulfill operation.
368              Node trail = null;           * @param s the waiting node
369              while (p != null) {           * @param timed true if timed wait
370                  if (p == node) {           * @param nanos timeout value
371                      Node next = p.next;           * @return matched node, or s if cancelled
372                      if (trail == null)           */
373                          head = next;          SNode awaitFulfill(SNode s, boolean timed, long nanos) {
374                      else              /*
375                          trail.next = next;               * When a node/thread is about to block, it sets its waiter
376                      if (last == node)               * field and then rechecks state at least one more time
377                          last = trail;               * before actually parking, thus covering race vs
378                      break;               * fulfiller noticing that waiter is nonnull so should be
379                 * woken.
380                 *
381                 * When invoked by nodes that appear at the point of call
382                 * to be at the head of the stack, calls to park are
383                 * preceded by spins to avoid blocking when producers and
384                 * consumers are arriving very close in time.  This can
385                 * happen enough to bother only on multiprocessors.
386                 *
387                 * The order of checks for returning out of main loop
388                 * reflects fact that interrupts have precedence over
389                 * normal returns, which have precedence over
390                 * timeouts. (So, on timeout, one last check for match is
391                 * done before giving up.) Except that calls from untimed
392                 * SynchronousQueue.{poll/offer} don't check interrupts
393                 * and don't wait at all, so are trapped in transfer
394                 * method rather than calling awaitFulfill.
395                 */
396                long lastTime = (timed)? System.nanoTime() : 0;
397                Thread w = Thread.currentThread();
398                SNode h = head;
399                int spins = (shouldSpin(s)?
400                             (timed? maxTimedSpins : maxUntimedSpins) : 0);
401                for (;;) {
402                    if (w.isInterrupted())
403                        s.tryCancel();
404                    SNode m = s.match;
405                    if (m != null)
406                        return m;
407                    if (timed) {
408                        long now = System.nanoTime();
409                        nanos -= now - lastTime;
410                        lastTime = now;
411                        if (nanos <= 0) {
412                            s.tryCancel();
413                            continue;
414                  }                  }
                 trail = p;  
                 p = p.next;  
415              }              }
416                    if (spins > 0)
417                        spins = shouldSpin(s)? (spins-1) : 0;
418                    else if (s.waiter == null)
419                        s.waiter = w; // establish waiter so can park next iter
420                    else if (!timed)
421                        LockSupport.park(this);
422                    else if (nanos > spinForTimeoutThreshold)
423                        LockSupport.parkNanos(this, nanos);
424          }          }
425      }      }
426    
427      /**      /**
428       * LIFO queue to hold waiting puts/takes.           * Return true if node s is at head or there is an active
429             * fulfiller.
430       */       */
431      static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {          boolean shouldSpin(SNode s) {
432          private static final long serialVersionUID = -3633113410248163686L;              SNode h = head;
433          private transient Node head;              return (h == null || h == s || isFulfilling(h.mode));
   
         Node enq(Object x) {  
             return head = new Node(x, head);  
         }  
   
         Node deq() {  
             Node p = head;  
             if (p != null) {  
                 head = p.next;  
                 p.next = null;  
             }  
             return p;  
434          }          }
435    
436          boolean shouldUnlink(Node node) {          /**
437              // Return false if already dequeued or is bottom node (in which           * Unlink s from the stack
438              // case we might retain at most one garbage node)           */
439              return (node == head || node.next != null);          void clean(SNode s) {
440          }              s.item = null;   // forget item
441                s.waiter = null; // forget thread
442    
443          void unlink(Node node) {              /*
444              Node p = head;               * At worst we may need to traverse entire stack to unlink
445              Node trail = null;               * s. If there are multiple concurrent calls to clean, we
446              while (p != null) {               * might not see s if another thread has already removed
447                  if (p == node) {               * it. But we can stop when we see any node known to
448                      Node next = p.next;               * follow s. We use s.next unless it too is cancelled, in
449                      if (trail == null)               * which case we try the node one past. We don't check any
450                          head = next;               * futher because we don't want to doubly traverse just to
451                 * find sentinel.
452                 */
453    
454                SNode past = s.next;
455                if (past != null && past.isCancelled())
456                    past = past.next;
457    
458                // Absorb cancelled nodes at head
459                SNode p;
460                while ((p = head) != null && p != past && p.isCancelled())
461                    casHead(p, p.next);
462    
463                // Unsplice embedded nodes
464                while (p != null && p != past) {
465                    SNode n = p.next;
466                    if (n != null && n.isCancelled())
467                        p.casNext(n, n.next);
468                      else                      else
469                          trail.next = next;                      p = n;
                     break;  
                 }  
                 trail = p;  
                 p = p.next;  
470              }              }
471          }          }
472      }      }
473    
474      /**      /** Dual Queue. */
475       * Unlinks the given node from consumer queue.  Called by cancelled      static final class TransferQueue extends Transferer {
476       * (timeout, interrupt) waiters to avoid garbage retention in the          /*
477       * absence of producers.           * This extends Scherer-Scott dual queue algorithm, differing,
478             * among other ways, by using modes within nodes rather than
479             * marked pointers. The algorithm is a little simpler than
480             * that for stacks because fulfillers do not need explicit
481             * nodes, and matching is done by CAS'ing QNode.item field
482             * from nonnull to null (for put) or vice versa (for take).
483       */       */
484      private void unlinkCancelledConsumer(Node node) {  
485          // Use a form of double-check to avoid unnecessary locking and          /** Node class for TransferQueue. */
486          // traversal. The first check outside lock might          static final class QNode {
487          // conservatively report true.              volatile QNode next;          // next node in queue
488          if (waitingConsumers.shouldUnlink(node)) {              volatile Object item;         // CAS'ed to or from null
489              qlock.lock();              volatile Thread waiter;       // to control park/unpark
490              try {              final boolean isData;
491                  if (waitingConsumers.shouldUnlink(node))  
492                      waitingConsumers.unlink(node);              QNode(Object item, boolean isData) {
493              } finally {                  this.item = item;
494                  qlock.unlock();                  this.isData = isData;
495              }              }
496    
497                static final AtomicReferenceFieldUpdater<QNode, QNode>
498                    nextUpdater = AtomicReferenceFieldUpdater.newUpdater
499                    (QNode.class, QNode.class, "next");
500    
501                boolean casNext(QNode cmp, QNode val) {
502                    return (next == cmp &&
503                            nextUpdater.compareAndSet(this, cmp, val));
504          }          }
505    
506                static final AtomicReferenceFieldUpdater<QNode, Object>
507                    itemUpdater = AtomicReferenceFieldUpdater.newUpdater
508                    (QNode.class, Object.class, "item");
509    
510                boolean casItem(Object cmp, Object val) {
511                    return (item == cmp &&
512                            itemUpdater.compareAndSet(this, cmp, val));
513      }      }
514    
515      /**      /**
516       * Unlinks the given node from producer queue.  Symmetric               * Try to cancel by CAS'ing ref to this as item.
      * to unlinkCancelledConsumer.  
517       */       */
518      private void unlinkCancelledProducer(Node node) {              void tryCancel(Object cmp) {
519          if (waitingProducers.shouldUnlink(node)) {                  itemUpdater.compareAndSet(this, cmp, this);
             qlock.lock();  
             try {  
                 if (waitingProducers.shouldUnlink(node))  
                     waitingProducers.unlink(node);  
             } finally {  
                 qlock.unlock();  
520              }              }
521    
522                boolean isCancelled() {
523                    return item == this;
524          }          }
525      }      }
526    
527            /** Head of queue */
528            transient volatile QNode head;
529            /** Tail of queue */
530            transient volatile QNode tail;
531      /**      /**
532       * Nodes each maintain an item and handle waits and signals for           * Reference to a cancelled node that might not yet have been
533       * getting and setting it. The class extends           * unlinked from queue because it was the last inserted node
534       * AbstractQueuedSynchronizer to manage blocking, using AQS state           * when it cancelled.
      *  0 for waiting, 1 for ack, -1 for cancelled.  
535       */       */
536      static final class Node extends AbstractQueuedSynchronizer {          transient volatile QNode cleanMe;
         private static final long serialVersionUID = -2631493897867746127L;  
537    
538          /** Synchronization state value representing that node acked */          TransferQueue() {
539          private static final int ACK    =  1;              QNode h = new QNode(null, false); // initialize to dummy node.
540          /** Synchronization state value representing that node cancelled */              head = h;
541          private static final int CANCEL = -1;              tail = h;
542            }
         /** The item being transferred */  
         Object item;  
         /** Next node in wait queue */  
         Node next;  
   
         /** Creates a node with initial item */  
         Node(Object x) { item = x; }  
543    
544          /** Creates a node with initial item and next */          static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
545          Node(Object x, Node n) { item = x; next = n; }              headUpdater = AtomicReferenceFieldUpdater.newUpdater
546                (TransferQueue.class,  QNode.class, "head");
547    
548          /**          /**
549           * Implements AQS base acquire to succeed if not in WAITING state           * Try to cas nh as new head; if successful unlink
550             * old head's next node to avoid garbage retention.
551           */           */
552          protected boolean tryAcquire(int ignore) {          void advanceHead(QNode h, QNode nh) {
553              return getState() != 0;              if (h == head && headUpdater.compareAndSet(this, h, nh))
554                    h.next = h; // forget old next
555          }          }
556    
557            static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
558                tailUpdater = AtomicReferenceFieldUpdater.newUpdater
559                (TransferQueue.class, QNode.class, "tail");
560    
561          /**          /**
562           * Implements AQS base release to signal if state changed           * Try to cas nt as new tail.
563           */           */
564          protected boolean tryRelease(int newState) {          void advanceTail(QNode t, QNode nt) {
565              return compareAndSetState(0, newState);              if (tail == t)
566                    tailUpdater.compareAndSet(this, t, nt);
567          }          }
568    
569            static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
570                cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
571                (TransferQueue.class, QNode.class, "cleanMe");
572    
573          /**          /**
574           * Takes item and nulls out field (for sake of GC)           * Try to CAS cleanMe slot
575           */           */
576          private Object extract() {          boolean casCleanMe(QNode cmp, QNode val) {
577              Object x = item;              return (cleanMe == cmp &&
578              item = null;                      cleanMeUpdater.compareAndSet(this, cmp, val));
             return x;  
579          }          }
580    
581          /**          /**
582           * Tries to cancel on interrupt; if so rethrowing,           * Put or take an item.
          * else setting interrupt state  
583           */           */
584          private void checkCancellationOnInterrupt(InterruptedException ie)          Object transfer(Object e, boolean timed, long nanos) {
585              throws InterruptedException {              /* Basic algorithm is to loop trying to take either of
586              if (release(CANCEL))               * two actions:
587                  throw ie;               *
588              Thread.currentThread().interrupt();               * 1. If queue apparently empty or holding same-mode nodes,
589                 *    try to add node to queue of waiters, wait to be
590                 *    fulfilled (or cancelled) and return matching item.
591                 *
592                 * 2. If queue apparently contains waiting items, and this
593                 *    call is of complementary mode, try to fulfill by CAS'ing
594                 *    item field of waiting node and dequeuing it, and then
595                 *    returning matching item.
596                 *
597                 * In each case, along the way, check for and try to help
598                 * advance head and tail on behalf of other stalled/slow
599                 * threads.
600                 *
601                 * The loop starts off with a null check guarding against
602                 * seeing uninitialized head or tail values. This never
603                 * happens in current SynchronousQueue, but could if
604                 * callers held non-volatile/final ref to the
605                 * transferer. The check is here anyway because it places
606                 * null checks at top of loop, which is usually faster
607                 * than having them implicitly interspersed.
608                 */
609    
610                QNode s = null; // constructed/reused as needed
611                boolean isData = (e != null);
612    
613                for (;;) {
614                    QNode t = tail;
615                    QNode h = head;
616                    if (t == null || h == null)         // saw unitialized values
617                        continue;                       // spin
618    
619                    if (h == t || t.isData == isData) { // empty or same-mode
620                        QNode tn = t.next;
621                        if (t != tail)                  // inconsistent read
622                            continue;
623                        if (tn != null) {               // lagging tail
624                            advanceTail(t, tn);
625                            continue;
626                        }
627                        if (timed && nanos <= 0)        // can't wait
628                            return null;
629                        if (s == null)
630                            s = new QNode(e, isData);
631                        if (!t.casNext(null, s))        // failed to link in
632                            continue;
633    
634                        advanceTail(t, s);              // swing tail and wait
635                        Object x = awaitFulfill(s, e, timed, nanos);
636                        if (x == s) {                   // wait was cancelled
637                            clean(t, s);
638                            return null;
639          }          }
640    
641          /**                      if (s.next != s) {              // not already unlinked
642           * Fills in the slot created by the consumer and signal consumer to                          advanceHead(t, s);          // unlink
643           * continue.                          if (x != null)              // and forget fields
644           */                              s.item = s;
645          boolean setItem(Object x) {                          s.waiter = null;
             item = x; // can place in slot even if cancelled  
             return release(ACK);  
646          }          }
647                        return (x != null)? x : e;
648    
649          /**                  } else {                            // complementary-mode
650           * Removes item from slot created by producer and signal producer                      QNode m = h.next;               // node to fulfill
651           * to continue.                      if (t != tail || m == null || h != head)
652           */                          continue;                   // inconsistent read
653          Object getItem() {  
654              return (release(ACK))? extract() : null;                      Object x = m.item;
655                        if (isData == (x != null) ||    // m already fulfilled
656                            x == m ||                   // m cancelled
657                            !m.casItem(x, e)) {         // lost CAS
658                            advanceHead(h, m);          // dequeue and retry
659                            continue;
660          }          }
661    
662          /**                      advanceHead(h, m);              // successfully fulfilled
663           * Waits for a consumer to take item placed by producer.                      LockSupport.unpark(m.waiter);
664           */                      return (x != null)? x : e;
665          void waitForTake() throws InterruptedException {                  }
             try {  
                 acquireInterruptibly(0);  
             } catch (InterruptedException ie) {  
                 checkCancellationOnInterrupt(ie);  
666              }              }
667          }          }
668    
669          /**          /**
670           * Waits for a producer to put item placed by consumer.           * Spin/block until node s is fulfilled.
671             * @param s the waiting node
672             * @param e the comparison value for checking match
673             * @param timed true if timed wait
674             * @param nanos timeout value
675             * @return matched item, or s if cancelled
676           */           */
677          Object waitForPut() throws InterruptedException {          Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
678              try {              /* Same idea as TransferStack.awaitFulfill */
679                  acquireInterruptibly(0);              long lastTime = (timed)? System.nanoTime() : 0;
680              } catch (InterruptedException ie) {              Thread w = Thread.currentThread();
681                  checkCancellationOnInterrupt(ie);              int spins = ((head.next == s) ?
682                             (timed? maxTimedSpins : maxUntimedSpins) : 0);
683                for (;;) {
684                    if (w.isInterrupted())
685                        s.tryCancel(e);
686                    Object x = s.item;
687                    if (x != e)
688                        return x;
689                    if (timed) {
690                        long now = System.nanoTime();
691                        nanos -= now - lastTime;
692                        lastTime = now;
693                        if (nanos <= 0) {
694                            s.tryCancel(e);
695                            continue;
696                        }
697                    }
698                    if (spins > 0)
699                        --spins;
700                    else if (s.waiter == null)
701                        s.waiter = w;
702                    else if (!timed)
703                        LockSupport.park(this);
704                    else if (nanos > spinForTimeoutThreshold)
705                        LockSupport.parkNanos(this, nanos);
706              }              }
             return extract();  
707          }          }
708    
709          /**          /**
710           * Waits for a consumer to take item placed by producer or time out.           * Get rid of cancelled node s with original predecessor pred.
711           */           */
712          boolean waitForTake(long nanos) throws InterruptedException {          void clean(QNode pred, QNode s) {
713              try {              s.waiter = null; // forget thread
714                  if (!tryAcquireNanos(0, nanos) &&              /*
715                      release(CANCEL))               * At any given time, exactly one node on list cannot be
716                      return false;               * deleted -- the last inserted node. To accommodate this,
717              } catch (InterruptedException ie) {               * if we cannot delete s, we save its predecessor as
718                  checkCancellationOnInterrupt(ie);               * "cleanMe", deleting the previously saved version
719                 * first. At least one of node s or the node previously
720                 * saved can always be deleted, so this always terminates.
721                 */
722                while (pred.next == s) { // Return early if already unlinked
723                    QNode h = head;
724                    QNode hn = h.next;   // Absorb cancelled first node as head
725                    if (hn != null && hn.isCancelled()) {
726                        advanceHead(h, hn);
727                        continue;
728                    }
729                    QNode t = tail;      // Ensure consistent read for tail
730                    if (t == h)
731                        return;
732                    QNode tn = t.next;
733                    if (t != tail)
734                        continue;
735                    if (tn != null) {
736                        advanceTail(t, tn);
737                        continue;
738                    }
739                    if (s != t) {        // If not tail, try to unsplice
740                        QNode sn = s.next;
741                        if (sn == s || pred.casNext(s, sn))
742                            return;
743                    }
744                    QNode dp = cleanMe;
745                    if (dp != null) {    // Try unlinking previous cancelled node
746                        QNode d = dp.next;
747                        QNode dn;
748                        if (d == null ||               // d is gone or
749                            d == dp ||                 // d is off list or
750                            !d.isCancelled() ||        // d not cancelled or
751                            (d != t &&                 // d not tail and
752                             (dn = d.next) != null &&  //   has successor
753                             dn != d &&                //   that is on list
754                             dp.casNext(d, dn)))       // d unspliced
755                            casCleanMe(dp, null);
756                        if (dp == pred)
757                            return;      // s is already saved node
758                    } else if (casCleanMe(null, pred))
759                        return;          // Postpone cleaning s
760                }
761              }              }
             return true;  
762          }          }
763    
764          /**          /**
765           * Waits for a producer to put item placed by consumer, or time out.       * The transferer. Set only in constructor, but cannot be declared
766         * as final without further complicating serialization.  Since
767         * this is accessed only once per public method, there isn't a
768         * noticeable performance penalty for using volatile instead of
769         * final here.
770           */           */
771          Object waitForPut(long nanos) throws InterruptedException {      private transient volatile Transferer transferer;
772              try {  
773                  if (!tryAcquireNanos(0, nanos) &&      /**
774                      release(CANCEL))       * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
775                      return null;       */
776              } catch (InterruptedException ie) {      public SynchronousQueue() {
777                  checkCancellationOnInterrupt(ie);          this(false);
             }  
             return extract();  
778          }          }
779    
780        /**
781         * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
782         * @param fair if true, waiting threads contend in FIFO order for access;
783         * otherwise the order is unspecified.
784         */
785        public SynchronousQueue(boolean fair) {
786            transferer = (fair)? new TransferQueue() : new TransferStack();
787      }      }
788    
789      /**      /**
# Line 393  Line 793 
793       * @throws InterruptedException {@inheritDoc}       * @throws InterruptedException {@inheritDoc}
794       * @throws NullPointerException {@inheritDoc}       * @throws NullPointerException {@inheritDoc}
795       */       */
796      public void put(E e) throws InterruptedException {      public void put(E o) throws InterruptedException {
797          if (e == null) throw new NullPointerException();          if (o == null) throw new NullPointerException();
798          final ReentrantLock qlock = this.qlock;          if (transferer.transfer(o, false, 0) == null)
799                throw new InterruptedException();
         for (;;) {  
             Node node;  
             boolean mustWait;  
             if (Thread.interrupted()) throw new InterruptedException();  
             qlock.lock();  
             try {  
                 node = waitingConsumers.deq();  
                 if ( (mustWait = (node == null)) )  
                     node = waitingProducers.enq(e);  
             } finally {  
                 qlock.unlock();  
             }  
   
             if (mustWait) {  
                 try {  
                     node.waitForTake();  
                     return;  
                 } catch (InterruptedException ex) {  
                     unlinkCancelledProducer(node);  
                     throw ex;  
                 }  
             }  
   
             else if (node.setItem(e))  
                 return;  
   
             // else consumer cancelled, so retry  
         }  
800      }      }
801    
802      /**      /**
# Line 436  Line 808 
808       * @throws InterruptedException {@inheritDoc}       * @throws InterruptedException {@inheritDoc}
809       * @throws NullPointerException {@inheritDoc}       * @throws NullPointerException {@inheritDoc}
810       */       */
811      public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {      public boolean offer(E o, long timeout, TimeUnit unit)
812          if (e == null) throw new NullPointerException();          throws InterruptedException {
813          long nanos = unit.toNanos(timeout);          if (o == null) throw new NullPointerException();
814          final ReentrantLock qlock = this.qlock;          if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
         for (;;) {  
             Node node;  
             boolean mustWait;  
             if (Thread.interrupted()) throw new InterruptedException();  
             qlock.lock();  
             try {  
                 node = waitingConsumers.deq();  
                 if ( (mustWait = (node == null)) )  
                     node = waitingProducers.enq(e);  
             } finally {  
                 qlock.unlock();  
             }  
   
             if (mustWait) {  
                 try {  
                     boolean x = node.waitForTake(nanos);  
                     if (!x)  
                         unlinkCancelledProducer(node);  
                     return x;  
                 } catch (InterruptedException ex) {  
                     unlinkCancelledProducer(node);  
                     throw ex;  
                 }  
             }  
   
             else if (node.setItem(e))  
815                  return true;                  return true;
816            if (!Thread.interrupted())
817              // else consumer cancelled, so retry              return false;
818            throw new InterruptedException();
819          }          }
820    
821        /**
822         * Inserts the specified element into this queue, if another thread is
823         * waiting to receive it.
824         *
825         * @param e the element to add
826         * @return <tt>true</tt> if the element was added to this queue, else
827         *         <tt>false</tt>
828         * @throws NullPointerException if the specified element is null
829         */
830        public boolean offer(E e) {
831            if (e == null) throw new NullPointerException();
832            return transferer.transfer(e, true, 0) != null;
833      }      }
834    
835      /**      /**
# Line 480  Line 840 
840       * @throws InterruptedException {@inheritDoc}       * @throws InterruptedException {@inheritDoc}
841       */       */
842      public E take() throws InterruptedException {      public E take() throws InterruptedException {
843          final ReentrantLock qlock = this.qlock;          Object e = transferer.transfer(null, false, 0);
844          for (;;) {          if (e != null)
845              Node node;              return (E)e;
846              boolean mustWait;          throw new InterruptedException();
   
             if (Thread.interrupted()) throw new InterruptedException();  
             qlock.lock();  
             try {  
                 node = waitingProducers.deq();  
                 if ( (mustWait = (node == null)) )  
                     node = waitingConsumers.enq(null);  
             } finally {  
                 qlock.unlock();  
             }  
   
             if (mustWait) {  
                 try {  
                     Object x = node.waitForPut();  
                     return (E)x;  
                 } catch (InterruptedException ex) {  
                     unlinkCancelledConsumer(node);  
                     throw ex;  
                 }  
             }  
             else {  
                 Object x = node.getItem();  
                 if (x != null)  
                     return (E)x;  
                 // else cancelled, so retry  
             }  
         }  
847      }      }
848    
849      /**      /**
# Line 523  Line 856 
856       * @throws InterruptedException {@inheritDoc}       * @throws InterruptedException {@inheritDoc}
857       */       */
858      public E poll(long timeout, TimeUnit unit) throws InterruptedException {      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
859          long nanos = unit.toNanos(timeout);          Object e = transferer.transfer(null, true, unit.toNanos(timeout));
860          final ReentrantLock qlock = this.qlock;          if (e != null || !Thread.interrupted())
861                return (E)e;
862          for (;;) {          throw new InterruptedException();
             Node node;  
             boolean mustWait;  
   
             if (Thread.interrupted()) throw new InterruptedException();  
             qlock.lock();  
             try {  
                 node = waitingProducers.deq();  
                 if ( (mustWait = (node == null)) )  
                     node = waitingConsumers.enq(null);  
             } finally {  
                 qlock.unlock();  
             }  
   
             if (mustWait) {  
                 try {  
                     Object x = node.waitForPut(nanos);  
                     if (x == null)  
                         unlinkCancelledConsumer(node);  
                     return (E)x;  
                 } catch (InterruptedException ex) {  
                     unlinkCancelledConsumer(node);  
                     throw ex;  
                 }  
             }  
             else {  
                 Object x = node.getItem();  
                 if (x != null)  
                     return (E)x;  
                 // else cancelled, so retry  
             }  
         }  
     }  
   
     // Untimed nonblocking versions  
   
     /**  
      * Inserts the specified element into this queue, if another thread is  
      * waiting to receive it.  
      *  
      * @param e the element to add  
      * @return <tt>true</tt> if the element was added to this queue, else  
      *         <tt>false</tt>  
      * @throws NullPointerException if the specified element is null  
      */  
     public boolean offer(E e) {  
         if (e == null) throw new NullPointerException();  
         final ReentrantLock qlock = this.qlock;  
   
         for (;;) {  
             Node node;  
             qlock.lock();  
             try {  
                 node = waitingConsumers.deq();  
             } finally {  
                 qlock.unlock();  
             }  
             if (node == null)  
                 return false;  
   
             else if (node.setItem(e))  
                 return true;  
             // else retry  
         }  
863      }      }
864    
865      /**      /**
# Line 600  Line 870 
870       *         element is available.       *         element is available.
871       */       */
872      public E poll() {      public E poll() {
873          final ReentrantLock qlock = this.qlock;          return (E)transferer.transfer(null, true, 0);
         for (;;) {  
             Node node;  
             qlock.lock();  
             try {  
                 node = waitingProducers.deq();  
             } finally {  
                 qlock.unlock();  
             }  
             if (node == null)  
                 return null;  
   
             else {  
                 Object x = node.getItem();  
                 if (x != null)  
                     return (E)x;  
                 // else retry  
             }  
         }  
874      }      }
875    
876      /**      /**
877       * Always returns <tt>true</tt>.       * Always returns <tt>true</tt>.
878       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
      *  
879       * @return <tt>true</tt>       * @return <tt>true</tt>
880       */       */
881      public boolean isEmpty() {      public boolean isEmpty() {
# Line 634  Line 885 
885      /**      /**
886       * Always returns zero.       * Always returns zero.
887       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
888       *       * @return zero.
      * @return zero  
889       */       */
890      public int size() {      public int size() {
891          return 0;          return 0;
# Line 644  Line 894 
894      /**      /**
895       * Always returns zero.       * Always returns zero.
896       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
897       *       * @return zero.
      * @return zero  
898       */       */
899      public int remainingCapacity() {      public int remainingCapacity() {
900          return 0;          return 0;
# Line 655  Line 904 
904       * Does nothing.       * Does nothing.
905       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
906       */       */
907      public void clear() {}      public void clear() {
908        }
909    
910      /**      /**
911       * Always returns <tt>false</tt>.       * Always returns <tt>false</tt>.
912       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
913       *       * @param o the element
      * @param o object to be checked for containment in this queue  
914       * @return <tt>false</tt>       * @return <tt>false</tt>
915       */       */
916      public boolean contains(Object o) {      public boolean contains(Object o) {
# Line 680  Line 929 
929      }      }
930    
931      /**      /**
932       * Returns <tt>false</tt> unless the given collection is empty.       * Returns <tt>false</tt> unless given collection is empty.
933       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
      *  
934       * @param c the collection       * @param c the collection
935       * @return <tt>false</tt> unless the given collection is empty       * @return <tt>false</tt> unless given collection is empty
      * @throws NullPointerException if the specified collection is null  
936       */       */
937      public boolean containsAll(Collection<?> c) {      public boolean containsAll(Collection<?> c) {
938          return c.isEmpty();          return c.isEmpty();
# Line 694  Line 941 
941      /**      /**
942       * Always returns <tt>false</tt>.       * Always returns <tt>false</tt>.
943       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
      *  
944       * @param c the collection       * @param c the collection
945       * @return <tt>false</tt>       * @return <tt>false</tt>
946       */       */
# Line 705  Line 951 
951      /**      /**
952       * Always returns <tt>false</tt>.       * Always returns <tt>false</tt>.
953       * A <tt>SynchronousQueue</tt> has no internal capacity.       * A <tt>SynchronousQueue</tt> has no internal capacity.
      *  
954       * @param c the collection       * @param c the collection
955       * @return <tt>false</tt>       * @return <tt>false</tt>
956       */       */
# Line 717  Line 962 
962       * Always returns <tt>null</tt>.       * Always returns <tt>null</tt>.
963       * A <tt>SynchronousQueue</tt> does not return elements       * A <tt>SynchronousQueue</tt> does not return elements
964       * unless actively waited on.       * unless actively waited on.
      *  
965       * @return <tt>null</tt>       * @return <tt>null</tt>
966       */       */
967      public E peek() {      public E peek() {
968          return null;          return null;
969      }      }
970    
   
971      static class EmptyIterator<E> implements Iterator<E> {      static class EmptyIterator<E> implements Iterator<E> {
972          public boolean hasNext() {          public boolean hasNext() {
973              return false;              return false;
# Line 747  Line 990 
990          return new EmptyIterator<E>();          return new EmptyIterator<E>();
991      }      }
992    
   
993      /**      /**
994       * Returns a zero-length array.       * Returns a zero-length array.
995       * @return a zero-length array       * @return a zero-length array
# Line 809  Line 1051 
1051          }          }
1052          return n;          return n;
1053      }      }
1054    
1055        /*
1056         * To cope with serialization strategy in the 1.5 version of
1057         * SynchronousQueue, we declare some unused classes and fields
1058         * that exist solely to enable serializability across versions.
1059         * These fields are never used, so are initialized only if this
1060         * object is ever serialized or deserialized.
1061         */
1062    
1063        static class WaitQueue implements java.io.Serializable { }
1064        static class LifoWaitQueue extends WaitQueue {
1065            private static final long serialVersionUID = -3633113410248163686L;
1066        }
1067        static class FifoWaitQueue extends WaitQueue {
1068            private static final long serialVersionUID = -3623113410248163686L;
1069        }
1070        private ReentrantLock qlock;
1071        private WaitQueue waitingProducers;
1072        private WaitQueue waitingConsumers;
1073    
1074        /**
1075         * Save the state to a stream (that is, serialize it).
1076         *
1077         * @param s the stream
1078         */
1079        private void writeObject(java.io.ObjectOutputStream s)
1080            throws java.io.IOException {
1081            boolean fair = transferer instanceof TransferQueue;
1082            if (fair) {
1083                qlock = new ReentrantLock(true);
1084                waitingProducers = new FifoWaitQueue();
1085                waitingConsumers = new FifoWaitQueue();
1086            }
1087            else {
1088                qlock = new ReentrantLock();
1089                waitingProducers = new LifoWaitQueue();
1090                waitingConsumers = new LifoWaitQueue();
1091            }
1092            s.defaultWriteObject();
1093        }
1094    
1095        private void readObject(final java.io.ObjectInputStream s)
1096            throws java.io.IOException, ClassNotFoundException {
1097            s.defaultReadObject();
1098            if (waitingProducers instanceof FifoWaitQueue)
1099                transferer = new TransferQueue();
1100            else
1101                transferer = new TransferStack();
1102        }
1103    
1104  }  }

Legend:
Removed from v.1.54  
changed lines
  Added in v.1.55

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8