/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
/**
* An unbounded {@linkplain BlockingQueue blocking queue} of
* Delayed elements, in which an element can only be taken
* when its delay has expired. The head of the queue is that
* Delayed element whose delay expired furthest in the
* past. If no delay has expired there is no head and poll
* will return null. Expiration occurs when an element's
* getDelay(TimeUnit.NANOSECONDS) method returns a value less
* than or equal to zero. This queue does not permit null
* elements.
*
*
This class and its iterator implement all of the
* optional methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
*
This class is a member of the
*
* Java Collections Framework.
*
* @since 1.5
* @author Doug Lea
* @param the type of elements held in this collection
*/
public class DelayQueue extends AbstractQueue
implements BlockingQueue {
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue q = new PriorityQueue();
/**
* Creates a new DelayQueue that is initially empty.
*/
public DelayQueue() {}
/**
* Creates a DelayQueue initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection extends E> c) {
this.addAll(c);
}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return true (as per the spec for {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return true
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
* @return true
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* Retrieves and removes the head of this queue, or returns null
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or null if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or null if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
long nanos = unit.toNanos(timeout);
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
if (delay > nanos)
delay = nanos;
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
}
}
} finally {
lock.unlock();
}
}
/**
* Retrieves, but does not remove, the head of this queue,
* returning null if this queue has no elements with an
* expired delay.
*
* @return the head of this queue, or null if this
* queue has no elements with an expired delay
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (;;) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
}
}
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
while (n < maxElements) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
}
}
/**
* Atomically removes all of the elements from this delay queue.
* The queue will be empty after this call returns.
* Elements with an unexpired delay are not waited for; they are
* simply discarded from the queue.
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
/**
* Always returns Integer.MAX_VALUE because
* a DelayQueue is not capacity constrained.
*
* @return Integer.MAX_VALUE
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
/**
* Returns an array containing all of the elements in this queue.
* The returned array elements are in no particular order.
*
* The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
*
This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
/**
* Returns an array containing all of the elements in this queue; the
* runtime type of the returned array is that of the specified array.
* The returned array elements are in no particular order.
* If the queue fits in the specified array, it is returned therein.
* Otherwise, a new array is allocated with the runtime type of the
* specified array and the size of this queue.
*
*
If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
* null.
*
*
Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
*
Suppose x is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
* allocated array of String:
*
*
* String[] y = x.toArray(new String[0]);
*
* Note that toArray(new Object[0]) is identical in function to
* toArray().
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this queue
* @throws NullPointerException if the specified array is null
*/
public T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
}
/**
* Removes a single instance of the specified element from this
* queue, if it is present.
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
/**
* Returns an iterator over the elements in this queue. The iterator
* does not return the elements in any particular order. The
* returned iterator is a thread-safe "fast-fail" iterator that will
* throw {@link ConcurrentModificationException}
* upon detected interference.
*
* @return an iterator over the elements in this queue
*/
public Iterator iterator() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return new Itr(q.iterator());
} finally {
lock.unlock();
}
}
private class Itr implements Iterator {
private final Iterator iter;
Itr(Iterator i) {
iter = i;
}
public boolean hasNext() {
return iter.hasNext();
}
public E next() {
final ReentrantLock lock = DelayQueue.this.lock;
lock.lock();
try {
return iter.next();
} finally {
lock.unlock();
}
}
public void remove() {
final ReentrantLock lock = DelayQueue.this.lock;
lock.lock();
try {
iter.remove();
} finally {
lock.unlock();
}
}
}
}