/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} based on a * {@link PriorityQueue}, * obeying its ordering rules and implementation characteristics. * While this queue is logically unbounded, * attempted additions may fail due to resource exhaustion (causing * OutOfMemoryError). * *
The Iterator provided in method {@link #iterator()} is
* not guaranteed to traverse the elements of the
* PriorityBlockingQueue in any particular order. If you need ordered
* traversal, consider using Arrays.sort(pq.toArray()).
*
* @since 1.5
* @author Doug Lea
*/
public class PriorityBlockingQueue
* This implementation iterates over the specified collection, and adds
* each object returned by the iterator to this collection, in turn.
* @throws NullPointerException {@inheritDoc}
* @throws ClassCastException if any element cannot be compared
* with elements currently in the priority queue according
* to the priority queue's ordering.
*/
public boolean addAll(Collection extends E> c) {
return super.addAll(c);
}
/**
* Returns the comparator used to order this collection, or null
* if this collection is sorted according to its elements natural ordering
* (using Comparable).
*
* @return the comparator used to order this collection, or null
* if this collection is sorted according to its elements natural ordering.
*/
public Comparator comparator() {
return q.comparator();
}
/**
* Adds the specified element to this priority queue.
*
* @return true
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according
* to the priority queue's ordering.
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E o) {
if (o == null) throw new NullPointerException();
lock.lock();
try {
boolean ok = q.offer(o);
assert ok;
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}
/**
* Adds the specified element to this priority queue. As the queue is
* unbounded this method will never block.
* @throws ClassCastException if the element cannot be compared
* with elements currently in the priority queue according
* to the priority queue's ordering.
* @throws NullPointerException {@inheritDoc}
*/
public void put(E o) {
offer(o); // never need to block
}
/**
* Adds the specified element to this priority queue. As the queue is
* unbounded this method will never block.
* @param o {@inheritDoc}
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
* @throws ClassCastException if the element cannot be compared
* with elements currently in the priority queue according
* to the priority queue's ordering.
* @throws NullPointerException {@inheritDoc}
* @return true
*/
public boolean offer(E o, long timeout, TimeUnit unit) {
return offer(o); // never need to block
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
try {
while (q.size() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();
assert x != null;
return x;
} finally {
lock.unlock();
}
}
public E poll() {
lock.lock();
try {
return q.poll();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
for (;;) {
E x = q.poll();
if (x != null)
return x;
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
public E peek() {
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
/**
* Always returns Integer.MAX_VALUE because
* a PriorityBlockingQueue is not capacity constrained.
* @return Integer.MAX_VALUE
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
/**
* Removes a single instance of the specified element from this
* queue, if it is present. More formally,
* removes an element e such that (o==null ? e==null :
* o.equals(e)), if the queue contains one or more such
* elements. Returns true if the queue contained the
* specified element (or equivalently, if the queue changed as a
* result of the call).
*
* This implementation iterates over the queue looking for the
* specified element. If it finds the element, it removes the element
* from the queue using the iterator's remove method.
*
*/
public boolean remove(Object o) {
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
public boolean contains(Object o) {
lock.lock();
try {
return q.contains(o);
} finally {
lock.unlock();
}
}
public Object[] toArray() {
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
public String toString() {
lock.lock();
try {
return q.toString();
} finally {
lock.unlock();
}
}
/**
* Atomically removes all of the elements from this delay queue.
* The queue will be empty after this call returns.
*/
public void clear() {
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
public