/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166x; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on * linked nodes. * *

The optional capacity bound constructor argument serves as a * way to prevent excessive expansion. The capacity, if unspecified, * is equal to {@link Integer#MAX_VALUE}. Linked nodes are * dynamically created upon each insertion unless this would bring the * deque above capacity. * *

Most operations run in constant time (ignoring time spent * blocking). Exceptions include {@link #remove(Object) remove}, * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link * #removeLastOccurrence removeLastOccurrence}, {@link #contains * contains }, {@link #iterator iterator.remove()}, and the bulk * operations, all of which run in linear time. * *

This class and its iterator implement all of the * optional methods of the {@link Collection} and {@link * Iterator} interfaces. This class is a member of the Java Collections * Framework. * * @since 1.6 * @author Doug Lea * @param the type of elements held in this collection */ public class LinkedBlockingDeque extends AbstractQueue implements BlockingDeque, java.io.Serializable { /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. */ private static final long serialVersionUID = -387911632671998426L; /** Doubly-linked list node class */ static final class Node { E item; Node prev; Node next; Node(E x, Node p, Node n) { item = x; prev = p; next = n; } } /** Pointer to first node */ private transient Node first; /** Pointer to last node */ private transient Node last; /** Number of items in the deque */ private transient int count; /** Maximum number of items in the deque */ private final int capacity; /** Main lock guarding all access */ private final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** * Creates a LinkedBlockingDeque with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } /** * Creates a LinkedBlockingDeque with the given (fixed) * capacity. * @param capacity the capacity of this deque * @throws IllegalArgumentException if capacity is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } /** * Creates a LinkedBlockingDeque with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. * @param c the collection of elements to initially contain * @throws NullPointerException if c or any element within it * is null */ public LinkedBlockingDeque(Collection c) { this(Integer.MAX_VALUE); for (E e : c) add(e); } // Basic linking and unlinking operations, called only while holding lock /** * Link e as first element, or return false if full */ private boolean linkFirst(E e) { if (count >= capacity) return false; ++count; Node f = first; Node x = new Node(e, null, f); first = x; if (last == null) last = x; else f.prev = x; notEmpty.signal(); return true; } /** * Link e as last element, or return false if full */ private boolean linkLast(E e) { if (count >= capacity) return false; ++count; Node l = last; Node x = new Node(e, l, null); last = x; if (first == null) first = x; else l.next = x; notEmpty.signal(); return true; } /** * Remove and return first element, or null if empty */ private E unlinkFirst() { Node f = first; if (f == null) return null; Node n = f.next; first = n; if (n == null) last = null; else n.prev = null; --count; notFull.signal(); return f.item; } /** * Remove and return last element, or null if empty */ private E unlinkLast() { Node l = last; if (l == null) return null; Node p = l.prev; last = p; if (p == null) first = null; else p.next = null; --count; notFull.signal(); return l.item; } /** * Unlink e */ private void unlink(Node x) { Node p = x.prev; Node n = x.next; if (p == null) { if (n == null) first = last = null; else { n.prev = null; first = n; } } else if (n == null) { p.next = null; last = p; } else { p.next = n; n.prev = p; } --count; notFull.signalAll(); } // Deque methods public boolean offerFirst(E o) { if (o == null) throw new NullPointerException(); lock.lock(); try { return linkFirst(o); } finally { lock.unlock(); } } public boolean offerLast(E o) { if (o == null) throw new NullPointerException(); lock.lock(); try { return linkLast(o); } finally { lock.unlock(); } } public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public E pollFirst() { lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } public E peekFirst() { lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } } public E getFirst() { E x = peekFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E getLast() { E x = peekLast(); if (x == null) throw new NoSuchElementException(); return x; } // BlockingDeque methods public void putFirst(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); lock.lock(); try { while (!linkFirst(o)) notFull.await(); } finally { lock.unlock(); } } public void putLast(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); lock.lock(); try { while (!linkLast(o)) notFull.await(); } finally { lock.unlock(); } } public E takeFirst() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public boolean offerFirst(E o, long timeout, TimeUnit unit) throws InterruptedException { if (o == null) throw new NullPointerException(); lock.lockInterruptibly(); try { long nanos = unit.toNanos(timeout); for (;;) { if (linkFirst(o)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public boolean offerLast(E o, long timeout, TimeUnit unit) throws InterruptedException { if (o == null) throw new NullPointerException(); lock.lockInterruptibly(); try { long nanos = unit.toNanos(timeout); for (;;) { if (linkLast(o)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { lock.lockInterruptibly(); try { long nanos = unit.toNanos(timeout); for (;;) { E x = unlinkFirst(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { lock.lockInterruptibly(); try { long nanos = unit.toNanos(timeout); for (;;) { E x = unlinkLast(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } // Queue and stack methods public boolean offer(E e) { return offerLast(e); } public boolean add(E e) { addLast(e); return true; } public void push(E e) { addFirst(e); } public E poll() { return pollFirst(); } public E remove() { return removeFirst(); } public E pop() { return removeFirst(); } public E peek() { return peekFirst(); } public E element() { return getFirst(); } public boolean remove(Object o) { return removeFirstOccurrence(o); } // BlockingQueue methods public void put(E o) throws InterruptedException { putLast(o); } public E take() throws InterruptedException { return takeFirst(); } public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(o, timeout, unit); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } /** * Returns the number of elements in this deque. * * @return the number of elements in this deque. */ public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } /** * Returns the number of elements that this deque can ideally (in * the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this deque * less the current size of this deque. *

Note that you cannot always tell if * an attempt to add an element will succeed by * inspecting remainingCapacity because it may be the * case that a waiting consumer is ready to take an * element out of an otherwise full deque. */ public int remainingCapacity() { lock.lock(); try { return capacity - count; } finally { lock.unlock(); } } public boolean contains(Object o) { if (o == null) return false; lock.lock(); try { for (Node p = first; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { lock.unlock(); } } public boolean removeFirstOccurrence(Object e) { if (e == null) throw new NullPointerException(); lock.lock(); try { for (Node p = first; p != null; p = p.next) { if (e.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public boolean removeLastOccurrence(Object e) { if (e == null) throw new NullPointerException(); lock.lock(); try { for (Node p = last; p != null; p = p.prev) { if (e.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } /** * Variant of removeFirstOccurrence needed by iterator.remove. * Searches for the node, not its contents. */ boolean removeNode(Node e) { lock.lock(); try { for (Node p = first; p != null; p = p.next) { if (p == e) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public Object[] toArray() { lock.lock(); try { Object[] a = new Object[count]; int k = 0; for (Node p = first; p != null; p = p.next) a[k++] = p.item; return a; } finally { lock.unlock(); } } public T[] toArray(T[] a) { lock.lock(); try { if (a.length < count) a = (T[])java.lang.reflect.Array.newInstance( a.getClass().getComponentType(), count ); int k = 0; for (Node p = first; p != null; p = p.next) a[k++] = (T)p.item; if (a.length > k) a[k] = null; return a; } finally { lock.unlock(); } } public String toString() { lock.lock(); try { return super.toString(); } finally { lock.unlock(); } } /** * Atomically removes all of the elements from this deque. * The deque will be empty after this call returns. */ public void clear() { lock.lock(); try { first = last = null; count = 0; notFull.signalAll(); } finally { lock.unlock(); } } public int drainTo(Collection c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); lock.lock(); try { for (Node p = first; p != null; p = p.next) c.add(p.item); int n = count; count = 0; first = last = null; notFull.signalAll(); return n; } finally { lock.unlock(); } } public int drainTo(Collection c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); lock.lock(); try { int n = 0; while (n < maxElements && first != null) { c.add(first.item); first.prev = null; first = first.next; --count; ++n; } if (first == null) last = null; notFull.signalAll(); return n; } finally { lock.unlock(); } } /** * Returns an iterator over the elements in this deque in proper sequence. * The returned Iterator is a "weakly consistent" iterator that * will never throw {@link java.util.ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this deque in proper sequence. */ public Iterator iterator() { return new Itr(); } /** * Iterator for LinkedBlockingDeque */ private class Itr implements Iterator { private Node next; /** * nextItem holds on to item fields because once we claim that * an element exists in hasNext(), we must return item read * under lock (in advance()) even if it was in the process of * being removed when hasNext() was called. **/ private E nextItem; /** * Node returned by most recent call to next. Needed by remove. * Reset to null if this element is deleted by a call to remove. */ private Node last; Itr() { advance(); } /** * Advance next, or if not yet initialized, set to first node. */ private void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { next = (next == null) ? first : next.next; nextItem = (next == null) ? null : next.item; } finally { lock.unlock(); } } public boolean hasNext() { return next != null; } public E next() { if (next == null) throw new NoSuchElementException(); last = next; E x = nextItem; advance(); return x; } public void remove() { Node n = last; if (n == null) throw new IllegalStateException(); last = null; // Note: removeNode rescans looking for this node to make // sure it was not already removed. Otherwwise, trying to // re-remove could corrupt list. removeNode(n); } } /** * Saves the state to a stream (that is, serializes it). * * @serialData The capacity (int), followed by elements (each an * Object) in the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { lock.lock(); try { // Write out capacity and any hidden stuff s.defaultWriteObject(); // Write out all elements in the proper order. for (Node p = first; p != null; p = p.next) s.writeObject(p.item); // Use trailing null as sentinel s.writeObject(null); } finally { lock.unlock(); } } /** * Reconstitutes this deque instance from a stream (that is, * deserializes it). * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); count = 0; first = null; last = null; // Read in all elements and place in queue for (;;) { E item = (E)s.readObject(); if (item == null) break; add(item); } } }