/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * A bounded blocking queue backed by an array. The implementation is * a classic "bounded buffer", in which a fixed-sized array holds * elements inserted by producers and extracted by consumers. Once * created, the capacity can not be increased. Attempts to offer an * element to a full queue will result in the offer operation * blocking; attempts to retrieve an element from an empty queue will * similarly block. * *

This class supports an optional fairness policy for ordering * threads blocked on an insertion or removal. By default, this * ordering is not guaranteed. However, an ArrayBlockingQueue * constructed with fairness set to true grants blocked * threads access in FIFO order. Fairness generally substantially * decreases throughput but reduces variablility and avoids * starvation. * * @since 1.5 * @author Doug Lea */ public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /** The queued items */ private transient final E[] items; /** items index for next take, poll or remove */ private transient int takeIndex; /** items index for next put, offer, or add. */ private transient int putIndex; /** Number of items in the queue */ private int count; /** * An array used only during deserialization, to hold * items read back in from the stream, and then used * as "items" by readResolve via the private constructor. */ private transient E[] deserializedItems; /* * Concurrency control via the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition wor waiting takes */ private final Condition notEmpty; /** Condition for wiating puts */ private final Condition notFull; // Internal helper methods /** * Circularly increment i. */ int inc(int i) { return (++i == items.length)? 0 : i; } /** * Insert element at current put position, advance, and signal. * Call only when holding lock. */ private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } /** * Extract element at current take position, advance, and signal. * Call only when holding lock. */ private E extract() { E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } /** * Utility for remove and iterator.remove: Delete item at position i. * Call only when holding lock. */ void removeAt(int i) { // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; notFull.signal(); } /** * Internal constructor also used by readResolve. * Sets all final fields, plus count. * @param cap the maximumSize * @param array the array to use or null if should create new one * @param count the number of items in the array, where indices 0 * to count-1 hold items. */ private ArrayBlockingQueue(int cap, E[] array, int count, ReentrantLock lk) { if (cap <= 0) throw new IllegalArgumentException(); if (array == null) this.items = (E[]) new Object[cap]; else this.items = array; this.putIndex = count; this.count = count; lock = lk; notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * Creates a new ArrayBlockingQueue with the given (fixed) capacity * and default access policy. * @param maximumSize the capacity */ public ArrayBlockingQueue(int maximumSize) { this(maximumSize, null, 0, new ReentrantLock()); } /** * Creates a new ArrayBlockingQueue with the given (fixed) capacity. * @param maximumSize the capacity * @param fair true if queue access should use a fair policy */ public ArrayBlockingQueue(int maximumSize, boolean fair) { this(maximumSize, null, 0, new ReentrantLock(fair)); } /** Return the number of elements currently in the queue */ public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } /** Return the remaining capacity of the queue, which is the * number of elements that can be inserted before the queue is * full. */ public int remainingCapacity() { lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } /** Insert a new element into the queue, blocking if the queue is full. */ public void put(E x) throws InterruptedException { if (x == null) throw new NullPointerException(); lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(x); } finally { lock.unlock(); } } /** Remove and return the first element from the queue, blocking * if the queue is empty. */ public E take() throws InterruptedException { lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } /** Attempt to insert a new element into the queue, but return * immediately without inserting the element if the queue is full. * @return true if the element was inserted successfully, * false otherwise */ public boolean offer(E x) { if (x == null) throw new NullPointerException(); lock.lock(); try { if (count == items.length) return false; else { insert(x); return true; } } finally { lock.unlock(); } } /** Attempt to retrieve the first insert element from the queue, * but return immediately if the queue is empty. * @return The first element of the queue if the queue is not * empty, or null otherwise. */ public E poll() { lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } /** Attempt to insert a new element into the queue. If the queue * is full, wait up to the specified amount of time before giving * up. * @param x the element to be inserted * @param timeout how long to wait before giving up, in units of * unit * @param unit a TimeUnit determining how to interpret the timeout * parameter * @return true if the element was inserted successfully, * false otherwise * @throws InterruptedException if interrupted while waiting */ public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException { if (x == null) throw new NullPointerException(); lock.lockInterruptibly(); long nanos = unit.toNanos(timeout); try { for (;;) { if (count != items.length) { insert(x); return true; } if (nanos <= 0) return false; try { nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } /** * Attempt to retrieve the first insert element from the queue. * If the queue is empty, wait up to the specified amount of time * before giving up. * @param timeout how long to wait before giving up, in units of * unit * @param unit a TimeUnit determining how to interpret the timeout * parameter * @return The first element of the queue if an item was * successfully retrieved, or null otherwise. * @throws InterruptedException if interrupted while waiting * */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { lock.lockInterruptibly(); long nanos = unit.toNanos(timeout); try { for (;;) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } /** Return, but do not remove, the first element from the queue, * if the queue is not empty. This will return the same result as * poll, but will not remove it from the queue. * @return The first element of the queue if the queue is not * empty, or null otherwise. */ public E peek() { lock.lock(); try { return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } } public boolean remove(Object x) { if (x == null) return false; lock.lock(); try { int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (x.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } } public boolean contains(Object x) { if (x == null) return false; lock.lock(); try { int i = takeIndex; int k = 0; while (k++ < count) { if (x.equals(items[i])) return true; i = inc(i); } return false; } finally { lock.unlock(); } } public Object[] toArray() { lock.lock(); try { E[] a = (E[]) new Object[count]; int k = 0; int i = takeIndex; while (k < count) { a[k++] = items[i]; i = inc(i); } return a; } finally { lock.unlock(); } } public T[] toArray(T[] a) { lock.lock(); try { if (a.length < count) a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count); int k = 0; int i = takeIndex; while (k < count) { a[k++] = (T)items[i]; i = inc(i); } if (a.length > count) a[count] = null; return a; } finally { lock.unlock(); } } public String toString() { lock.lock(); try { return super.toString(); } finally { lock.unlock(); } } /** * Returns an iterator over the elements in this queue in proper sequence. * * @return an iterator over the elements in this queue in proper sequence. */ public Iterator iterator() { lock.lock(); try { return new Itr(); } finally { lock.unlock(); } } /** * Iterator for ArrayBlockingQueue */ private class Itr implements Iterator { /** * Index of element to be returned by next, * or a negative number if no such. */ private int nextIndex; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. **/ private E nextItem; /** * Index of element returned by most recent call to next. * Reset to -1 if this element is deleted by a call to remove. */ private int lastRet; Itr() { lastRet = -1; if (count == 0) nextIndex = -1; else { nextIndex = takeIndex; nextItem = items[takeIndex]; } } public boolean hasNext() { /* * No sync. We can return true by mistake here * only if this iterator passed across threads, * which we don't support anyway. */ return nextIndex >= 0; } /** * Check whether nextIndex is valied; if so setting nextItem. * Stops iterator when either hits putIndex or sees null item. */ private void checkNext() { if (nextIndex == putIndex) { nextIndex = -1; nextItem = null; } else { nextItem = items[nextIndex]; if (nextItem == null) nextIndex = -1; } } public E next() { lock.lock(); try { if (nextIndex < 0) throw new NoSuchElementException(); lastRet = nextIndex; E x = nextItem; nextIndex = inc(nextIndex); checkNext(); return x; } finally { lock.unlock(); } } public void remove() { lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; int ti = takeIndex; removeAt(i); // back up cursor (reset to front if was first element) nextIndex = (i == ti) ? takeIndex : i; checkNext(); } finally { lock.unlock(); } } } /** * Save the state to a stream (that is, serialize it). * * @serialData The maximumSize is emitted (int), followed by all of * its elements (each an E) in the proper order. * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { // Write out element count, and any hidden stuff s.defaultWriteObject(); // Write out maximumSize == items length s.writeInt(items.length); // Write out all elements in the proper order. int i = takeIndex; int k = 0; while (k++ < count) { s.writeObject(items[i]); i = inc(i); } } /** * Reconstitute the Queue instance from a stream (that is, * deserialize it). * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { // Read in size, and any hidden stuff s.defaultReadObject(); int size = count; // Read in array length and allocate array int arrayLength = s.readInt(); // We use deserializedItems here because "items" is final deserializedItems = (E[]) new Object[arrayLength]; // Read in all elements in the proper order into deserializedItems for (int i = 0; i < size; i++) deserializedItems[i] = (E)s.readObject(); } /** * Throw away the object created with readObject, and replace it * with a usable ArrayBlockingQueue. * @return the ArrayBlockingQueue */ private Object readResolve() throws java.io.ObjectStreamException { E[] array = deserializedItems; deserializedItems = null; return new ArrayBlockingQueue(array.length, array, count, lock); } }