/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} of * Delayed elements, in which an element can only be taken * when its delay has expired. The head of the queue is that * Delayed element whose delay expired furthest in the * past. If no delay has expired there is no head and poll * will return null. Expiration occurs when an element's * getDelay(TimeUnit.NANOSECONDS) method returns a value less * than or equal to zero. This queue does not permit null * elements. * *

This class and its iterator implement all of the * optional methods of the {@link Collection} and {@link * Iterator} interfaces. * *

This class is a member of the * * Java Collections Framework. * * @since 1.5 * @author Doug Lea * @param the type of elements held in this collection */ public class DelayQueue extends AbstractQueue implements BlockingQueue { private transient final ReentrantLock lock = new ReentrantLock(); private transient final Condition available = lock.newCondition(); private final PriorityQueue q = new PriorityQueue(); /** * Creates a new DelayQueue that is initially empty. */ public DelayQueue() {} /** * Creates a DelayQueue initially containing the elements of the * given collection of {@link Delayed} instances. * * @param c the collection * @throws NullPointerException if c or any element within it * is null. * */ public DelayQueue(Collection c) { this.addAll(c); } /** * Inserts the specified element into this delay queue. * * @param e the element to add * @return true * @throws NullPointerException if the specified element is null. */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } } /** * Adds the specified element to this delay queue. As the queue is * unbounded this method will never block. * @param e the element to add * @throws NullPointerException if the specified element is null. */ public void put(E e) { offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return true * @throws NullPointerException if the specified element is null. */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } /** * Adds the specified element to this queue. * @param e the element to add * @return true (as per the general contract of * Collection.add). * * @throws NullPointerException if the specified element is null. */ public boolean add(E e) { return offer(e); } /** * Retrieves and removes the head of this queue, waiting * if no elements with an expired delay are present on this queue. * @return the head of this queue * @throws InterruptedException if interrupted while waiting. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting * if necessary up to the specified wait time if no elements with * an expired delay are * present on this queue. * @param timeout how long to wait before giving up, in units of * unit * @param unit a TimeUnit determining how to interpret the * timeout parameter * @return the head of this queue, or null if the * specified waiting time elapses before an element with * an expired delay is present. * @throws InterruptedException if interrupted while waiting. */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); long nanos = unit.toNanos(timeout); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { if (delay > nanos) delay = nanos; long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } } } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, or null * if this queue has no elements with an expired delay. * * @return the head of this queue, or null if this * queue has no elements with an expired delay. */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } finally { lock.unlock(); } } /** * Retrieves, but does not remove, the head of this queue, * returning null if this queue has no elements with an * expired delay. * * @return the head of this queue, or null if this queue * has no elements with an expired delay. */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } if (n > 0) available.signalAll(); return n; } finally { lock.unlock(); } } public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } if (n > 0) available.signalAll(); return n; } finally { lock.unlock(); } } /** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } } /** * Always returns Integer.MAX_VALUE because * a DelayQueue is not capacity constrained. * @return Integer.MAX_VALUE */ public int remainingCapacity() { return Integer.MAX_VALUE; } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } public T[] toArray(T[] array) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(array); } finally { lock.unlock(); } } /** * Removes a single instance of the specified element from this * queue, if it is present. */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } /** * Returns an iterator over the elements in this queue. The iterator * does not return the elements in any particular order. The * returned iterator is a thread-safe "fast-fail" iterator that will * throw {@link ConcurrentModificationException} * upon detected interference. * * @return an iterator over the elements in this queue. */ public Iterator iterator() { final ReentrantLock lock = this.lock; lock.lock(); try { return new Itr(q.iterator()); } finally { lock.unlock(); } } private class Itr implements Iterator { private final Iterator iter; Itr(Iterator i) { iter = i; } public boolean hasNext() { return iter.hasNext(); } public E next() { final ReentrantLock lock = DelayQueue.this.lock; lock.lock(); try { return iter.next(); } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = DelayQueue.this.lock; lock.lock(); try { iter.remove(); } finally { lock.unlock(); } } } }