/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} based on a * {@link PriorityQueue}, * obeying its ordering rules and implementation characteristics. * While this queue is logically unbounded, * attempted additions may fail due to resource exhaustion (causing * OutOfMemoryError). * *
<p>The Iterator provided in method {@link #iterator()} is
* not guaranteed to traverse the elements of the
* PriorityBlockingQueue in any particular order. If you need ordered
* traversal, consider using {@code Arrays.sort(pq.toArray())}.
*
* @since 1.5
* @author Doug Lea
*/
public class PriorityBlockingQueue
* This implementation iterates over the specified collection, and adds
* each object returned by the iterator to this collection, in turn.
* @param c collection whose elements are to be added to this queue
* @return true if this queue changed as a result of the
* call.
* @throws NullPointerException if c or any element in c
* is null
* @throws ClassCastException if any element cannot be compared
* with elements currently in the priority queue according
* to the priority queue's ordering.
*/
public boolean addAll(Collection extends E> c) {
return super.addAll(c);
}
/**
 * Returns the comparator used to order the elements of this queue, or
 * null if the elements are ordered according to their natural ordering
 * (using Comparable).
 *
 * @return the comparator reported by the backing queue, or null if the
 *         elements are sorted according to their natural ordering
 */
public Comparator<? super E> comparator() {
    // Parameterized return type instead of the raw Comparator; the
    // erasure is unchanged, so existing binaries are unaffected.
    return q.comparator();
}
/**
 * Inserts the specified element into this priority queue and then
 * signals the notEmpty condition.
 *
 * @param o the element to add
 * @return true
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according
 *         to the priority queue's ordering.
 * @throws NullPointerException if the specified element is null.
 */
public boolean offer(E o) {
    if (o == null) {
        throw new NullPointerException();
    }
    lock.lock();
    try {
        boolean inserted = q.offer(o);
        assert inserted;
        // An element is now available: signal the notEmpty condition
        // that take() and the timed poll() wait on.
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**
 * Adds the specified element to this priority queue by delegating to
 * {@link #offer(Object)}. As the queue is unbounded this method will
 * never block.
 *
 * @param o the element to add
 * @throws ClassCastException if the element cannot be compared
 * with elements currently in the priority queue according
 * to the priority queue's ordering.
 * @throws NullPointerException if the specified element is null.
 */
public void put(E o) {
offer(o); // never need to block; the boolean result of offer is discarded
}
/**
 * Adds the specified element to this priority queue by delegating to
 * the untimed {@link #offer(Object)}. As the queue is unbounded this
 * method will never block, so the timeout arguments are never read.
 *
 * @param o the element to add
 * @param timeout This parameter is ignored as the method never blocks
 * @param unit This parameter is ignored as the method never blocks
 * @return true
 * @throws ClassCastException if the element cannot be compared
 * with elements currently in the priority queue according
 * to the priority queue's ordering.
 * @throws NullPointerException if the specified element is null.
 */
public boolean offer(E o, long timeout, TimeUnit unit) {
return offer(o); // never need to block; timeout and unit are unused
}
/**
 * Removes and returns the element produced by the backing queue's
 * poll(), waiting on the notEmpty condition for as long as the backing
 * queue reports a size of zero.
 *
 * @return the element removed from the backing queue
 * @throws InterruptedException if interrupted while acquiring the lock
 *         or while waiting for an element
 */
public E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        try {
            while (q.size() == 0) {
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Pass a signal on to another waiter before rethrowing, so a
            // non-interrupted thread gets the chance to proceed.
            notEmpty.signal();
            throw interrupted;
        }
        E head = q.poll();
        assert head != null;
        return head;
    } finally {
        lock.unlock();
    }
}
/**
 * Returns the result of the backing queue's poll(), obtained while
 * holding the lock. This method does not wait.
 *
 * @return whatever the backing queue's poll() returns
 */
public E poll() {
    final E head;
    lock.lock();
    try {
        head = q.poll();
    } finally {
        lock.unlock();
    }
    return head;
}
/**
 * Polls the backing queue, waiting on the notEmpty condition up to the
 * given time for a non-null result.
 *
 * @param timeout how long to wait before giving up, in units of unit
 * @param unit the time unit of the timeout argument
 * @return the element polled from the backing queue, or null if the
 *         remaining wait time reaches zero or below while the poll
 *         still yields null
 * @throws InterruptedException if interrupted while acquiring the lock
 *         or while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long remaining = unit.toNanos(timeout);
    lock.lockInterruptibly();
    try {
        E head = q.poll();
        while (head == null && remaining > 0) {
            try {
                // awaitNanos hands back the updated time budget, which
                // is re-checked before waiting again.
                remaining = notEmpty.awaitNanos(remaining);
            } catch (InterruptedException interrupted) {
                // Pass a signal on to another waiter before rethrowing.
                notEmpty.signal();
                throw interrupted;
            }
            head = q.poll();
        }
        return head;
    } finally {
        lock.unlock();
    }
}
/**
 * Returns the result of the backing queue's peek(), obtained while
 * holding the lock.
 *
 * @return whatever the backing queue's peek() returns
 */
public E peek() {
    final E head;
    lock.lock();
    try {
        head = q.peek();
    } finally {
        lock.unlock();
    }
    return head;
}
/**
 * Returns the size reported by the backing queue, read while holding
 * the lock.
 *
 * @return the backing queue's size
 */
public int size() {
    final int count;
    lock.lock();
    try {
        count = q.size();
    } finally {
        lock.unlock();
    }
    return count;
}
/**
 * Always returns Integer.MAX_VALUE because
 * a PriorityBlockingQueue is not capacity constrained.
 *
 * @return Integer.MAX_VALUE
 */
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
/**
 * Asks the backing queue to remove the given object, holding the lock
 * for the duration of the call.
 *
 * @param o the object to remove
 * @return the result of the backing queue's remove(o)
 */
public boolean remove(Object o) {
    final boolean removed;
    lock.lock();
    try {
        removed = q.remove(o);
    } finally {
        lock.unlock();
    }
    return removed;
}
/**
 * Asks the backing queue whether it contains the given object, holding
 * the lock for the duration of the call.
 *
 * @param o the object to look for
 * @return the result of the backing queue's contains(o)
 */
public boolean contains(Object o) {
    final boolean found;
    lock.lock();
    try {
        found = q.contains(o);
    } finally {
        lock.unlock();
    }
    return found;
}
/**
 * Returns the array produced by the backing queue's toArray(), built
 * while holding the lock.
 *
 * @return the array returned by the backing queue
 */
public Object[] toArray() {
    final Object[] snapshot;
    lock.lock();
    try {
        snapshot = q.toArray();
    } finally {
        lock.unlock();
    }
    return snapshot;
}
/**
 * Returns the string form of the backing queue, computed while holding
 * the lock.
 *
 * @return the backing queue's toString() result
 */
public String toString() {
    final String text;
    lock.lock();
    try {
        text = q.toString();
    } finally {
        lock.unlock();
    }
    return text;
}
/**
 * Atomically removes all of the elements from this priority queue.
 * The backing queue is cleared while the lock is held, so the queue
 * will be empty after this call returns.
 */
public void clear() {
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
public