/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import java.util.*; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on * linked nodes. * This queue orders elements FIFO (first-in-first-out). * The head of the queue is that element that has been on the * queue the longest time. * The tail of the queue is that element that has been on the * queue the shortest time. * Linked queues typically have higher throughput than array-based queues but * less predictable performance in most concurrent applications. * *
The optional capacity bound constructor argument serves as a
* way to prevent excessive queue expansion. The capacity, if unspecified,
* is equal to {@link Integer#MAX_VALUE}. Linked nodes are
* dynamically created upon each insertion unless this would bring the
* queue above capacity.
*
* @since 1.5
* @author Doug Lea
*
**/
public class LinkedBlockingQueue
* This implementation iterates over the specified collection, and adds
* each object returned by the iterator to this queue's tail, in turn.
* @throws IllegalStateException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean addAll(Collection extends E> c) {
return super.addAll(c);
}
// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
 * Reports how many elements this collection currently holds, as read
 * from the element counter.
 *
 * @return the current value of the element counter
 */
public int size() {
    final int current = count.get();
    return current;
}
// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
 * Returns how many more elements this queue can ideally (ignoring
 * memory or resource limits) take in without blocking: the queue's
 * capacity minus its current element count.
 * Be aware that inspecting {@code remainingCapacity} does not always
 * tell you whether an attempt to add an element will succeed, since a
 * waiting consumer may be about to take an element out of an
 * otherwise full queue.
 *
 * @return the capacity less the current element count
 */
public int remainingCapacity() {
    final int bound = capacity;
    final int used = count.get();
    return bound - used;
}
/**
 * Appends the given element at the tail of this queue, blocking for as
 * long as the queue is at capacity.
 *
 * @param o the element to add
 * @throws InterruptedException if interrupted while acquiring the put
 *         lock or while waiting for space
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E o) throws InterruptedException {
    if (o == null) {
        throw new NullPointerException();
    }
    // Convention shared by put/take/etc: the local holding the observed
    // count starts out negative, meaning "failed", until it is assigned.
    int before = -1;
    putLock.lockInterruptibly();
    try {
        /*
         * count is consulted in the wait guard even though this lock
         * does not protect it. That is sound because, with every other
         * put shut out by the lock, count can only go down here, and
         * this thread (or another waiting put) is signalled whenever
         * count moves away from capacity. The same reasoning applies to
         * count in the other wait guards.
         */
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to a thread that was not interrupted.
            notFull.signal();
            throw interrupted;
        }
        insert(o);
        before = count.getAndIncrement();
        final int after = before + 1;
        if (after < capacity) {
            // Still room left after this insertion.
            notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // The count was zero before this insertion.
    if (before == 0) {
        signalNotEmpty();
    }
}
/**
 * Appends the given element at the tail of this queue, waiting at most
 * the given amount of time for space to become available.
 *
 * @param o the element to add
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return {@code true} if the element was inserted, {@code false} if
 *         the wait time ran out while the queue was still full
 * @throws InterruptedException if interrupted while acquiring the put
 *         lock or while waiting for space
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (o == null) {
        throw new NullPointerException();
    }
    long remaining = unit.toNanos(timeout);
    int before = -1;
    putLock.lockInterruptibly();
    try {
        // Keep waiting while there is no room and time is left.
        while (count.get() >= capacity) {
            if (remaining <= 0) {
                return false;
            }
            try {
                remaining = notFull.awaitNanos(remaining);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to a thread that was not interrupted.
                notFull.signal();
                throw interrupted;
            }
        }
        insert(o);
        before = count.getAndIncrement();
        final int after = before + 1;
        if (after < capacity) {
            notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // The count was zero before this insertion.
    if (before == 0) {
        signalNotEmpty();
    }
    return true;
}
/**
 * Appends the given element at the tail of this queue when there is
 * room, and gives up at once when the queue is full.
 *
 * @param o the element to add
 * @return {@code true} if the element was inserted, {@code false} if
 *         the queue was full
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E o) {
    if (o == null) {
        throw new NullPointerException();
    }
    // Unlocked pre-check: skip the lock entirely when the queue is full.
    if (count.get() == capacity) {
        return false;
    }
    int before = -1;
    putLock.lock();
    try {
        // Re-check under the lock before inserting.
        if (count.get() < capacity) {
            insert(o);
            before = count.getAndIncrement();
            final int after = before + 1;
            if (after < capacity) {
                notFull.signal();
            }
        }
    } finally {
        putLock.unlock();
    }
    // Still negative means nothing was inserted.
    if (before < 0) {
        return false;
    }
    // The count was zero before this insertion.
    if (before == 0) {
        signalNotEmpty();
    }
    return true;
}
/**
 * Removes and returns an element obtained from {@code extract()},
 * blocking for as long as the element count is zero.
 *
 * @return the extracted element
 * @throws InterruptedException if interrupted while acquiring the take
 *         lock or while waiting for an element
 */
public E take() throws InterruptedException {
    E result;
    // Starts negative ("failed") until the real count is observed.
    int before = -1;
    takeLock.lockInterruptibly();
    try {
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to a thread that was not interrupted.
            notEmpty.signal();
            throw interrupted;
        }
        result = extract();
        before = count.getAndDecrement();
        if (before > 1) {
            // At least one element is left after this removal.
            notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // The count equalled capacity before this removal.
    if (before == capacity) {
        signalNotFull();
    }
    return result;
}
/**
 * Removes and returns an element obtained from {@code extract()},
 * waiting at most the given amount of time for one to become available.
 *
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the extracted element, or {@code null} if the wait time ran
 *         out while the element count was still zero
 * @throws InterruptedException if interrupted while acquiring the take
 *         lock or while waiting for an element
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E result = null;
    int before = -1;
    long remaining = unit.toNanos(timeout);
    takeLock.lockInterruptibly();
    try {
        // Keep waiting while nothing is available and time is left.
        while (count.get() <= 0) {
            if (remaining <= 0) {
                return null;
            }
            try {
                remaining = notEmpty.awaitNanos(remaining);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to a thread that was not interrupted.
                notEmpty.signal();
                throw interrupted;
            }
        }
        result = extract();
        before = count.getAndDecrement();
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // The count equalled capacity before this removal.
    if (before == capacity) {
        signalNotFull();
    }
    return result;
}
/**
 * Removes and returns an element obtained from {@code extract()}
 * without waiting for an element to become available.
 *
 * @return the extracted element, or {@code null} if the element count
 *         was zero
 */
public E poll() {
    // Unlocked pre-check: skip the lock entirely when nothing is queued.
    if (count.get() == 0) {
        return null;
    }
    E x = null;
    // Starts negative ("failed") until the real count is observed.
    int c = -1;
    // This has to be an unconditional lock(). A tryLock() whose result is
    // ignored would, under contention, run extract() without holding the
    // lock and then make the unlock() in the finally clause throw
    // IllegalMonitorStateException.
    takeLock.lock();
    try {
        // Re-check under the lock before extracting.
        if (count.get() > 0) {
            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                // At least one element is left after this removal.
                notEmpty.signal();
            }
        }
    } finally {
        takeLock.unlock();
    }
    // The count equalled capacity before this removal.
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}
public E peek() {
if (count.get() == 0)
return null;
takeLock.lock();
try {
Node This implementation iterates over the queue looking for the
* specified element. If it finds the element, it removes the element
* from the queue using the iterator's remove method.
*
*/
public boolean remove(Object o) {
if (o == null) return false;
boolean removed = false;
fullyLock();
try {
Node