/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain. Use, modify, and
 * redistribute this code in any way without acknowledgement.
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of {@code Delayed}
 * elements, in which an element can only be taken when its delay has expired.
 * The head of the queue is that {@code Delayed} element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * {@code poll} will return {@code null}.
 * This queue does not permit {@code null} elements.
 *
 * <p>
This class implements all of the optional methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to the backing priority queue. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when an inserted element becomes the new head, and when
     * elements remain after a removal, so that waiting takers re-check.
     */
    private final transient Condition available = lock.newCondition();

    /** Backing store; its head is the least element per {@code compareTo}. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to this queue
     * @throws NullPointerException if {@code c} or any element within it
     *         is {@code null}
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need to re-evaluate when the head has changed.
            if (first == null || o.compareTo(first) < 0) {
                available.signalAll();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is {@code null}
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout ignored, as the method never blocks
     * @param unit ignored, as the method never blocks
     * @return {@code true}
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return {@code true} (as per the general contract of
     *         {@code Collection.add})
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return dequeueHead();
                    }
                    // Sleep until the head is due; an earlier signal (new
                    // head inserted) simply makes the loop re-check.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available or the specified
     * wait time elapses.
     *
     * @param time how long to wait before giving up, in units of {@code unit}
     * @param unit the unit of the {@code time} argument
     * @return the head of this queue, or {@code null} if the wait time
     *         elapsed before an element with an expired delay became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code unit} is {@code null}
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not leave the lock held.
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return dequeueHead();
                    }
                    // Head exists but is not due: give up once the caller's
                    // budget is spent instead of spinning until it expires.
                    if (nanos <= 0) {
                        return null;
                    }
                    if (delay > nanos) {
                        delay = nanos;
                    }
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually spent waiting.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue is empty or its head has a delay that has not yet expired.
     *
     * @return the head of this queue, or {@code null} if no element with an
     *         expired delay is present
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                return null;
            }
            return dequeueHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the backing queue, waking other
     * waiters if elements remain. The caller must hold {@code lock} and must
     * already know that the queue is non-empty.
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0) {
            available.signalAll(); // wake up other takers
        }
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty. Unlike {@code poll}, the head is
     * returned even if its delay has not yet expired.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, whether or not their
     * delays have expired.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired from this queue and adds
     * them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements whose delay has expired
     * from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        return drainExpired(c, maxElements);
    }

    /**
     * Moves up to {@code limit} expired elements into {@code c} under the
     * lock and signals waiters if anything was moved.
     */
    private int drainExpired(Collection<? super E> c, int limit) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < limit) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    break;
                }
                // Add before removing so the element is not lost if add throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0) {
                available.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * whether or not their delays have expired.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * whether or not their delays have expired, using the supplied array
     * as described by the backing queue's {@code toArray(T[])}.
     *
     * @param array the array offered as storage for the elements
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present, whether or not its delay has expired.
     *
     * @param o the element to remove
     * @return {@code true} if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that every call runs while
     * holding the queue lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next()/remove(): it reads the shared backing queue.
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }
}