/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain. Use, modify, and
* redistribute this code in any way without acknowledgement.
*/
package java.util.concurrent;
import java.util.*;
/**
 * An unbounded blocking queue of {@link Delayed} elements, in which an
 * element can only be taken when its delay has expired.
 *
 * <p>Elements are held in a {@link PriorityQueue}, so the head is whichever
 * element that queue orders first by {@code compareTo}. Only the head is ever
 * examined: it is handed out once {@code getDelay(TimeUnit.NANOSECONDS)}
 * reports a value less than or equal to zero. Insertion never blocks.
 *
 * <p>Every access to the backing queue is made while holding a single
 * {@link ReentrantLock}; threads that have to wait for an element do so on a
 * {@link Condition} of that lock.
 *
 * @param <E> the element type; must implement {@link Delayed}
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when an insertion produces a new head, and when elements
     * remain after a removal, so that waiting takers re-evaluate the head.
     */
    private transient final Condition canTake = lock.newCondition();

    /** Backing store; only touched while {@link #lock} is held. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /** Creates a new, initially empty queue. */
    public DelayQueue() {}

    /**
     * Inserts the given element. Waiting takers are woken when the element
     * becomes the new head (the queue was empty, or the element orders before
     * the previous head).
     *
     * @param x the element to add
     * @return {@code true} always, since the queue is unbounded
     */
    public boolean offer(E x) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(x);
            if (first == null || x.compareTo(first) < 0) {
                canTake.signalAll();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the given element; never blocks because the queue is unbounded.
     *
     * @param x the element to add
     */
    public void put(E x) {
        offer(x);
    }

    /**
     * Inserts the given element; the timeout is ignored because insertion
     * never blocks.
     *
     * @param x the element to add
     * @param time ignored
     * @param unit ignored
     * @return {@code true} always
     */
    public boolean offer(E x, long time, TimeUnit unit) {
        return offer(x);
    }

    /**
     * Inserts the given element.
     *
     * @param x the element to add
     * @return {@code true} always
     */
    public boolean add(E x) {
        return offer(x);
    }

    /**
     * Retrieves and removes the head, waiting as long as necessary for the
     * queue to become non-empty and for the head's delay to expire.
     *
     * @return the expired head element
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    canTake.await();
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    return dequeueAndSignal();
                }
                // Sleep no longer than the current head needs; an earlier
                // head arriving via offer() signals and restarts the loop.
                canTake.awaitNanos(delay);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head, waiting up to the given time for an
     * expired head to become available.
     *
     * @param time how long to wait, in units of {@code unit}
     * @param unit the unit of {@code time}
     * @return the expired head element, or {@code null} if the wait time
     *         elapsed before one became available
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while waiting
     * @throws NullPointerException if {@code unit} is null
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit cannot leave the lock held.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = canTake.awaitNanos(nanos);
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    return dequeueAndSignal();
                }
                // The head is not ready yet. Without this check an exhausted
                // budget would call awaitNanos with a non-positive timeout in
                // a loop, spinning until the head expires.
                if (nanos <= 0) {
                    return null;
                }
                if (delay > nanos) {
                    delay = nanos;
                }
                long timeLeft = canTake.awaitNanos(delay);
                // Charge only the time actually spent waiting.
                nanos -= delay - timeLeft;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head if its delay has expired.
     *
     * @return the expired head element, or {@code null} if the queue is empty
     *         or its head has not expired yet
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                return null;
            }
            return dequeueAndSignal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the head of the backing queue and, if elements remain, wakes
     * the other waiting takers. The caller must hold {@link #lock} and must
     * already have checked that the head exists and has expired.
     */
    private E dequeueAndSignal() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0) {
            canTake.signalAll(); // wake up other takers
        }
        return x;
    }

    /**
     * Returns the head of the queue without removing it, whether or not its
     * delay has expired.
     *
     * @return the head element, or {@code null} if the queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements held, expired or not.
     *
     * @return the size of the backing queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves every element whose delay has expired into the given collection.
     *
     * @param c the collection to receive the elements
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements whose delay has expired into
     * the given collection.
     *
     * @param c the collection to receive the elements
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            int moved = 0;
            while (moved < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    break;
                }
                // Add before removing so that a failing add() loses nothing.
                c.add(first);
                q.poll();
                moved++;
            }
            if (moved > 0 && q.size() != 0) {
                canTake.signalAll();
            }
            return moved;
        } finally {
            lock.unlock();
        }
    }

    /** Removes all elements, expired or not. */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because the queue is not
     * capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array holding all elements, expired or not, as produced by
     * the backing priority queue.
     *
     * @return an array of the queue's elements
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array holding all elements, expired or not, using the given
     * array's runtime type; delegates to the backing priority queue.
     *
     * @param array the array to fill if it is large enough, and whose runtime
     *        type is used otherwise
     * @return an array of the queue's elements
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the given element, expired or not.
     *
     * @param x the element to remove
     * @return {@code true} if the backing queue reported a removal
     */
    public boolean remove(Object x) {
        lock.lock();
        try {
            return q.remove(x);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all elements, expired or not. The iterator
     * wraps the backing priority queue's own iterator rather than a snapshot;
     * each call on it is made under the queue lock, but changes made to the
     * queue between calls are subject to that underlying iterator's rules.
     *
     * @return an iterator over the queue's elements
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /** Iterator wrapper that performs every delegate call under the queue lock. */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next() and remove(): the delegate reads queue state.
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }
}