/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * A bounded {@linkplain BlockingQueue blocking queue} backed by an array. * This queue orders elements FIFO (first-in-first-out). * The head of the queue is that element that has been on the * queue the longest time. * The tail of the queue is that element that has been on the * queue the shortest time. * *
This is a classic "bounded buffer", in which a fixed-sized * array holds * elements inserted by producers and extracted by consumers. Once * created, the capacity can not be increased. Attempts to offer an * element to a full queue will result in the offer operation * blocking; attempts to retrieve an element from an empty queue will * similarly block. * *
This class supports an optional fairness policy for ordering
* threads blocked on an insertion or removal. By default, this
* ordering is not guaranteed. However, an ArrayBlockingQueue
* constructed with fairness set to true grants blocked
* threads access in FIFO order. Fairness generally substantially
* decreases throughput but reduces variablility and avoids
* starvation.
*
* @since 1.5
* @author Doug Lea
*/
public class ArrayBlockingQueue
* This implementation iterates over the specified collection, and adds
* each object returned by the iterator to this queue's tail, in turn.
* @throws IllegalStateException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean addAll(Collection extends E> c) {
return super.addAll(c);
}
/**
* Adds the specified element to the tail of this queue if possible,
* returning immediately if this queue is full.
*
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E o) {
if (o == null) throw new NullPointerException();
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(o);
return true;
}
}
finally {
lock.unlock();
}
}
/**
* Adds the specified element to the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E o, long timeout, TimeUnit unit)
throws InterruptedException {
if (o == null) throw new NullPointerException();
lock.lockInterruptibly();
try {
long nanos = unit.toNanos(timeout);
for (;;) {
if (count != items.length) {
insert(o);
return true;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
}
catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
}
finally {
lock.unlock();
}
}
public E poll() {
lock.lock();
try {
if (count == 0)
return null;
E x = extract();
return x;
}
finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
lock.lockInterruptibly();
try {
long nanos = unit.toNanos(timeout);
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
}
catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
}
finally {
lock.unlock();
}
}
/**
* Removes a single instance of the specified element from this
* queue, if it is present. More formally,
* removes an element e such that (o==null ? e==null :
* o.equals(e)), if the queue contains one or more such
* elements. Returns true if the queue contained the
* specified element (or equivalently, if the queue changed as a
* result of the call).
*
* This implementation iterates over the queue looking for the
* specified element. If it finds the element, it removes the element
* from the queue using the iterator's remove method.
*
*/
public boolean remove(Object o) {
if (o == null) return false;
lock.lock();
try {
int i = takeIndex;
int k = 0;
for (;;) {
if (k++ >= count)
return false;
if (o.equals(items[i])) {
removeAt(i);
return true;
}
i = inc(i);
}
}
finally {
lock.unlock();
}
}
public E peek() {
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
}
finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
}
catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
}
finally {
lock.unlock();
}
}
/**
* Adds the specified element to the tail of this queue, waiting if
* necessary for space to become available.
* @throws NullPointerException {@inheritDoc}
*/
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
}
catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(o);
}
finally {
lock.unlock();
}
}
// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
* Returns the number of elements in this collection.
*/
public int size() {
lock.lock();
try {
return count;
}
finally {
lock.unlock();
}
}
// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
* Returns the number of elements that this queue can ideally (in
* the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current size of this queue.
* Note that you cannot always tell if
* an attempt to add an element will succeed by
* inspecting remainingCapacity because it may be the
* case that a waiting consumer is ready to take an
* element out of an otherwise full queue.
*/
public int remainingCapacity() {
lock.lock();
try {
return items.length - count;
}
finally {
lock.unlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
lock.lock();
try {
int i = takeIndex;
int k = 0;
while (k++ < count) {
if (o.equals(items[i]))
return true;
i = inc(i);
}
return false;
}
finally {
lock.unlock();
}
}
public Object[] toArray() {
lock.lock();
try {
E[] a = (E[]) new Object[count];
int k = 0;
int i = takeIndex;
while (k < count) {
a[k++] = items[i];
i = inc(i);
}
return a;
}
finally {
lock.unlock();
}
}
public