ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.4
Committed: Tue Jan 3 04:52:20 2017 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +28 -32 lines
Log Message:
backport Spliterator from src/main

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.AbstractQueue;
10     import java.util.Arrays;
11     import java.util.Collection;
12     import java.util.Comparator;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.4 import java.util.Objects;
16 jsr166 1.1 import java.util.PriorityQueue;
17     import java.util.Queue;
18     import java.util.SortedSet;
19     import java.util.Spliterator;
20     import java.util.concurrent.locks.Condition;
21     import java.util.concurrent.locks.ReentrantLock;
22     import java.util.function.Consumer;
23    
24     /**
25     * An unbounded {@linkplain BlockingQueue blocking queue} that uses
26     * the same ordering rules as class {@link PriorityQueue} and supplies
27     * blocking retrieval operations. While this queue is logically
28     * unbounded, attempted additions may fail due to resource exhaustion
29     * (causing {@code OutOfMemoryError}). This class does not permit
30     * {@code null} elements. A priority queue relying on {@linkplain
31     * Comparable natural ordering} also does not permit insertion of
32     * non-comparable objects (doing so results in
33     * {@code ClassCastException}).
34     *
35     * <p>This class and its iterator implement all of the
36     * <em>optional</em> methods of the {@link Collection} and {@link
37     * Iterator} interfaces. The Iterator provided in method {@link
38     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
39     * the PriorityBlockingQueue in any particular order. If you need
40     * ordered traversal, consider using
41     * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
42     * can be used to <em>remove</em> some or all elements in priority
43     * order and place them in another collection.
44     *
45     * <p>Operations on this class make no guarantees about the ordering
46     * of elements with equal priority. If you need to enforce an
47     * ordering, you can define custom classes or comparators that use a
48     * secondary key to break ties in primary priority values. For
49     * example, here is a class that applies first-in-first-out
50     * tie-breaking to comparable elements. To use it, you would insert a
51     * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
52     *
53     * <pre> {@code
54     * class FIFOEntry<E extends Comparable<? super E>>
55     * implements Comparable<FIFOEntry<E>> {
56     * static final AtomicLong seq = new AtomicLong(0);
57     * final long seqNum;
58     * final E entry;
59     * public FIFOEntry(E entry) {
60     * seqNum = seq.getAndIncrement();
61     * this.entry = entry;
62     * }
63     * public E getEntry() { return entry; }
64     * public int compareTo(FIFOEntry<E> other) {
65     * int res = entry.compareTo(other.entry);
66     * if (res == 0 && other.entry != this.entry)
67     * res = (seqNum < other.seqNum ? -1 : 1);
68     * return res;
69     * }
70     * }}</pre>
71     *
72     * <p>This class is a member of the
73     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
74     * Java Collections Framework</a>.
75     *
76     * @since 1.5
77     * @author Doug Lea
78     * @param <E> the type of elements held in this queue
79     */
80     @SuppressWarnings("unchecked")
81     public class PriorityBlockingQueue<E> extends AbstractQueue<E>
82     implements BlockingQueue<E>, java.io.Serializable {
83     private static final long serialVersionUID = 5595510919245408276L;
84    
85     /*
86     * The implementation uses an array-based binary heap, with public
87     * operations protected with a single lock. However, allocation
88     * during resizing uses a simple spinlock (used only while not
89     * holding main lock) in order to allow takes to operate
90     * concurrently with allocation. This avoids repeated
91     * postponement of waiting consumers and consequent element
92     * build-up. The need to back away from lock during allocation
93     * makes it impossible to simply wrap delegated
94     * java.util.PriorityQueue operations within a lock, as was done
95     * in a previous version of this class. To maintain
96     * interoperability, a plain PriorityQueue is still used during
97     * serialization, which maintains compatibility at the expense of
98     * transiently doubling overhead.
99     */
100    
101     /**
102     * Default array capacity.
103     */
104     private static final int DEFAULT_INITIAL_CAPACITY = 11;
105    
106     /**
107     * The maximum size of array to allocate.
108     * Some VMs reserve some header words in an array.
109     * Attempts to allocate larger arrays may result in
110     * OutOfMemoryError: Requested array size exceeds VM limit
111     */
112     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
113    
114     /**
115     * Priority queue represented as a balanced binary heap: the two
116     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
117     * priority queue is ordered by comparator, or by the elements'
118     * natural ordering, if comparator is null: For each node n in the
119     * heap and each descendant d of n, n <= d. The element with the
120     * lowest value is in queue[0], assuming the queue is nonempty.
121     */
122     private transient Object[] queue;
123    
124     /**
125     * The number of elements in the priority queue.
126     */
127     private transient int size;
128    
129     /**
130     * The comparator, or null if priority queue uses elements'
131     * natural ordering.
132     */
133     private transient Comparator<? super E> comparator;
134    
135     /**
136     * Lock used for all public operations.
137     */
138     private final ReentrantLock lock;
139    
140     /**
141     * Condition for blocking when empty.
142     */
143     private final Condition notEmpty;
144    
145     /**
146     * Spinlock for allocation, acquired via CAS.
147     */
148     private transient volatile int allocationSpinLock;
149    
150     /**
151     * A plain PriorityQueue used only for serialization,
152     * to maintain compatibility with previous versions
153     * of this class. Non-null only during serialization/deserialization.
154     */
155     private PriorityQueue<E> q;
156    
/**
 * Constructs an empty {@code PriorityBlockingQueue} whose backing
 * array starts at the default capacity (11) and whose elements are
 * ordered by their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    // A null comparator selects natural ordering.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
165    
/**
 * Constructs an empty {@code PriorityBlockingQueue} with the given
 * initial capacity whose elements are ordered by their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    // A null comparator selects natural ordering.
    this(initialCapacity, null);
}
178    
179     /**
180     * Creates a {@code PriorityBlockingQueue} with the specified initial
181     * capacity that orders its elements according to the specified
182     * comparator.
183     *
184     * @param initialCapacity the initial capacity for this priority queue
185     * @param comparator the comparator that will be used to order this
186     * priority queue. If {@code null}, the {@linkplain Comparable
187     * natural ordering} of the elements will be used.
188     * @throws IllegalArgumentException if {@code initialCapacity} is less
189     * than 1
190     */
191     public PriorityBlockingQueue(int initialCapacity,
192     Comparator<? super E> comparator) {
193     if (initialCapacity < 1)
194     throw new IllegalArgumentException();
195     this.lock = new ReentrantLock();
196     this.notEmpty = lock.newCondition();
197     this.comparator = comparator;
198     this.queue = new Object[initialCapacity];
199     }
200    
201     /**
202     * Creates a {@code PriorityBlockingQueue} containing the elements
203     * in the specified collection. If the specified collection is a
204     * {@link SortedSet} or a {@link PriorityQueue}, this
205     * priority queue will be ordered according to the same ordering.
206     * Otherwise, this priority queue will be ordered according to the
207     * {@linkplain Comparable natural ordering} of its elements.
208     *
209     * @param c the collection whose elements are to be placed
210     * into this priority queue
211     * @throws ClassCastException if elements of the specified collection
212     * cannot be compared to one another according to the priority
213     * queue's ordering
214     * @throws NullPointerException if the specified collection or any
215     * of its elements are null
216     */
217     public PriorityBlockingQueue(Collection<? extends E> c) {
218     this.lock = new ReentrantLock();
219     this.notEmpty = lock.newCondition();
220     boolean heapify = true; // true if not known to be in heap order
221     boolean screen = true; // true if must screen for nulls
222     if (c instanceof SortedSet<?>) {
223     SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
224     this.comparator = (Comparator<? super E>) ss.comparator();
225     heapify = false;
226     }
227     else if (c instanceof PriorityBlockingQueue<?>) {
228     PriorityBlockingQueue<? extends E> pq =
229     (PriorityBlockingQueue<? extends E>) c;
230     this.comparator = (Comparator<? super E>) pq.comparator();
231     screen = false;
232     if (pq.getClass() == PriorityBlockingQueue.class) // exact match
233     heapify = false;
234     }
235     Object[] a = c.toArray();
236     int n = a.length;
237     // If c.toArray incorrectly doesn't return Object[], copy it.
238     if (a.getClass() != Object[].class)
239     a = Arrays.copyOf(a, n, Object[].class);
240     if (screen && (n == 1 || this.comparator != null)) {
241     for (int i = 0; i < n; ++i)
242     if (a[i] == null)
243     throw new NullPointerException();
244     }
245     this.queue = a;
246     this.size = n;
247     if (heapify)
248     heapify();
249     }
250    
251     /**
252     * Tries to grow array to accommodate at least one more element
253     * (but normally expand by about 50%), giving up (allowing retry)
254     * on contention (which we expect to be rare). Call only while
255     * holding lock.
256     *
257     * @param array the heap array
258     * @param oldCap the length of the array
259     */
260     private void tryGrow(Object[] array, int oldCap) {
261     lock.unlock(); // must release and then re-acquire main lock
262     Object[] newArray = null;
263     if (allocationSpinLock == 0 &&
264     U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
265     try {
266     int newCap = oldCap + ((oldCap < 64) ?
267     (oldCap + 2) : // grow faster if small
268     (oldCap >> 1));
269     if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
270     int minCap = oldCap + 1;
271     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
272     throw new OutOfMemoryError();
273     newCap = MAX_ARRAY_SIZE;
274     }
275     if (newCap > oldCap && queue == array)
276     newArray = new Object[newCap];
277     } finally {
278     allocationSpinLock = 0;
279     }
280     }
281     if (newArray == null) // back off if another thread is allocating
282     Thread.yield();
283     lock.lock();
284     if (newArray != null && queue == array) {
285     queue = newArray;
286     System.arraycopy(array, 0, newArray, 0, oldCap);
287     }
288     }
289    
290     /**
291     * Mechanics for poll(). Call only while holding lock.
292     */
293     private E dequeue() {
294     int n = size - 1;
295     if (n < 0)
296     return null;
297     else {
298     Object[] array = queue;
299     E result = (E) array[0];
300     E x = (E) array[n];
301     array[n] = null;
302     Comparator<? super E> cmp = comparator;
303     if (cmp == null)
304     siftDownComparable(0, x, array, n);
305     else
306     siftDownUsingComparator(0, x, array, n, cmp);
307     size = n;
308     return result;
309     }
310     }
311    
312     /**
313     * Inserts item x at position k, maintaining heap invariant by
314     * promoting x up the tree until it is greater than or equal to
315     * its parent, or is the root.
316     *
317 jsr166 1.2 * To simplify and speed up coercions and comparisons, the
318 jsr166 1.1 * Comparable and Comparator versions are separated into different
319     * methods that are otherwise identical. (Similarly for siftDown.)
320     *
321     * @param k the position to fill
322     * @param x the item to insert
323     * @param array the heap array
324     */
325     private static <T> void siftUpComparable(int k, T x, Object[] array) {
326     Comparable<? super T> key = (Comparable<? super T>) x;
327     while (k > 0) {
328     int parent = (k - 1) >>> 1;
329     Object e = array[parent];
330     if (key.compareTo((T) e) >= 0)
331     break;
332     array[k] = e;
333     k = parent;
334     }
335     array[k] = key;
336     }
337    
338     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
339     Comparator<? super T> cmp) {
340     while (k > 0) {
341     int parent = (k - 1) >>> 1;
342     Object e = array[parent];
343     if (cmp.compare(x, (T) e) >= 0)
344     break;
345     array[k] = e;
346     k = parent;
347     }
348     array[k] = x;
349     }
350    
351     /**
352     * Inserts item x at position k, maintaining heap invariant by
353     * demoting x down the tree repeatedly until it is less than or
354     * equal to its children or is a leaf.
355     *
356     * @param k the position to fill
357     * @param x the item to insert
358     * @param array the heap array
359     * @param n heap size
360     */
361     private static <T> void siftDownComparable(int k, T x, Object[] array,
362     int n) {
363     if (n > 0) {
364     Comparable<? super T> key = (Comparable<? super T>)x;
365     int half = n >>> 1; // loop while a non-leaf
366     while (k < half) {
367     int child = (k << 1) + 1; // assume left child is least
368     Object c = array[child];
369     int right = child + 1;
370     if (right < n &&
371     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
372     c = array[child = right];
373     if (key.compareTo((T) c) <= 0)
374     break;
375     array[k] = c;
376     k = child;
377     }
378     array[k] = key;
379     }
380     }
381    
382     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
383     int n,
384     Comparator<? super T> cmp) {
385     if (n > 0) {
386     int half = n >>> 1;
387     while (k < half) {
388     int child = (k << 1) + 1;
389     Object c = array[child];
390     int right = child + 1;
391     if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
392     c = array[child = right];
393     if (cmp.compare(x, (T) c) <= 0)
394     break;
395     array[k] = c;
396     k = child;
397     }
398     array[k] = x;
399     }
400     }
401    
402     /**
403     * Establishes the heap invariant (described above) in the entire tree,
404     * assuming nothing about the order of the elements prior to the call.
405     */
406     private void heapify() {
407     Object[] array = queue;
408     int n = size;
409     int half = (n >>> 1) - 1;
410     Comparator<? super E> cmp = comparator;
411     if (cmp == null) {
412     for (int i = half; i >= 0; i--)
413     siftDownComparable(i, (E) array[i], array, n);
414     }
415     else {
416     for (int i = half; i >= 0; i--)
417     siftDownUsingComparator(i, (E) array[i], array, n, cmp);
418     }
419     }
420    
421     /**
422     * Inserts the specified element into this priority queue.
423     *
424     * @param e the element to add
425     * @return {@code true} (as specified by {@link Collection#add})
426     * @throws ClassCastException if the specified element cannot be compared
427     * with elements currently in the priority queue according to the
428     * priority queue's ordering
429     * @throws NullPointerException if the specified element is null
430     */
431     public boolean add(E e) {
432     return offer(e);
433     }
434    
435     /**
436     * Inserts the specified element into this priority queue.
437     * As the queue is unbounded, this method will never return {@code false}.
438     *
439     * @param e the element to add
440     * @return {@code true} (as specified by {@link Queue#offer})
441     * @throws ClassCastException if the specified element cannot be compared
442     * with elements currently in the priority queue according to the
443     * priority queue's ordering
444     * @throws NullPointerException if the specified element is null
445     */
446     public boolean offer(E e) {
447     if (e == null)
448     throw new NullPointerException();
449     final ReentrantLock lock = this.lock;
450     lock.lock();
451     int n, cap;
452     Object[] array;
453     while ((n = size) >= (cap = (array = queue).length))
454     tryGrow(array, cap);
455     try {
456     Comparator<? super E> cmp = comparator;
457     if (cmp == null)
458     siftUpComparable(n, e, array);
459     else
460     siftUpUsingComparator(n, e, array, cmp);
461     size = n + 1;
462     notEmpty.signal();
463     } finally {
464     lock.unlock();
465     }
466     return true;
467     }
468    
469     /**
470     * Inserts the specified element into this priority queue.
471     * As the queue is unbounded, this method will never block.
472     *
473     * @param e the element to add
474     * @throws ClassCastException if the specified element cannot be compared
475     * with elements currently in the priority queue according to the
476     * priority queue's ordering
477     * @throws NullPointerException if the specified element is null
478     */
479     public void put(E e) {
480     offer(e); // never need to block
481     }
482    
483     /**
484     * Inserts the specified element into this priority queue.
485     * As the queue is unbounded, this method will never block or
486     * return {@code false}.
487     *
488     * @param e the element to add
489     * @param timeout This parameter is ignored as the method never blocks
490     * @param unit This parameter is ignored as the method never blocks
491     * @return {@code true} (as specified by
492     * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
493     * @throws ClassCastException if the specified element cannot be compared
494     * with elements currently in the priority queue according to the
495     * priority queue's ordering
496     * @throws NullPointerException if the specified element is null
497     */
498     public boolean offer(E e, long timeout, TimeUnit unit) {
499     return offer(e); // never need to block
500     }
501    
502     public E poll() {
503     final ReentrantLock lock = this.lock;
504     lock.lock();
505     try {
506     return dequeue();
507     } finally {
508     lock.unlock();
509     }
510     }
511    
512     public E take() throws InterruptedException {
513     final ReentrantLock lock = this.lock;
514     lock.lockInterruptibly();
515     E result;
516     try {
517     while ( (result = dequeue()) == null)
518     notEmpty.await();
519     } finally {
520     lock.unlock();
521     }
522     return result;
523     }
524    
525     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
526     long nanos = unit.toNanos(timeout);
527     final ReentrantLock lock = this.lock;
528     lock.lockInterruptibly();
529     E result;
530     try {
531     while ( (result = dequeue()) == null && nanos > 0)
532     nanos = notEmpty.awaitNanos(nanos);
533     } finally {
534     lock.unlock();
535     }
536     return result;
537     }
538    
539     public E peek() {
540     final ReentrantLock lock = this.lock;
541     lock.lock();
542     try {
543     return (size == 0) ? null : (E) queue[0];
544     } finally {
545     lock.unlock();
546     }
547     }
548    
549     /**
550     * Returns the comparator used to order the elements in this queue,
551     * or {@code null} if this queue uses the {@linkplain Comparable
552     * natural ordering} of its elements.
553     *
554     * @return the comparator used to order the elements in this queue,
555     * or {@code null} if this queue uses the natural
556     * ordering of its elements
557     */
558     public Comparator<? super E> comparator() {
559     return comparator;
560     }
561    
562     public int size() {
563     final ReentrantLock lock = this.lock;
564     lock.lock();
565     try {
566     return size;
567     } finally {
568     lock.unlock();
569     }
570     }
571    
572     /**
573     * Always returns {@code Integer.MAX_VALUE} because
574     * a {@code PriorityBlockingQueue} is not capacity constrained.
575     * @return {@code Integer.MAX_VALUE} always
576     */
577     public int remainingCapacity() {
578     return Integer.MAX_VALUE;
579     }
580    
581     private int indexOf(Object o) {
582     if (o != null) {
583     Object[] array = queue;
584     int n = size;
585     for (int i = 0; i < n; i++)
586     if (o.equals(array[i]))
587     return i;
588     }
589     return -1;
590     }
591    
592     /**
593     * Removes the ith element from queue.
594     */
595     private void removeAt(int i) {
596     Object[] array = queue;
597     int n = size - 1;
598     if (n == i) // removed last element
599     array[i] = null;
600     else {
601     E moved = (E) array[n];
602     array[n] = null;
603     Comparator<? super E> cmp = comparator;
604     if (cmp == null)
605     siftDownComparable(i, moved, array, n);
606     else
607     siftDownUsingComparator(i, moved, array, n, cmp);
608     if (array[i] == moved) {
609     if (cmp == null)
610     siftUpComparable(i, moved, array);
611     else
612     siftUpUsingComparator(i, moved, array, cmp);
613     }
614     }
615     size = n;
616     }
617    
618     /**
619     * Removes a single instance of the specified element from this queue,
620     * if it is present. More formally, removes an element {@code e} such
621     * that {@code o.equals(e)}, if this queue contains one or more such
622     * elements. Returns {@code true} if and only if this queue contained
623     * the specified element (or equivalently, if this queue changed as a
624     * result of the call).
625     *
626     * @param o element to be removed from this queue, if present
627     * @return {@code true} if this queue changed as a result of the call
628     */
629     public boolean remove(Object o) {
630     final ReentrantLock lock = this.lock;
631     lock.lock();
632     try {
633     int i = indexOf(o);
634     if (i == -1)
635     return false;
636     removeAt(i);
637     return true;
638     } finally {
639     lock.unlock();
640     }
641     }
642    
643     /**
644     * Identity-based version for use in Itr.remove.
645     */
646     void removeEQ(Object o) {
647     final ReentrantLock lock = this.lock;
648     lock.lock();
649     try {
650     Object[] array = queue;
651     for (int i = 0, n = size; i < n; i++) {
652     if (o == array[i]) {
653     removeAt(i);
654     break;
655     }
656     }
657     } finally {
658     lock.unlock();
659     }
660     }
661    
662     /**
663     * Returns {@code true} if this queue contains the specified element.
664     * More formally, returns {@code true} if and only if this queue contains
665     * at least one element {@code e} such that {@code o.equals(e)}.
666     *
667     * @param o object to be checked for containment in this queue
668     * @return {@code true} if this queue contains the specified element
669     */
670     public boolean contains(Object o) {
671     final ReentrantLock lock = this.lock;
672     lock.lock();
673     try {
674     return indexOf(o) != -1;
675     } finally {
676     lock.unlock();
677     }
678     }
679    
680     public String toString() {
681     return Helpers.collectionToString(this);
682     }
683    
684     /**
685     * @throws UnsupportedOperationException {@inheritDoc}
686     * @throws ClassCastException {@inheritDoc}
687     * @throws NullPointerException {@inheritDoc}
688     * @throws IllegalArgumentException {@inheritDoc}
689     */
690     public int drainTo(Collection<? super E> c) {
691     return drainTo(c, Integer.MAX_VALUE);
692     }
693    
694     /**
695     * @throws UnsupportedOperationException {@inheritDoc}
696     * @throws ClassCastException {@inheritDoc}
697     * @throws NullPointerException {@inheritDoc}
698     * @throws IllegalArgumentException {@inheritDoc}
699     */
700     public int drainTo(Collection<? super E> c, int maxElements) {
701     if (c == null)
702     throw new NullPointerException();
703     if (c == this)
704     throw new IllegalArgumentException();
705     if (maxElements <= 0)
706     return 0;
707     final ReentrantLock lock = this.lock;
708     lock.lock();
709     try {
710     int n = Math.min(size, maxElements);
711     for (int i = 0; i < n; i++) {
712     c.add((E) queue[0]); // In this order, in case add() throws.
713     dequeue();
714     }
715     return n;
716     } finally {
717     lock.unlock();
718     }
719     }
720    
721     /**
722     * Atomically removes all of the elements from this queue.
723     * The queue will be empty after this call returns.
724     */
725     public void clear() {
726     final ReentrantLock lock = this.lock;
727     lock.lock();
728     try {
729     Object[] array = queue;
730     int n = size;
731     size = 0;
732     for (int i = 0; i < n; i++)
733     array[i] = null;
734     } finally {
735     lock.unlock();
736     }
737     }
738    
739     /**
740     * Returns an array containing all of the elements in this queue.
741     * The returned array elements are in no particular order.
742     *
743     * <p>The returned array will be "safe" in that no references to it are
744     * maintained by this queue. (In other words, this method must allocate
745     * a new array). The caller is thus free to modify the returned array.
746     *
747     * <p>This method acts as bridge between array-based and collection-based
748     * APIs.
749     *
750     * @return an array containing all of the elements in this queue
751     */
752     public Object[] toArray() {
753     final ReentrantLock lock = this.lock;
754     lock.lock();
755     try {
756     return Arrays.copyOf(queue, size);
757     } finally {
758     lock.unlock();
759     }
760     }
761    
762     /**
763     * Returns an array containing all of the elements in this queue; the
764     * runtime type of the returned array is that of the specified array.
765     * The returned array elements are in no particular order.
766     * If the queue fits in the specified array, it is returned therein.
767     * Otherwise, a new array is allocated with the runtime type of the
768     * specified array and the size of this queue.
769     *
770     * <p>If this queue fits in the specified array with room to spare
771     * (i.e., the array has more elements than this queue), the element in
772     * the array immediately following the end of the queue is set to
773     * {@code null}.
774     *
775     * <p>Like the {@link #toArray()} method, this method acts as bridge between
776     * array-based and collection-based APIs. Further, this method allows
777     * precise control over the runtime type of the output array, and may,
778     * under certain circumstances, be used to save allocation costs.
779     *
780     * <p>Suppose {@code x} is a queue known to contain only strings.
781     * The following code can be used to dump the queue into a newly
782     * allocated array of {@code String}:
783     *
784     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
785     *
786     * Note that {@code toArray(new Object[0])} is identical in function to
787     * {@code toArray()}.
788     *
789     * @param a the array into which the elements of the queue are to
790     * be stored, if it is big enough; otherwise, a new array of the
791     * same runtime type is allocated for this purpose
792     * @return an array containing all of the elements in this queue
793     * @throws ArrayStoreException if the runtime type of the specified array
794     * is not a supertype of the runtime type of every element in
795     * this queue
796     * @throws NullPointerException if the specified array is null
797     */
798     public <T> T[] toArray(T[] a) {
799     final ReentrantLock lock = this.lock;
800     lock.lock();
801     try {
802     int n = size;
803     if (a.length < n)
804     // Make a new array of a's runtime type, but my contents:
805     return (T[]) Arrays.copyOf(queue, size, a.getClass());
806     System.arraycopy(queue, 0, a, 0, n);
807     if (a.length > n)
808     a[n] = null;
809     return a;
810     } finally {
811     lock.unlock();
812     }
813     }
814    
815     /**
816     * Returns an iterator over the elements in this queue. The
817     * iterator does not return the elements in any particular order.
818     *
819     * <p>The returned iterator is
820     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
821     *
822     * @return an iterator over the elements in this queue
823     */
824     public Iterator<E> iterator() {
825     return new Itr(toArray());
826     }
827    
828     /**
829     * Snapshot iterator that works off copy of underlying q array.
830     */
831     final class Itr implements Iterator<E> {
832     final Object[] array; // Array of all elements
833     int cursor; // index of next element to return
834     int lastRet; // index of last element, or -1 if no such
835    
836     Itr(Object[] array) {
837     lastRet = -1;
838     this.array = array;
839     }
840    
841     public boolean hasNext() {
842     return cursor < array.length;
843     }
844    
845     public E next() {
846     if (cursor >= array.length)
847     throw new NoSuchElementException();
848     lastRet = cursor;
849     return (E)array[cursor++];
850     }
851    
852     public void remove() {
853     if (lastRet < 0)
854     throw new IllegalStateException();
855     removeEQ(array[lastRet]);
856     lastRet = -1;
857     }
858     }
859    
860     /**
861     * Saves this queue to a stream (that is, serializes it).
862     *
863     * For compatibility with previous version of this class, elements
864     * are first copied to a java.util.PriorityQueue, which is then
865     * serialized.
866     *
867     * @param s the stream
868     * @throws java.io.IOException if an I/O error occurs
869     */
870     private void writeObject(java.io.ObjectOutputStream s)
871     throws java.io.IOException {
872     lock.lock();
873     try {
874     // avoid zero capacity argument
875     q = new PriorityQueue<E>(Math.max(size, 1), comparator);
876     q.addAll(this);
877     s.defaultWriteObject();
878     } finally {
879     q = null;
880     lock.unlock();
881     }
882     }
883    
884     /**
885     * Reconstitutes this queue from a stream (that is, deserializes it).
886     * @param s the stream
887     * @throws ClassNotFoundException if the class of a serialized object
888     * could not be found
889     * @throws java.io.IOException if an I/O error occurs
890     */
891     private void readObject(java.io.ObjectInputStream s)
892     throws java.io.IOException, ClassNotFoundException {
893     try {
894     s.defaultReadObject();
895     this.queue = new Object[q.size()];
896     comparator = q.comparator();
897     addAll(q);
898     } finally {
899     q = null;
900     }
901     }
902    
903 jsr166 1.4 /**
904     * Immutable snapshot spliterator that binds to elements "late".
905     */
906     final class PBQSpliterator implements Spliterator<E> {
907     Object[] array; // null until late-bound-initialized
908 jsr166 1.1 int index;
909     int fence;
910    
911 jsr166 1.4 PBQSpliterator() {}
912    
913     PBQSpliterator(Object[] array, int index, int fence) {
914 jsr166 1.1 this.array = array;
915     this.index = index;
916     this.fence = fence;
917     }
918    
919 jsr166 1.4 private int getFence() {
920     if (array == null)
921     fence = (array = toArray()).length;
922     return fence;
923 jsr166 1.1 }
924    
925 jsr166 1.4 public PBQSpliterator trySplit() {
926 jsr166 1.1 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
927     return (lo >= mid) ? null :
928 jsr166 1.4 new PBQSpliterator(array, lo, index = mid);
929 jsr166 1.1 }
930    
931     public void forEachRemaining(Consumer<? super E> action) {
932 jsr166 1.4 Objects.requireNonNull(action);
933     final int hi = getFence(), lo = index;
934     final Object[] a = array;
935     index = hi; // ensure exhaustion
936     for (int i = lo; i < hi; i++)
937     action.accept((E) a[i]);
938 jsr166 1.1 }
939    
940     public boolean tryAdvance(Consumer<? super E> action) {
941 jsr166 1.4 Objects.requireNonNull(action);
942 jsr166 1.1 if (getFence() > index && index >= 0) {
943 jsr166 1.4 action.accept((E) array[index++]);
944 jsr166 1.1 return true;
945     }
946     return false;
947     }
948    
949 jsr166 1.4 public long estimateSize() { return getFence() - index; }
950 jsr166 1.1
951     public int characteristics() {
952 jsr166 1.4 return (Spliterator.NONNULL |
953     Spliterator.SIZED |
954     Spliterator.SUBSIZED);
955 jsr166 1.1 }
956     }
957    
958     /**
959     * Returns a {@link Spliterator} over the elements in this queue.
960     *
961     * <p>The returned spliterator is
962     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
963     *
964     * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
965     * {@link Spliterator#NONNULL}.
966     *
967     * @implNote
968     * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
969     *
970     * @return a {@code Spliterator} over the elements in this queue
971     * @since 1.8
972     */
973     public Spliterator<E> spliterator() {
974 jsr166 1.4 return new PBQSpliterator();
975 jsr166 1.1 }
976    
// Unsafe mechanics: U and the field offset below are what tryGrow
// uses to CAS the allocationSpinLock field.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long ALLOCATIONSPINLOCK;
static {
    try {
        final java.lang.reflect.Field spinLockField =
            PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock");
        ALLOCATIONSPINLOCK = U.objectFieldOffset(spinLockField);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
988     }