/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} of Delayed * elements, in which an element can only be taken when its delay has expired. * The head of the queue is that Delayed element whose delay * expired furthest in the past - if no delay has expired there is no head and * poll will return null. * This queue does not permit null elements. * @since 1.5 * @author Doug Lea */ public class DelayQueue extends AbstractQueue implements BlockingQueue { private transient final ReentrantLock lock = new ReentrantLock(); private transient final Condition available = lock.newCondition(); private final PriorityQueue q = new PriorityQueue(); /** * Creates a new DelayQueue that is initially empty. */ public DelayQueue() {} /** * Creates a DelayQueue initialy containing the elements of the * given collection of {@link Delayed} instances. * * @throws NullPointerException if c or any element within it * is null * * @fixme Should the body be wrapped with try-lock-finally-unlock? */ public DelayQueue(Collection c) { this.addAll(c); } /** * Add the specified element to this delay queue. * * @return true * @throws NullPointerException {@inheritDoc} */ public boolean offer(E o) { lock.lock(); try { E first = q.peek(); q.offer(o); if (first == null || o.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } } /** * Adds the specified element to this delay queue. As the queue is * unbounded this method will never block. * @throws NullPointerException {@inheritDoc} */ public void put(E o) { offer(o); } /** * Adds the specified element to this priority queue. As the queue is * unbounded this method will never block. * @param o {@inheritDoc} * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @throws NullPointerException {@inheritDoc} * @return true */ public boolean offer(E o, long time, TimeUnit unit) { return offer(o); } /** * Adds the specified element to this queue. * @return true (as per the general contract of * Collection.add). * * @throws NullPointerException {@inheritDoc} */ public boolean add(E o) { return offer(o); } public E take() throws InterruptedException { lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) available.awaitNanos(delay); else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } } public E poll(long time, TimeUnit unit) throws InterruptedException { lock.lockInterruptibly(); long nanos = unit.toNanos(time); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { if (delay > nanos) delay = nanos; long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } } } finally { lock.unlock(); } } public E poll() { lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } finally { lock.unlock(); } } public E peek() { lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } public int size() { lock.lock(); try { return q.size(); } finally { lock.unlock(); } } /** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. */ public void clear() { lock.lock(); try { q.clear(); } finally { lock.unlock(); } } /** * Always returns Integer.MAX_VALUE because * a DelayQueue is not capacity constrained. * @return Integer.MAX_VALUE */ public int remainingCapacity() { return Integer.MAX_VALUE; } public Object[] toArray() { lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } public T[] toArray(T[] array) { lock.lock(); try { return q.toArray(array); } finally { lock.unlock(); } } /** * Removes a single instance of the specified element from this * queue, if it is present. More formally, * removes an element e such that (o==null ? e==null : * o.equals(e)), if the queue contains one or more such * elements. Returns true if the queue contained the * specified element (or equivalently, if the queue changed as a * result of the call). * *

This implementation iterates over the queue looking for the * specified element. If it finds the element, it removes the element * from the queue using the iterator's remove method.

* */ public boolean remove(Object o) { lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } /** * Returns an iterator over the elements in this queue. The iterator * does not return the elements in any particular order. * * @return an iterator over the elements in this queue. */ public Iterator iterator() { lock.lock(); try { return new Itr(q.iterator()); } finally { lock.unlock(); } } private class Itr implements Iterator { private final Iterator iter; Itr(Iterator i) { iter = i; } public boolean hasNext() { return iter.hasNext(); } public E next() { lock.lock(); try { return iter.next(); } finally { lock.unlock(); } } public void remove() { lock.lock(); try { iter.remove(); } finally { lock.unlock(); } } } }