ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.46 by dl, Wed Jun 2 23:49:07 2004 UTC vs.
Revision 1.47 by dl, Wed Mar 2 17:15:26 2005 UTC

# Line 120 | Line 120 | public class SynchronousQueue<E> extends
120          abstract Node enq(Object x);
121          /** Remove and return node, or null if empty */
122          abstract Node deq();
123 +        /** Remove a cancelled node to avoid garbage retention. */
124 +        abstract void unlink(Node node);
125 +        /** Return true if a cancelled node might be on queue */
126 +        abstract boolean shouldUnlink(Node node);
127      }
128  
129      /**
# Line 148 | Line 152 | public class SynchronousQueue<E> extends
152              }
153              return p;
154          }
155 +
156 +        boolean shouldUnlink(Node node) {
157 +            return (node == last || node.next != null);
158 +        }
159 +
160 +
161 +        void unlink(Node node) {
162 +            Node p = head;
163 +            Node trail = null;
164 +            while (p != null) {
165 +                if (p == node) {
166 +                    Node next = p.next;
167 +                    if (trail == null)
168 +                        head = next;
169 +                    else
170 +                        trail.next = next;
171 +                    if (last == node)
172 +                        last = trail;
173 +                    break;
174 +                }
175 +                trail = p;
176 +                p = p.next;
177 +            }
178 +        }
179      }
180  
181      /**
# Line 169 | Line 197 | public class SynchronousQueue<E> extends
197              }
198              return p;
199          }
200 +
201 +        boolean shouldUnlink(Node node) {
202 +            // Return false if already dequeued or is bottom node (in which
203 +            // case we might retain at most one garbage node)
204 +            return (node == head || node.next != null);
205 +        }
206 +
207 +        void unlink(Node node) {
208 +            Node p = head;
209 +            Node trail = null;
210 +            while (p != null) {
211 +                if (p == node) {
212 +                    Node next = p.next;
213 +                    if (trail == null)
214 +                        head = next;
215 +                    else
216 +                        trail.next = next;
217 +                    break;
218 +                }
219 +                trail = p;
220 +                p = p.next;
221 +            }
222 +        }
223      }
224  
225 +    /*
226 +     * Unlink the given node from consumer queue.  Called by cancelled
227 +     * (timeout, interrupt) waiters to avoid garbage retention in the
228 +     * absence of producers.
229 +     */
230 +    private void unlinkCancelledConsumer(Node node) {
231 +        // Use a form of double-check to avoid unnecessary locking and
232 +        // traversal. The first check outside lock might
233 +        // conservatively report true.
234 +        if (waitingConsumers.shouldUnlink(node)) {
235 +            qlock.lock();
236 +            try {
237 +                if (waitingConsumers.shouldUnlink(node))
238 +                    waitingConsumers.unlink(node);
239 +            } finally {
240 +                qlock.unlock();
241 +            }
242 +        }
243 +    }
244 +
245 +    /*
246 +     * Unlink the given node from producer queue.  Symmetric
247 +     * to unlinkCancelledConsumer.
248 +     */
249 +    private void unlinkCancelledProducer(Node node) {
250 +        if (waitingProducers.shouldUnlink(node)) {
251 +            qlock.lock();
252 +            try {
253 +                if (waitingProducers.shouldUnlink(node))
254 +                    waitingProducers.unlink(node);
255 +            } finally {
256 +                qlock.unlock();
257 +            }
258 +        }
259 +    }
260 +        
261      /**
262       * Nodes each maintain an item and handle waits and signals for
263       * getting and setting it. The class extends
# Line 322 | Line 409 | public class SynchronousQueue<E> extends
409              }
410  
411              if (mustWait) {
412 <                node.waitForTake();
413 <                return;
412 >                try {
413 >                    node.waitForTake();
414 >                    return;
415 >                } catch (InterruptedException ex) {
416 >                    unlinkCancelledProducer(node);
417 >                    throw ex;
418 >                }
419              }
420  
421              else if (node.setItem(o))
# Line 363 | Line 455 | public class SynchronousQueue<E> extends
455                  qlock.unlock();
456              }
457  
458 <            if (mustWait)
459 <                return node.waitForTake(nanos);
458 >            if (mustWait) {
459 >                try {
460 >                    boolean x = node.waitForTake(nanos);
461 >                    if (!x)
462 >                        unlinkCancelledProducer(node);
463 >                    return x;
464 >                } catch (InterruptedException ex) {
465 >                    unlinkCancelledProducer(node);
466 >                    throw ex;
467 >                }
468 >            }
469  
470              else if (node.setItem(o))
471                  return true;
# Line 396 | Line 497 | public class SynchronousQueue<E> extends
497              }
498  
499              if (mustWait) {
500 <                Object x = node.waitForPut();
501 <                return (E)x;
500 >                try {
501 >                    Object x = node.waitForPut();
502 >                    return (E)x;
503 >                } catch (InterruptedException ex) {
504 >                    unlinkCancelledConsumer(node);
505 >                    throw ex;
506 >                }
507              }
508              else {
509                  Object x = node.getItem();
# Line 439 | Line 545 | public class SynchronousQueue<E> extends
545              }
546  
547              if (mustWait) {
548 <                Object x = node.waitForPut(nanos);
549 <                return (E)x;
548 >                try {
549 >                    Object x = node.waitForPut(nanos);
550 >                    if (x == null)
551 >                        unlinkCancelledConsumer(node);
552 >                    return (E)x;
553 >                } catch (InterruptedException ex) {
554 >                    unlinkCancelledConsumer(node);
555 >                    throw ex;
556 >                }
557              }
558              else {
559                  Object x = node.getItem();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines