ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.148
Committed: Fri Mar 18 16:01:42 2022 UTC (2 years, 2 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.147: +11 -5 lines
Log Message:
jdk17+ suppressWarnings, FJ updates

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.AbstractQueue;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Comparator;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Objects;
18 import java.util.PriorityQueue;
19 import java.util.Queue;
20 import java.util.SortedSet;
21 import java.util.Spliterator;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import java.util.function.Predicate;
26 // OPENJDK import jdk.internal.access.SharedSecrets;
27 import jdk.internal.util.ArraysSupport;
28
29 /**
30 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
31 * the same ordering rules as class {@link PriorityQueue} and supplies
32 * blocking retrieval operations. While this queue is logically
33 * unbounded, attempted additions may fail due to resource exhaustion
34 * (causing {@code OutOfMemoryError}). This class does not permit
35 * {@code null} elements. A priority queue relying on {@linkplain
36 * Comparable natural ordering} also does not permit insertion of
37 * non-comparable objects (doing so results in
38 * {@code ClassCastException}).
39 *
40 * <p>This class and its iterator implement all of the <em>optional</em>
41 * methods of the {@link Collection} and {@link Iterator} interfaces.
42 * The Iterator provided in method {@link #iterator()} and the
43 * Spliterator provided in method {@link #spliterator()} are <em>not</em>
44 * guaranteed to traverse the elements of the PriorityBlockingQueue in
45 * any particular order. If you need ordered traversal, consider using
46 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
47 * be used to <em>remove</em> some or all elements in priority order and
48 * place them in another collection.
49 *
50 * <p>Operations on this class make no guarantees about the ordering
51 * of elements with equal priority. If you need to enforce an
52 * ordering, you can define custom classes or comparators that use a
53 * secondary key to break ties in primary priority values. For
54 * example, here is a class that applies first-in-first-out
55 * tie-breaking to comparable elements. To use it, you would insert a
56 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
57 *
58 * <pre> {@code
59 * class FIFOEntry<E extends Comparable<? super E>>
60 * implements Comparable<FIFOEntry<E>> {
61 * static final AtomicLong seq = new AtomicLong();
62 * final long seqNum;
63 * final E entry;
64 * public FIFOEntry(E entry) {
65 * seqNum = seq.getAndIncrement();
66 * this.entry = entry;
67 * }
68 * public E getEntry() { return entry; }
69 * public int compareTo(FIFOEntry<E> other) {
70 * int res = entry.compareTo(other.entry);
71 * if (res == 0 && other.entry != this.entry)
72 * res = (seqNum < other.seqNum ? -1 : 1);
73 * return res;
74 * }
75 * }}</pre>
76 *
77 * <p>This class is a member of the
78 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
79 * Java Collections Framework</a>.
80 *
81 * @since 1.5
82 * @author Doug Lea
83 * @param <E> the type of elements held in this queue
84 */
85 @SuppressWarnings("unchecked")
86 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
87 implements BlockingQueue<E>, java.io.Serializable {
88 private static final long serialVersionUID = 5595510919245408276L;
89
90 /*
91 * The implementation uses an array-based binary heap, with public
92 * operations protected with a single lock. However, allocation
93 * during resizing uses a simple spinlock (used only while not
94 * holding main lock) in order to allow takes to operate
95 * concurrently with allocation. This avoids repeated
96 * postponement of waiting consumers and consequent element
97 * build-up. The need to back away from lock during allocation
98 * makes it impossible to simply wrap delegated
99 * java.util.PriorityQueue operations within a lock, as was done
100 * in a previous version of this class. To maintain
101 * interoperability, a plain PriorityQueue is still used during
102 * serialization, which maintains compatibility at the expense of
103 * transiently doubling overhead.
104 */
105
106 /**
107 * Default array capacity.
108 */
109 private static final int DEFAULT_INITIAL_CAPACITY = 11;
110
111 /**
112 * Priority queue represented as a balanced binary heap: the two
113 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
114 * priority queue is ordered by comparator, or by the elements'
115 * natural ordering, if comparator is null: For each node n in the
116 * heap and each descendant d of n, n <= d. The element with the
117 * lowest value is in queue[0], assuming the queue is nonempty.
118 */
119 private transient Object[] queue;
120
121 /**
122 * The number of elements in the priority queue.
123 */
124 private transient int size;
125
126 /**
127 * The comparator, or null if priority queue uses elements'
128 * natural ordering.
129 */
130 private transient Comparator<? super E> comparator;
131
132 /**
133 * Lock used for all public operations.
134 */
135 private final ReentrantLock lock = new ReentrantLock();
136
137 /**
138 * Condition for blocking when empty.
139 */
140 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
141 private final Condition notEmpty = lock.newCondition();
142
143 /**
144 * Spinlock for allocation, acquired via CAS.
145 */
146 private transient volatile int allocationSpinLock;
147
148 /**
149 * A plain PriorityQueue used only for serialization,
150 * to maintain compatibility with previous versions
151 * of this class. Non-null only during serialization/deserialization.
152 */
153 private PriorityQueue<E> q;
154
155 /**
156 * Creates a {@code PriorityBlockingQueue} with the default
157 * initial capacity (11) that orders its elements according to
158 * their {@linkplain Comparable natural ordering}.
159 */
160 public PriorityBlockingQueue() {
161 this(DEFAULT_INITIAL_CAPACITY, null);
162 }
163
164 /**
165 * Creates a {@code PriorityBlockingQueue} with the specified
166 * initial capacity that orders its elements according to their
167 * {@linkplain Comparable natural ordering}.
168 *
169 * @param initialCapacity the initial capacity for this priority queue
170 * @throws IllegalArgumentException if {@code initialCapacity} is less
171 * than 1
172 */
173 public PriorityBlockingQueue(int initialCapacity) {
174 this(initialCapacity, null);
175 }
176
177 /**
178 * Creates a {@code PriorityBlockingQueue} with the specified initial
179 * capacity that orders its elements according to the specified
180 * comparator.
181 *
182 * @param initialCapacity the initial capacity for this priority queue
183 * @param comparator the comparator that will be used to order this
184 * priority queue. If {@code null}, the {@linkplain Comparable
185 * natural ordering} of the elements will be used.
186 * @throws IllegalArgumentException if {@code initialCapacity} is less
187 * than 1
188 */
189 public PriorityBlockingQueue(int initialCapacity,
190 Comparator<? super E> comparator) {
191 if (initialCapacity < 1)
192 throw new IllegalArgumentException();
193 this.comparator = comparator;
194 this.queue = new Object[Math.max(1, initialCapacity)];
195 }
196
197 /**
198 * Creates a {@code PriorityBlockingQueue} containing the elements
199 * in the specified collection. If the specified collection is a
200 * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
201 * priority queue will be ordered according to the same ordering.
202 * Otherwise, this priority queue will be ordered according to the
203 * {@linkplain Comparable natural ordering} of its elements.
204 *
205 * @param c the collection whose elements are to be placed
206 * into this priority queue
207 * @throws ClassCastException if elements of the specified collection
208 * cannot be compared to one another according to the priority
209 * queue's ordering
210 * @throws NullPointerException if the specified collection or any
211 * of its elements are null
212 */
213 public PriorityBlockingQueue(Collection<? extends E> c) {
214 boolean heapify = true; // true if not known to be in heap order
215 boolean screen = true; // true if must screen for nulls
216 if (c instanceof SortedSet<?>) {
217 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
218 this.comparator = (Comparator<? super E>) ss.comparator();
219 heapify = false;
220 }
221 else if (c instanceof PriorityBlockingQueue<?>) {
222 PriorityBlockingQueue<? extends E> pq =
223 (PriorityBlockingQueue<? extends E>) c;
224 this.comparator = (Comparator<? super E>) pq.comparator();
225 screen = false;
226 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
227 heapify = false;
228 }
229 Object[] es = c.toArray();
230 int n = es.length;
231 if (c.getClass() != java.util.ArrayList.class)
232 es = Arrays.copyOf(es, n, Object[].class);
233 if (screen && (n == 1 || this.comparator != null)) {
234 for (Object e : es)
235 if (e == null)
236 throw new NullPointerException();
237 }
238 this.queue = ensureNonEmpty(es);
239 this.size = n;
240 if (heapify)
241 heapify();
242 }
243
244 /** Ensures that queue[0] exists, helping peek() and poll(). */
245 private static Object[] ensureNonEmpty(Object[] es) {
246 return (es.length > 0) ? es : new Object[1];
247 }
248
249 /**
250 * Tries to grow array to accommodate at least one more element
251 * (but normally expand by about 50%), giving up (allowing retry)
252 * on contention (which we expect to be rare). Call only while
253 * holding lock.
254 *
255 * @param array the heap array
256 * @param oldCap the length of the array
257 */
258 private void tryGrow(Object[] array, int oldCap) {
259 lock.unlock(); // must release and then re-acquire main lock
260 Object[] newArray = null;
261 if (allocationSpinLock == 0 &&
262 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
263 try {
264 int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
265 int newCap = oldCap + ((oldCap < 64) ?
266 (oldCap + 2) : // grow faster if small
267 (oldCap >> 1));
268 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
269 int minCap = oldCap + 1;
270 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
271 throw new OutOfMemoryError();
272 newCap = MAX_ARRAY_SIZE;
273 }
274 if (newCap > oldCap && queue == array)
275 newArray = new Object[newCap];
276 } finally {
277 allocationSpinLock = 0;
278 }
279 }
280 if (newArray == null) // back off if another thread is allocating
281 Thread.yield();
282 lock.lock();
283 if (newArray != null && queue == array) {
284 queue = newArray;
285 System.arraycopy(array, 0, newArray, 0, oldCap);
286 }
287 }
288
289 /**
290 * Mechanics for poll(). Call only while holding lock.
291 */
292 private E dequeue() {
293 // assert lock.isHeldByCurrentThread();
294 final Object[] es;
295 final E result;
296
297 if ((result = (E) ((es = queue)[0])) != null) {
298 final int n;
299 final E x = (E) es[(n = --size)];
300 es[n] = null;
301 if (n > 0) {
302 final Comparator<? super E> cmp;
303 if ((cmp = comparator) == null)
304 siftDownComparable(0, x, es, n);
305 else
306 siftDownUsingComparator(0, x, es, n, cmp);
307 }
308 }
309 return result;
310 }
311
312 /**
313 * Inserts item x at position k, maintaining heap invariant by
314 * promoting x up the tree until it is greater than or equal to
315 * its parent, or is the root.
316 *
317 * To simplify and speed up coercions and comparisons, the
318 * Comparable and Comparator versions are separated into different
319 * methods that are otherwise identical. (Similarly for siftDown.)
320 *
321 * @param k the position to fill
322 * @param x the item to insert
323 * @param es the heap array
324 */
325 private static <T> void siftUpComparable(int k, T x, Object[] es) {
326 Comparable<? super T> key = (Comparable<? super T>) x;
327 while (k > 0) {
328 int parent = (k - 1) >>> 1;
329 Object e = es[parent];
330 if (key.compareTo((T) e) >= 0)
331 break;
332 es[k] = e;
333 k = parent;
334 }
335 es[k] = key;
336 }
337
338 private static <T> void siftUpUsingComparator(
339 int k, T x, Object[] es, Comparator<? super T> cmp) {
340 while (k > 0) {
341 int parent = (k - 1) >>> 1;
342 Object e = es[parent];
343 if (cmp.compare(x, (T) e) >= 0)
344 break;
345 es[k] = e;
346 k = parent;
347 }
348 es[k] = x;
349 }
350
351 /**
352 * Inserts item x at position k, maintaining heap invariant by
353 * demoting x down the tree repeatedly until it is less than or
354 * equal to its children or is a leaf.
355 *
356 * @param k the position to fill
357 * @param x the item to insert
358 * @param es the heap array
359 * @param n heap size
360 */
361 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
362 // assert n > 0;
363 Comparable<? super T> key = (Comparable<? super T>)x;
364 int half = n >>> 1; // loop while a non-leaf
365 while (k < half) {
366 int child = (k << 1) + 1; // assume left child is least
367 Object c = es[child];
368 int right = child + 1;
369 if (right < n &&
370 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
371 c = es[child = right];
372 if (key.compareTo((T) c) <= 0)
373 break;
374 es[k] = c;
375 k = child;
376 }
377 es[k] = key;
378 }
379
380 private static <T> void siftDownUsingComparator(
381 int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
382 // assert n > 0;
383 int half = n >>> 1;
384 while (k < half) {
385 int child = (k << 1) + 1;
386 Object c = es[child];
387 int right = child + 1;
388 if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
389 c = es[child = right];
390 if (cmp.compare(x, (T) c) <= 0)
391 break;
392 es[k] = c;
393 k = child;
394 }
395 es[k] = x;
396 }
397
398 /**
399 * Establishes the heap invariant (described above) in the entire tree,
400 * assuming nothing about the order of the elements prior to the call.
401 * This classic algorithm due to Floyd (1964) is known to be O(size).
402 */
403 private void heapify() {
404 final Object[] es = queue;
405 int n = size, i = (n >>> 1) - 1;
406 final Comparator<? super E> cmp;
407 if ((cmp = comparator) == null)
408 for (; i >= 0; i--)
409 siftDownComparable(i, (E) es[i], es, n);
410 else
411 for (; i >= 0; i--)
412 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
413 }
414
415 /**
416 * Inserts the specified element into this priority queue.
417 *
418 * @param e the element to add
419 * @return {@code true} (as specified by {@link Collection#add})
420 * @throws ClassCastException if the specified element cannot be compared
421 * with elements currently in the priority queue according to the
422 * priority queue's ordering
423 * @throws NullPointerException if the specified element is null
424 */
425 public boolean add(E e) {
426 return offer(e);
427 }
428
429 /**
430 * Inserts the specified element into this priority queue.
431 * As the queue is unbounded, this method will never return {@code false}.
432 *
433 * @param e the element to add
434 * @return {@code true} (as specified by {@link Queue#offer})
435 * @throws ClassCastException if the specified element cannot be compared
436 * with elements currently in the priority queue according to the
437 * priority queue's ordering
438 * @throws NullPointerException if the specified element is null
439 */
440 public boolean offer(E e) {
441 if (e == null)
442 throw new NullPointerException();
443 final ReentrantLock lock = this.lock;
444 lock.lock();
445 int n, cap;
446 Object[] es;
447 while ((n = size) >= (cap = (es = queue).length))
448 tryGrow(es, cap);
449 try {
450 final Comparator<? super E> cmp;
451 if ((cmp = comparator) == null)
452 siftUpComparable(n, e, es);
453 else
454 siftUpUsingComparator(n, e, es, cmp);
455 size = n + 1;
456 notEmpty.signal();
457 } finally {
458 lock.unlock();
459 }
460 return true;
461 }
462
463 /**
464 * Inserts the specified element into this priority queue.
465 * As the queue is unbounded, this method will never block.
466 *
467 * @param e the element to add
468 * @throws ClassCastException if the specified element cannot be compared
469 * with elements currently in the priority queue according to the
470 * priority queue's ordering
471 * @throws NullPointerException if the specified element is null
472 */
473 public void put(E e) {
474 offer(e); // never need to block
475 }
476
477 /**
478 * Inserts the specified element into this priority queue.
479 * As the queue is unbounded, this method will never block or
480 * return {@code false}.
481 *
482 * @param e the element to add
483 * @param timeout This parameter is ignored as the method never blocks
484 * @param unit This parameter is ignored as the method never blocks
485 * @return {@code true} (as specified by
486 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
487 * @throws ClassCastException if the specified element cannot be compared
488 * with elements currently in the priority queue according to the
489 * priority queue's ordering
490 * @throws NullPointerException if the specified element is null
491 */
492 public boolean offer(E e, long timeout, TimeUnit unit) {
493 return offer(e); // never need to block
494 }
495
496 public E poll() {
497 final ReentrantLock lock = this.lock;
498 lock.lock();
499 try {
500 return dequeue();
501 } finally {
502 lock.unlock();
503 }
504 }
505
506 public E take() throws InterruptedException {
507 final ReentrantLock lock = this.lock;
508 lock.lockInterruptibly();
509 E result;
510 try {
511 while ( (result = dequeue()) == null)
512 notEmpty.await();
513 } finally {
514 lock.unlock();
515 }
516 return result;
517 }
518
519 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
520 long nanos = unit.toNanos(timeout);
521 final ReentrantLock lock = this.lock;
522 lock.lockInterruptibly();
523 E result;
524 try {
525 while ( (result = dequeue()) == null && nanos > 0)
526 nanos = notEmpty.awaitNanos(nanos);
527 } finally {
528 lock.unlock();
529 }
530 return result;
531 }
532
533 public E peek() {
534 final ReentrantLock lock = this.lock;
535 lock.lock();
536 try {
537 return (E) queue[0];
538 } finally {
539 lock.unlock();
540 }
541 }
542
543 /**
544 * Returns the comparator used to order the elements in this queue,
545 * or {@code null} if this queue uses the {@linkplain Comparable
546 * natural ordering} of its elements.
547 *
548 * @return the comparator used to order the elements in this queue,
549 * or {@code null} if this queue uses the natural
550 * ordering of its elements
551 */
552 public Comparator<? super E> comparator() {
553 return comparator;
554 }
555
556 public int size() {
557 final ReentrantLock lock = this.lock;
558 lock.lock();
559 try {
560 return size;
561 } finally {
562 lock.unlock();
563 }
564 }
565
566 /**
567 * Always returns {@code Integer.MAX_VALUE} because
568 * a {@code PriorityBlockingQueue} is not capacity constrained.
569 * @return {@code Integer.MAX_VALUE} always
570 */
571 public int remainingCapacity() {
572 return Integer.MAX_VALUE;
573 }
574
575 private int indexOf(Object o) {
576 if (o != null) {
577 final Object[] es = queue;
578 for (int i = 0, n = size; i < n; i++)
579 if (o.equals(es[i]))
580 return i;
581 }
582 return -1;
583 }
584
585 /**
586 * Removes the ith element from queue.
587 */
588 private void removeAt(int i) {
589 final Object[] es = queue;
590 final int n = size - 1;
591 if (n == i) // removed last element
592 es[i] = null;
593 else {
594 E moved = (E) es[n];
595 es[n] = null;
596 final Comparator<? super E> cmp;
597 if ((cmp = comparator) == null)
598 siftDownComparable(i, moved, es, n);
599 else
600 siftDownUsingComparator(i, moved, es, n, cmp);
601 if (es[i] == moved) {
602 if (cmp == null)
603 siftUpComparable(i, moved, es);
604 else
605 siftUpUsingComparator(i, moved, es, cmp);
606 }
607 }
608 size = n;
609 }
610
611 /**
612 * Removes a single instance of the specified element from this queue,
613 * if it is present. More formally, removes an element {@code e} such
614 * that {@code o.equals(e)}, if this queue contains one or more such
615 * elements. Returns {@code true} if and only if this queue contained
616 * the specified element (or equivalently, if this queue changed as a
617 * result of the call).
618 *
619 * @param o element to be removed from this queue, if present
620 * @return {@code true} if this queue changed as a result of the call
621 */
622 public boolean remove(Object o) {
623 final ReentrantLock lock = this.lock;
624 lock.lock();
625 try {
626 int i = indexOf(o);
627 if (i == -1)
628 return false;
629 removeAt(i);
630 return true;
631 } finally {
632 lock.unlock();
633 }
634 }
635
636 /**
637 * Identity-based version for use in Itr.remove.
638 *
639 * @param o element to be removed from this queue, if present
640 */
641 void removeEq(Object o) {
642 final ReentrantLock lock = this.lock;
643 lock.lock();
644 try {
645 final Object[] es = queue;
646 for (int i = 0, n = size; i < n; i++) {
647 if (o == es[i]) {
648 removeAt(i);
649 break;
650 }
651 }
652 } finally {
653 lock.unlock();
654 }
655 }
656
657 /**
658 * Returns {@code true} if this queue contains the specified element.
659 * More formally, returns {@code true} if and only if this queue contains
660 * at least one element {@code e} such that {@code o.equals(e)}.
661 *
662 * @param o object to be checked for containment in this queue
663 * @return {@code true} if this queue contains the specified element
664 */
665 public boolean contains(Object o) {
666 final ReentrantLock lock = this.lock;
667 lock.lock();
668 try {
669 return indexOf(o) != -1;
670 } finally {
671 lock.unlock();
672 }
673 }
674
675 public String toString() {
676 return Helpers.collectionToString(this);
677 }
678
679 /**
680 * @throws UnsupportedOperationException {@inheritDoc}
681 * @throws ClassCastException {@inheritDoc}
682 * @throws NullPointerException {@inheritDoc}
683 * @throws IllegalArgumentException {@inheritDoc}
684 */
685 public int drainTo(Collection<? super E> c) {
686 return drainTo(c, Integer.MAX_VALUE);
687 }
688
689 /**
690 * @throws UnsupportedOperationException {@inheritDoc}
691 * @throws ClassCastException {@inheritDoc}
692 * @throws NullPointerException {@inheritDoc}
693 * @throws IllegalArgumentException {@inheritDoc}
694 */
695 public int drainTo(Collection<? super E> c, int maxElements) {
696 Objects.requireNonNull(c);
697 if (c == this)
698 throw new IllegalArgumentException();
699 if (maxElements <= 0)
700 return 0;
701 final ReentrantLock lock = this.lock;
702 lock.lock();
703 try {
704 int n = Math.min(size, maxElements);
705 for (int i = 0; i < n; i++) {
706 c.add((E) queue[0]); // In this order, in case add() throws.
707 dequeue();
708 }
709 return n;
710 } finally {
711 lock.unlock();
712 }
713 }
714
715 /**
716 * Atomically removes all of the elements from this queue.
717 * The queue will be empty after this call returns.
718 */
719 public void clear() {
720 final ReentrantLock lock = this.lock;
721 lock.lock();
722 try {
723 final Object[] es = queue;
724 for (int i = 0, n = size; i < n; i++)
725 es[i] = null;
726 size = 0;
727 } finally {
728 lock.unlock();
729 }
730 }
731
732 /**
733 * Returns an array containing all of the elements in this queue.
734 * The returned array elements are in no particular order.
735 *
736 * <p>The returned array will be "safe" in that no references to it are
737 * maintained by this queue. (In other words, this method must allocate
738 * a new array). The caller is thus free to modify the returned array.
739 *
740 * <p>This method acts as bridge between array-based and collection-based
741 * APIs.
742 *
743 * @return an array containing all of the elements in this queue
744 */
745 public Object[] toArray() {
746 final ReentrantLock lock = this.lock;
747 lock.lock();
748 try {
749 return Arrays.copyOf(queue, size);
750 } finally {
751 lock.unlock();
752 }
753 }
754
755 /**
756 * Returns an array containing all of the elements in this queue; the
757 * runtime type of the returned array is that of the specified array.
758 * The returned array elements are in no particular order.
759 * If the queue fits in the specified array, it is returned therein.
760 * Otherwise, a new array is allocated with the runtime type of the
761 * specified array and the size of this queue.
762 *
763 * <p>If this queue fits in the specified array with room to spare
764 * (i.e., the array has more elements than this queue), the element in
765 * the array immediately following the end of the queue is set to
766 * {@code null}.
767 *
768 * <p>Like the {@link #toArray()} method, this method acts as bridge between
769 * array-based and collection-based APIs. Further, this method allows
770 * precise control over the runtime type of the output array, and may,
771 * under certain circumstances, be used to save allocation costs.
772 *
773 * <p>Suppose {@code x} is a queue known to contain only strings.
774 * The following code can be used to dump the queue into a newly
775 * allocated array of {@code String}:
776 *
777 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
778 *
779 * Note that {@code toArray(new Object[0])} is identical in function to
780 * {@code toArray()}.
781 *
782 * @param a the array into which the elements of the queue are to
783 * be stored, if it is big enough; otherwise, a new array of the
784 * same runtime type is allocated for this purpose
785 * @return an array containing all of the elements in this queue
786 * @throws ArrayStoreException if the runtime type of the specified array
787 * is not a supertype of the runtime type of every element in
788 * this queue
789 * @throws NullPointerException if the specified array is null
790 */
791 public <T> T[] toArray(T[] a) {
792 final ReentrantLock lock = this.lock;
793 lock.lock();
794 try {
795 int n = size;
796 if (a.length < n)
797 // Make a new array of a's runtime type, but my contents:
798 return (T[]) Arrays.copyOf(queue, size, a.getClass());
799 System.arraycopy(queue, 0, a, 0, n);
800 if (a.length > n)
801 a[n] = null;
802 return a;
803 } finally {
804 lock.unlock();
805 }
806 }
807
808 /**
809 * Returns an iterator over the elements in this queue. The
810 * iterator does not return the elements in any particular order.
811 *
812 * <p>The returned iterator is
813 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
814 *
815 * @return an iterator over the elements in this queue
816 */
817 public Iterator<E> iterator() {
818 return new Itr(toArray());
819 }
820
821 /**
822 * Snapshot iterator that works off copy of underlying q array.
823 */
824 final class Itr implements Iterator<E> {
825 final Object[] array; // Array of all elements
826 int cursor; // index of next element to return
827 int lastRet = -1; // index of last element, or -1 if no such
828
829 Itr(Object[] array) {
830 this.array = array;
831 }
832
833 public boolean hasNext() {
834 return cursor < array.length;
835 }
836
837 public E next() {
838 if (cursor >= array.length)
839 throw new NoSuchElementException();
840 return (E)array[lastRet = cursor++];
841 }
842
843 public void remove() {
844 if (lastRet < 0)
845 throw new IllegalStateException();
846 removeEq(array[lastRet]);
847 lastRet = -1;
848 }
849
850 public void forEachRemaining(Consumer<? super E> action) {
851 Objects.requireNonNull(action);
852 final Object[] es = array;
853 int i;
854 if ((i = cursor) < es.length) {
855 lastRet = -1;
856 cursor = es.length;
857 for (; i < es.length; i++)
858 action.accept((E) es[i]);
859 lastRet = es.length - 1;
860 }
861 }
862 }
863
864 /**
865 * Saves this queue to a stream (that is, serializes it).
866 *
867 * For compatibility with previous version of this class, elements
868 * are first copied to a java.util.PriorityQueue, which is then
869 * serialized.
870 *
871 * @param s the stream
872 * @throws java.io.IOException if an I/O error occurs
873 */
874 private void writeObject(java.io.ObjectOutputStream s)
875 throws java.io.IOException {
876 lock.lock();
877 try {
878 // avoid zero capacity argument
879 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
880 q.addAll(this);
881 s.defaultWriteObject();
882 } finally {
883 q = null;
884 lock.unlock();
885 }
886 }
887
888 /**
889 * Reconstitutes this queue from a stream (that is, deserializes it).
890 * @param s the stream
891 * @throws ClassNotFoundException if the class of a serialized object
892 * could not be found
893 * @throws java.io.IOException if an I/O error occurs
894 */
895 private void readObject(java.io.ObjectInputStream s)
896 throws java.io.IOException, ClassNotFoundException {
897 try {
898 s.defaultReadObject();
899 int sz = q.size();
900 jsr166.Platform.checkArray(s, Object[].class, sz);
901 this.queue = new Object[Math.max(1, sz)];
902 comparator = q.comparator();
903 addAll(q);
904 } finally {
905 q = null;
906 }
907 }
908
909 /**
910 * Immutable snapshot spliterator that binds to elements "late".
911 */
912 final class PBQSpliterator implements Spliterator<E> {
913 Object[] array; // null until late-bound-initialized
914 int index;
915 int fence;
916
917 PBQSpliterator() {}
918
919 PBQSpliterator(Object[] array, int index, int fence) {
920 this.array = array;
921 this.index = index;
922 this.fence = fence;
923 }
924
925 private int getFence() {
926 if (array == null)
927 fence = (array = toArray()).length;
928 return fence;
929 }
930
931 public PBQSpliterator trySplit() {
932 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
933 return (lo >= mid) ? null :
934 new PBQSpliterator(array, lo, index = mid);
935 }
936
937 public void forEachRemaining(Consumer<? super E> action) {
938 Objects.requireNonNull(action);
939 final int hi = getFence(), lo = index;
940 final Object[] es = array;
941 index = hi; // ensure exhaustion
942 for (int i = lo; i < hi; i++)
943 action.accept((E) es[i]);
944 }
945
946 public boolean tryAdvance(Consumer<? super E> action) {
947 Objects.requireNonNull(action);
948 if (getFence() > index && index >= 0) {
949 action.accept((E) array[index++]);
950 return true;
951 }
952 return false;
953 }
954
955 public long estimateSize() { return getFence() - index; }
956
957 public int characteristics() {
958 return (Spliterator.NONNULL |
959 Spliterator.SIZED |
960 Spliterator.SUBSIZED);
961 }
962 }
963
964 /**
965 * Returns a {@link Spliterator} over the elements in this queue.
966 * The spliterator does not traverse elements in any particular order
967 * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
968 *
969 * <p>The returned spliterator is
970 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
971 *
972 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
973 * {@link Spliterator#NONNULL}.
974 *
975 * @implNote
976 * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
977 *
978 * @return a {@code Spliterator} over the elements in this queue
979 * @since 1.8
980 */
981 public Spliterator<E> spliterator() {
982 return new PBQSpliterator();
983 }
984
985 /**
986 * @throws NullPointerException {@inheritDoc}
987 */
988 public boolean removeIf(Predicate<? super E> filter) {
989 Objects.requireNonNull(filter);
990 return bulkRemove(filter);
991 }
992
993 /**
994 * @throws NullPointerException {@inheritDoc}
995 */
996 public boolean removeAll(Collection<?> c) {
997 Objects.requireNonNull(c);
998 return bulkRemove(e -> c.contains(e));
999 }
1000
1001 /**
1002 * @throws NullPointerException {@inheritDoc}
1003 */
1004 public boolean retainAll(Collection<?> c) {
1005 Objects.requireNonNull(c);
1006 return bulkRemove(e -> !c.contains(e));
1007 }
1008
1009 // A tiny bit set implementation
1010
1011 private static long[] nBits(int n) {
1012 return new long[((n - 1) >> 6) + 1];
1013 }
1014 private static void setBit(long[] bits, int i) {
1015 bits[i >> 6] |= 1L << i;
1016 }
1017 private static boolean isClear(long[] bits, int i) {
1018 return (bits[i >> 6] & (1L << i)) == 0;
1019 }
1020
1021 /** Implementation of bulk remove methods. */
1022 private boolean bulkRemove(Predicate<? super E> filter) {
1023 final ReentrantLock lock = this.lock;
1024 lock.lock();
1025 try {
1026 final Object[] es = queue;
1027 final int end = size;
1028 int i;
1029 // Optimize for initial run of survivors
1030 for (i = 0; i < end && !filter.test((E) es[i]); i++)
1031 ;
1032 if (i >= end)
1033 return false;
1034 // Tolerate predicates that reentrantly access the
1035 // collection for read, so traverse once to find elements
1036 // to delete, a second pass to physically expunge.
1037 final int beg = i;
1038 final long[] deathRow = nBits(end - beg);
1039 deathRow[0] = 1L; // set bit 0
1040 for (i = beg + 1; i < end; i++)
1041 if (filter.test((E) es[i]))
1042 setBit(deathRow, i - beg);
1043 int w = beg;
1044 for (i = beg; i < end; i++)
1045 if (isClear(deathRow, i - beg))
1046 es[w++] = es[i];
1047 for (i = size = w; i < end; i++)
1048 es[i] = null;
1049 heapify();
1050 return true;
1051 } finally {
1052 lock.unlock();
1053 }
1054 }
1055
1056 /**
1057 * @throws NullPointerException {@inheritDoc}
1058 */
1059 public void forEach(Consumer<? super E> action) {
1060 Objects.requireNonNull(action);
1061 final ReentrantLock lock = this.lock;
1062 lock.lock();
1063 try {
1064 final Object[] es = queue;
1065 for (int i = 0, n = size; i < n; i++)
1066 action.accept((E) es[i]);
1067 } finally {
1068 lock.unlock();
1069 }
1070 }
1071
1072 // VarHandle mechanics
1073 private static final VarHandle ALLOCATIONSPINLOCK;
1074 static {
1075 try {
1076 MethodHandles.Lookup l = MethodHandles.lookup();
1077 ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1078 "allocationSpinLock",
1079 int.class);
1080 } catch (ReflectiveOperationException e) {
1081 throw new ExceptionInInitializerError(e);
1082 }
1083 }
1084 }