/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain. Use, modify, and
* redistribute this code in any way without acknowledgement.
*/
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
/**
* An unbounded {@link BlockingQueue blocking queue} based on a {@link
* PriorityQueue}, obeying its ordering rules and implementation
* characteristics. While this queue is logically unbounded,
* attempted additions may fail due to resource exhaustion (causing
* OutOfMemoryError) when Integer.MAX_VALUE elements
* are held.
* @since 1.5
* @author Doug Lea
*/
public class PriorityBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
private final PriorityQueue q;
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition notEmpty = lock.newCondition();
/**
* Create a PriorityBlockingQueue with the default initial
* capacity
* (11) that orders its elements according to their natural
* ordering (using Comparable.)
*/
public PriorityBlockingQueue() {
q = new PriorityQueue();
}
/**
* Create a PriorityBlockingQueue with the specified initial
* capacity
* that orders its elements according to their natural ordering
* (using Comparable.)
*
* @param initialCapacity the initial capacity for this priority queue.
*/
public PriorityBlockingQueue(int initialCapacity) {
q = new PriorityQueue(initialCapacity, null);
}
/**
* Create a PriorityBlockingQueue with the specified initial
* capacity
* that orders its elements according to the specified comparator.
*
* @param initialCapacity the initial capacity for this priority queue.
* @param comparator the comparator used to order this priority queue.
* If null then the order depends on the elements' natural
* ordering.
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator super E> comparator) {
q = new PriorityQueue(initialCapacity, comparator);
}
/**
* Create a PriorityBlockingQueue containing the elements
* in the specified collection. The priority queue has an initial
* capacity of 110% of the size of the specified collection. If
* the specified collection is a {@link SortedSet} or a {@link
* PriorityQueue}, this priority queue will be sorted according to
* the same comparator, or according to its elements' natural
* order if the collection is sorted according to its elements'
* natural order. Otherwise, this priority queue is ordered
* according to its elements' natural order.
*
* @param c the collection whose elements are to be placed
* into this priority queue.
* @throws ClassCastException if elements of the specified collection
* cannot be compared to one another according to the priority
* queue's ordering.
* @throws NullPointerException if c or any element within it
* is null
*/
public PriorityBlockingQueue(Collection extends E> c) {
q = new PriorityQueue(c);
}
// these first two override just to get the throws docs
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean add(E element) {
return super.add(element);
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean addAll(Collection extends E> c) {
return super.addAll(c);
}
public Comparator comparator() {
return q.comparator();
}
/**
* @throws NullPointerException if the specified element is null
**/
public boolean offer(E o) {
if (o == null) throw new NullPointerException();
lock.lock();
try {
boolean ok = q.offer(o);
assert ok;
notEmpty.signal();
return true;
}
finally {
lock.unlock();
}
}
public void put(E o) throws InterruptedException {
offer(o); // never need to block
}
public boolean offer(E o, long timeout, TimeUnit unit)
throws InterruptedException {
return offer(o); // never need to block
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
try {
while (q.size() == 0)
notEmpty.await();
}
catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();
assert x != null;
return x;
}
finally {
lock.unlock();
}
}
public E poll() {
lock.lock();
try {
return q.poll();
}
finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
for (;;) {
E x = q.poll();
if (x != null)
return x;
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
}
catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
}
finally {
lock.unlock();
}
}
public E peek() {
lock.lock();
try {
return q.peek();
}
finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return q.size();
}
finally {
lock.unlock();
}
}
/**
* Always returns Integer.MAX_VALUE because
* PriorityBlockingQueues are not capacity constrained.
* @return Integer.MAX_VALUE
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
public boolean remove(Object o) {
lock.lock();
try {
return q.remove(o);
}
finally {
lock.unlock();
}
}
public boolean contains(Object o) {
lock.lock();
try {
return q.contains(o);
}
finally {
lock.unlock();
}
}
public Object[] toArray() {
lock.lock();
try {
return q.toArray();
}
finally {
lock.unlock();
}
}
public String toString() {
lock.lock();
try {
return q.toString();
}
finally {
lock.unlock();
}
}
public T[] toArray(T[] a) {
lock.lock();
try {
return q.toArray(a);
}
finally {
lock.unlock();
}
}
public Iterator iterator() {
lock.lock();
try {
return new Itr(q.iterator());
}
finally {
lock.unlock();
}
}
private class Itr implements Iterator {
private final Iterator iter;
Itr(Iterator i) {
iter = i;
}
public boolean hasNext() {
/*
* No sync -- we rely on underlying hasNext to be
* stateless, in which case we can return true by mistake
* only when next() willl subsequently throw
* ConcurrentModificationException.
*/
return iter.hasNext();
}
public E next() {
lock.lock();
try {
return iter.next();
}
finally {
lock.unlock();
}
}
public void remove() {
lock.lock();
try {
iter.remove();
}
finally {
lock.unlock();
}
}
}
/**
* Save the state to a stream (that is, serialize it). This
* merely wraps default serialization within lock. The
* serialization strategy for items is left to underlying
* Queue. Note that locking is not needed on deserialization, so
* readObject is not defined, just relying on default.
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
lock.lock();
try {
s.defaultWriteObject();
}
finally {
lock.unlock();
}
}
}