ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/DelayQueue.java (file contents):
Revision 1.47 by jsr166, Wed Aug 8 16:08:40 2007 UTC vs.
Revision 1.48 by jsr166, Tue Oct 9 20:43:49 2007 UTC

# Line 39 | Line 39 | public class DelayQueue<E extends Delaye
39      implements BlockingQueue<E> {
40  
41      private transient final ReentrantLock lock = new ReentrantLock();
42    private transient final Condition available = lock.newCondition();
42      private final PriorityQueue<E> q = new PriorityQueue<E>();
43  
44      /**
45 +     * Thread designated to wait for the element at the head of
46 +     * the queue.  This variant of the Leader-Follower pattern
47 +     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
48 +     * minimize unnecessary timed waiting.  When a thread becomes
49 +     * the leader, it waits only for the next delay to elapse, but
50 +     * other threads await indefinitely.  The leader thread must
51 +     * signal some other thread before returning from take() or
52 +     * poll(...), unless some other thread becomes leader in the
53 +     * interim.  Whenever the head of the queue is replaced with
54 +     * an element with an earlier expiration time, the leader
55 +     * field is invalidated by being reset to null, and some
56 +     * waiting thread, but not necessarily the current leader, is
57 +     * signalled.  So waiting threads must be prepared to acquire
58 +     * and lose leadership while waiting.
59 +     */
60 +    private Thread leader = null;
61 +
62 +    /**
63 +     * Condition signalled when a newer element becomes available
64 +     * at the head of the queue or a new thread may need to
65 +     * become leader.
66 +     */
67 +    private final Condition available = lock.newCondition();
68 +
69 +    /**
70       * Creates a new <tt>DelayQueue</tt> that is initially empty.
71       */
72      public DelayQueue() {}
# Line 81 | Line 105 | public class DelayQueue<E extends Delaye
105          final ReentrantLock lock = this.lock;
106          lock.lock();
107          try {
84            E first = q.peek();
108              q.offer(e);
109 <            if (first == null || e.compareTo(first) < 0)
110 <                available.signalAll();
109 >            if (q.peek() == e) {
110 >                leader = null;
111 >                available.signal();
112 >            }
113              return true;
114          } finally {
115              lock.unlock();
# Line 130 | Line 155 | public class DelayQueue<E extends Delaye
155              E first = q.peek();
156              if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
157                  return null;
158 <            else {
159 <                E x = q.poll();
135 <                assert x != null;
136 <                if (q.size() != 0)
137 <                    available.signalAll();
138 <                return x;
139 <            }
158 >            else
159 >                return q.poll();
160          } finally {
161              lock.unlock();
162          }
# Line 155 | Line 175 | public class DelayQueue<E extends Delaye
175          try {
176              for (;;) {
177                  E first = q.peek();
178 <                if (first == null) {
178 >                if (first == null)
179                      available.await();
180 <                } else {
181 <                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
182 <                    if (delay > 0) {
183 <                        long tl = available.awaitNanos(delay);
184 <                    } else {
185 <                        E x = q.poll();
186 <                        assert x != null;
187 <                        if (q.size() != 0)
188 <                            available.signalAll(); // wake up other takers
189 <                        return x;
190 <
180 >                else {
181 >                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
182 >                    if (delay <= 0)
183 >                        return q.poll();
184 >                    else if (leader != null)
185 >                        available.await();
186 >                    else {
187 >                        Thread thisThread = Thread.currentThread();
188 >                        leader = thisThread;
189 >                        try {
190 >                            available.awaitNanos(delay);
191 >                        } finally {
192 >                            if (leader == thisThread)
193 >                                leader = null;
194 >                        }
195                      }
196                  }
197              }
198          } finally {
199 +            if (leader == null && q.peek() != null)
200 +                available.signal();
201              lock.unlock();
202          }
203      }
# Line 200 | Line 226 | public class DelayQueue<E extends Delaye
226                          nanos = available.awaitNanos(nanos);
227                  } else {
228                      long delay = first.getDelay(TimeUnit.NANOSECONDS);
229 <                    if (delay > 0) {
230 <                        if (nanos <= 0)
231 <                            return null;
232 <                        if (delay > nanos)
233 <                            delay = nanos;
234 <                        long timeLeft = available.awaitNanos(delay);
235 <                        nanos -= delay - timeLeft;
236 <                    } else {
237 <                        E x = q.poll();
238 <                        assert x != null;
239 <                        if (q.size() != 0)
240 <                            available.signalAll();
241 <                        return x;
242 <                    }
229 >                    if (delay <= 0)
230 >                        return q.poll();
231 >                    if (nanos <= 0)
232 >                        return null;
233 >                    if (nanos < delay || leader != null)
234 >                        nanos = available.awaitNanos(nanos);
235 >                    else {
236 >                        Thread thisThread = Thread.currentThread();
237 >                        leader = thisThread;
238 >                        try {
239 >                            long timeLeft = available.awaitNanos(delay);
240 >                            nanos -= delay - timeLeft;
241 >                        } finally {
242 >                            if (leader == thisThread)
243 >                                leader = null;
244 >                        }
245 >                    }
246                  }
247              }
248          } finally {
249 +            if (leader == null && q.peek() != null)
250 +                available.signal();
251              lock.unlock();
252          }
253      }
# Line 273 | Line 304 | public class DelayQueue<E extends Delaye
304                  c.add(q.poll());
305                  ++n;
306              }
276            if (n > 0)
277                available.signalAll();
307              return n;
308          } finally {
309              lock.unlock();
# Line 305 | Line 334 | public class DelayQueue<E extends Delaye
334                  c.add(q.poll());
335                  ++n;
336              }
308            if (n > 0)
309                available.signalAll();
337              return n;
338          } finally {
339              lock.unlock();
# Line 455 | Line 482 | public class DelayQueue<E extends Delaye
482              return cursor < array.length;
483          }
484  
485 +        @SuppressWarnings("unchecked")
486          public E next() {
487              if (cursor >= array.length)
488                  throw new NoSuchElementException();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines