ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.140
Committed: Mon Oct 1 00:10:53 2018 UTC (5 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.139: +1 -1 lines
Log Message:
update to using jdk11 by default, except link to jdk10 javadocs;
sync @docRoot references in javadoc with upstream

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.AbstractQueue;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Comparator;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Objects;
18 import java.util.PriorityQueue;
19 import java.util.Queue;
20 import java.util.SortedSet;
21 import java.util.Spliterator;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import java.util.function.Predicate;
26 import jdk.internal.misc.SharedSecrets;
27
28 /**
29 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
30 * the same ordering rules as class {@link PriorityQueue} and supplies
31 * blocking retrieval operations. While this queue is logically
32 * unbounded, attempted additions may fail due to resource exhaustion
33 * (causing {@code OutOfMemoryError}). This class does not permit
34 * {@code null} elements. A priority queue relying on {@linkplain
35 * Comparable natural ordering} also does not permit insertion of
36 * non-comparable objects (doing so results in
37 * {@code ClassCastException}).
38 *
39 * <p>This class and its iterator implement all of the <em>optional</em>
40 * methods of the {@link Collection} and {@link Iterator} interfaces.
41 * The Iterator provided in method {@link #iterator()} and the
42 * Spliterator provided in method {@link #spliterator()} are <em>not</em>
43 * guaranteed to traverse the elements of the PriorityBlockingQueue in
44 * any particular order. If you need ordered traversal, consider using
45 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
46 * be used to <em>remove</em> some or all elements in priority order and
47 * place them in another collection.
48 *
49 * <p>Operations on this class make no guarantees about the ordering
50 * of elements with equal priority. If you need to enforce an
51 * ordering, you can define custom classes or comparators that use a
52 * secondary key to break ties in primary priority values. For
53 * example, here is a class that applies first-in-first-out
54 * tie-breaking to comparable elements. To use it, you would insert a
55 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
56 *
57 * <pre> {@code
58 * class FIFOEntry<E extends Comparable<? super E>>
59 * implements Comparable<FIFOEntry<E>> {
60 * static final AtomicLong seq = new AtomicLong(0);
61 * final long seqNum;
62 * final E entry;
63 * public FIFOEntry(E entry) {
64 * seqNum = seq.getAndIncrement();
65 * this.entry = entry;
66 * }
67 * public E getEntry() { return entry; }
68 * public int compareTo(FIFOEntry<E> other) {
69 * int res = entry.compareTo(other.entry);
70 * if (res == 0 && other.entry != this.entry)
71 * res = (seqNum < other.seqNum ? -1 : 1);
72 * return res;
73 * }
74 * }}</pre>
75 *
76 * <p>This class is a member of the
77 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
78 * Java Collections Framework</a>.
79 *
80 * @since 1.5
81 * @author Doug Lea
82 * @param <E> the type of elements held in this queue
83 */
84 @SuppressWarnings("unchecked")
85 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
86 implements BlockingQueue<E>, java.io.Serializable {
87 private static final long serialVersionUID = 5595510919245408276L;
88
89 /*
90 * The implementation uses an array-based binary heap, with public
91 * operations protected with a single lock. However, allocation
92 * during resizing uses a simple spinlock (used only while not
93 * holding main lock) in order to allow takes to operate
94 * concurrently with allocation. This avoids repeated
95 * postponement of waiting consumers and consequent element
96 * build-up. The need to back away from lock during allocation
97 * makes it impossible to simply wrap delegated
98 * java.util.PriorityQueue operations within a lock, as was done
99 * in a previous version of this class. To maintain
100 * interoperability, a plain PriorityQueue is still used during
101 * serialization, which maintains compatibility at the expense of
102 * transiently doubling overhead.
103 */
104
105 /**
106 * Default array capacity.
107 */
108 private static final int DEFAULT_INITIAL_CAPACITY = 11;
109
110 /**
111 * The maximum size of array to allocate.
112 * Some VMs reserve some header words in an array.
113 * Attempts to allocate larger arrays may result in
114 * OutOfMemoryError: Requested array size exceeds VM limit
115 */
116 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
117
118 /**
119 * Priority queue represented as a balanced binary heap: the two
120 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
121 * priority queue is ordered by comparator, or by the elements'
122 * natural ordering, if comparator is null: For each node n in the
123 * heap and each descendant d of n, n <= d. The element with the
124 * lowest value is in queue[0], assuming the queue is nonempty.
125 */
126 private transient Object[] queue;
127
128 /**
129 * The number of elements in the priority queue.
130 */
131 private transient int size;
132
133 /**
134 * The comparator, or null if priority queue uses elements'
135 * natural ordering.
136 */
137 private transient Comparator<? super E> comparator;
138
139 /**
140 * Lock used for all public operations.
141 */
142 private final ReentrantLock lock = new ReentrantLock();
143
144 /**
145 * Condition for blocking when empty.
146 */
147 private final Condition notEmpty = lock.newCondition();
148
149 /**
150 * Spinlock for allocation, acquired via CAS.
151 */
152 private transient volatile int allocationSpinLock;
153
154 /**
155 * A plain PriorityQueue used only for serialization,
156 * to maintain compatibility with previous versions
157 * of this class. Non-null only during serialization/deserialization.
158 */
159 private PriorityQueue<E> q;
160
161 /**
162 * Creates a {@code PriorityBlockingQueue} with the default
163 * initial capacity (11) that orders its elements according to
164 * their {@linkplain Comparable natural ordering}.
165 */
public PriorityBlockingQueue() {
    // Natural ordering (null comparator) with the default backing-array capacity.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
169
170 /**
171 * Creates a {@code PriorityBlockingQueue} with the specified
172 * initial capacity that orders its elements according to their
173 * {@linkplain Comparable natural ordering}.
174 *
175 * @param initialCapacity the initial capacity for this priority queue
176 * @throws IllegalArgumentException if {@code initialCapacity} is less
177 * than 1
178 */
public PriorityBlockingQueue(int initialCapacity) {
    // A null comparator selects the elements' natural ordering; the
    // two-argument constructor performs the capacity validation.
    this(initialCapacity, null);
}
182
183 /**
184 * Creates a {@code PriorityBlockingQueue} with the specified initial
185 * capacity that orders its elements according to the specified
186 * comparator.
187 *
188 * @param initialCapacity the initial capacity for this priority queue
189 * @param comparator the comparator that will be used to order this
190 * priority queue. If {@code null}, the {@linkplain Comparable
191 * natural ordering} of the elements will be used.
192 * @throws IllegalArgumentException if {@code initialCapacity} is less
193 * than 1
194 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    // Reject non-positive capacities up front, so the array allocated
    // below always has at least one slot.
    if (initialCapacity <= 0) {
        throw new IllegalArgumentException();
    }
    this.queue = new Object[initialCapacity];
    this.comparator = comparator;
}
202
203 /**
204 * Creates a {@code PriorityBlockingQueue} containing the elements
205 * in the specified collection. If the specified collection is a
206 * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
207 * priority queue will be ordered according to the same ordering.
208 * Otherwise, this priority queue will be ordered according to the
209 * {@linkplain Comparable natural ordering} of its elements.
210 *
211 * @param c the collection whose elements are to be placed
212 * into this priority queue
213 * @throws ClassCastException if elements of the specified collection
214 * cannot be compared to one another according to the priority
215 * queue's ordering
216 * @throws NullPointerException if the specified collection or any
217 * of its elements are null
218 */
219 public PriorityBlockingQueue(Collection<? extends E> c) {
220 boolean heapify = true; // true if not known to be in heap order
221 boolean screen = true; // true if must screen for nulls
222 if (c instanceof SortedSet<?>) {
223 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
224 this.comparator = (Comparator<? super E>) ss.comparator();
225 heapify = false;
226 }
227 else if (c instanceof PriorityBlockingQueue<?>) {
228 PriorityBlockingQueue<? extends E> pq =
229 (PriorityBlockingQueue<? extends E>) c;
230 this.comparator = (Comparator<? super E>) pq.comparator();
231 screen = false;
232 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
233 heapify = false;
234 }
235 Object[] es = c.toArray();
236 int n = es.length;
237 // If c.toArray incorrectly doesn't return Object[], copy it.
238 if (es.getClass() != Object[].class)
239 es = Arrays.copyOf(es, n, Object[].class);
240 if (screen && (n == 1 || this.comparator != null)) {
241 for (Object e : es)
242 if (e == null)
243 throw new NullPointerException();
244 }
245 this.queue = ensureNonEmpty(es);
246 this.size = n;
247 if (heapify)
248 heapify();
249 }
250
251 /** Ensures that queue[0] exists, helping peek() and poll(). */
252 private static Object[] ensureNonEmpty(Object[] es) {
253 return (es.length > 0) ? es : new Object[1];
254 }
255
256 /**
257 * Tries to grow array to accommodate at least one more element
258 * (but normally expand by about 50%), giving up (allowing retry)
259 * on contention (which we expect to be rare). Call only while
260 * holding lock.
261 *
262 * @param array the heap array
263 * @param oldCap the length of the array
264 */
265 private void tryGrow(Object[] array, int oldCap) {
266 lock.unlock(); // must release and then re-acquire main lock
267 Object[] newArray = null;
268 if (allocationSpinLock == 0 &&
269 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
270 try {
271 int newCap = oldCap + ((oldCap < 64) ?
272 (oldCap + 2) : // grow faster if small
273 (oldCap >> 1));
274 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
275 int minCap = oldCap + 1;
276 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
277 throw new OutOfMemoryError();
278 newCap = MAX_ARRAY_SIZE;
279 }
280 if (newCap > oldCap && queue == array)
281 newArray = new Object[newCap];
282 } finally {
283 allocationSpinLock = 0;
284 }
285 }
286 if (newArray == null) // back off if another thread is allocating
287 Thread.yield();
288 lock.lock();
289 if (newArray != null && queue == array) {
290 queue = newArray;
291 System.arraycopy(array, 0, newArray, 0, oldCap);
292 }
293 }
294
295 /**
296 * Mechanics for poll(). Call only while holding lock.
297 */
298 private E dequeue() {
299 // assert lock.isHeldByCurrentThread();
300 final Object[] es;
301 final E result;
302
303 if ((result = (E) ((es = queue)[0])) != null) {
304 final int n;
305 final E x = (E) es[(n = --size)];
306 es[n] = null;
307 if (n > 0) {
308 final Comparator<? super E> cmp;
309 if ((cmp = comparator) == null)
310 siftDownComparable(0, x, es, n);
311 else
312 siftDownUsingComparator(0, x, es, n, cmp);
313 }
314 }
315 return result;
316 }
317
318 /**
319 * Inserts item x at position k, maintaining heap invariant by
320 * promoting x up the tree until it is greater than or equal to
321 * its parent, or is the root.
322 *
323 * To simplify and speed up coercions and comparisons, the
324 * Comparable and Comparator versions are separated into different
325 * methods that are otherwise identical. (Similarly for siftDown.)
326 *
327 * @param k the position to fill
328 * @param x the item to insert
329 * @param es the heap array
330 */
331 private static <T> void siftUpComparable(int k, T x, Object[] es) {
332 Comparable<? super T> key = (Comparable<? super T>) x;
333 while (k > 0) {
334 int parent = (k - 1) >>> 1;
335 Object e = es[parent];
336 if (key.compareTo((T) e) >= 0)
337 break;
338 es[k] = e;
339 k = parent;
340 }
341 es[k] = key;
342 }
343
344 private static <T> void siftUpUsingComparator(
345 int k, T x, Object[] es, Comparator<? super T> cmp) {
346 while (k > 0) {
347 int parent = (k - 1) >>> 1;
348 Object e = es[parent];
349 if (cmp.compare(x, (T) e) >= 0)
350 break;
351 es[k] = e;
352 k = parent;
353 }
354 es[k] = x;
355 }
356
357 /**
358 * Inserts item x at position k, maintaining heap invariant by
359 * demoting x down the tree repeatedly until it is less than or
360 * equal to its children or is a leaf.
361 *
362 * @param k the position to fill
363 * @param x the item to insert
364 * @param es the heap array
365 * @param n heap size
366 */
367 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
368 // assert n > 0;
369 Comparable<? super T> key = (Comparable<? super T>)x;
370 int half = n >>> 1; // loop while a non-leaf
371 while (k < half) {
372 int child = (k << 1) + 1; // assume left child is least
373 Object c = es[child];
374 int right = child + 1;
375 if (right < n &&
376 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
377 c = es[child = right];
378 if (key.compareTo((T) c) <= 0)
379 break;
380 es[k] = c;
381 k = child;
382 }
383 es[k] = key;
384 }
385
386 private static <T> void siftDownUsingComparator(
387 int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
388 // assert n > 0;
389 int half = n >>> 1;
390 while (k < half) {
391 int child = (k << 1) + 1;
392 Object c = es[child];
393 int right = child + 1;
394 if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
395 c = es[child = right];
396 if (cmp.compare(x, (T) c) <= 0)
397 break;
398 es[k] = c;
399 k = child;
400 }
401 es[k] = x;
402 }
403
404 /**
405 * Establishes the heap invariant (described above) in the entire tree,
406 * assuming nothing about the order of the elements prior to the call.
407 * This classic algorithm due to Floyd (1964) is known to be O(size).
408 */
409 private void heapify() {
410 final Object[] es = queue;
411 int n = size, i = (n >>> 1) - 1;
412 final Comparator<? super E> cmp;
413 if ((cmp = comparator) == null)
414 for (; i >= 0; i--)
415 siftDownComparable(i, (E) es[i], es, n);
416 else
417 for (; i >= 0; i--)
418 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
419 }
420
421 /**
422 * Inserts the specified element into this priority queue.
423 *
424 * @param e the element to add
425 * @return {@code true} (as specified by {@link Collection#add})
426 * @throws ClassCastException if the specified element cannot be compared
427 * with elements currently in the priority queue according to the
428 * priority queue's ordering
429 * @throws NullPointerException if the specified element is null
430 */
431 public boolean add(E e) {
432 return offer(e);
433 }
434
435 /**
436 * Inserts the specified element into this priority queue.
437 * As the queue is unbounded, this method will never return {@code false}.
438 *
439 * @param e the element to add
440 * @return {@code true} (as specified by {@link Queue#offer})
441 * @throws ClassCastException if the specified element cannot be compared
442 * with elements currently in the priority queue according to the
443 * priority queue's ordering
444 * @throws NullPointerException if the specified element is null
445 */
446 public boolean offer(E e) {
447 if (e == null)
448 throw new NullPointerException();
449 final ReentrantLock lock = this.lock;
450 lock.lock();
451 int n, cap;
452 Object[] es;
453 while ((n = size) >= (cap = (es = queue).length))
454 tryGrow(es, cap);
455 try {
456 final Comparator<? super E> cmp;
457 if ((cmp = comparator) == null)
458 siftUpComparable(n, e, es);
459 else
460 siftUpUsingComparator(n, e, es, cmp);
461 size = n + 1;
462 notEmpty.signal();
463 } finally {
464 lock.unlock();
465 }
466 return true;
467 }
468
469 /**
470 * Inserts the specified element into this priority queue.
471 * As the queue is unbounded, this method will never block.
472 *
473 * @param e the element to add
474 * @throws ClassCastException if the specified element cannot be compared
475 * with elements currently in the priority queue according to the
476 * priority queue's ordering
477 * @throws NullPointerException if the specified element is null
478 */
479 public void put(E e) {
480 offer(e); // never need to block
481 }
482
483 /**
484 * Inserts the specified element into this priority queue.
485 * As the queue is unbounded, this method will never block or
486 * return {@code false}.
487 *
488 * @param e the element to add
489 * @param timeout This parameter is ignored as the method never blocks
490 * @param unit This parameter is ignored as the method never blocks
491 * @return {@code true} (as specified by
492 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
493 * @throws ClassCastException if the specified element cannot be compared
494 * with elements currently in the priority queue according to the
495 * priority queue's ordering
496 * @throws NullPointerException if the specified element is null
497 */
498 public boolean offer(E e, long timeout, TimeUnit unit) {
499 return offer(e); // never need to block
500 }
501
502 public E poll() {
503 final ReentrantLock lock = this.lock;
504 lock.lock();
505 try {
506 return dequeue();
507 } finally {
508 lock.unlock();
509 }
510 }
511
512 public E take() throws InterruptedException {
513 final ReentrantLock lock = this.lock;
514 lock.lockInterruptibly();
515 E result;
516 try {
517 while ( (result = dequeue()) == null)
518 notEmpty.await();
519 } finally {
520 lock.unlock();
521 }
522 return result;
523 }
524
525 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
526 long nanos = unit.toNanos(timeout);
527 final ReentrantLock lock = this.lock;
528 lock.lockInterruptibly();
529 E result;
530 try {
531 while ( (result = dequeue()) == null && nanos > 0)
532 nanos = notEmpty.awaitNanos(nanos);
533 } finally {
534 lock.unlock();
535 }
536 return result;
537 }
538
539 public E peek() {
540 final ReentrantLock lock = this.lock;
541 lock.lock();
542 try {
543 return (E) queue[0];
544 } finally {
545 lock.unlock();
546 }
547 }
548
549 /**
550 * Returns the comparator used to order the elements in this queue,
551 * or {@code null} if this queue uses the {@linkplain Comparable
552 * natural ordering} of its elements.
553 *
554 * @return the comparator used to order the elements in this queue,
555 * or {@code null} if this queue uses the natural
556 * ordering of its elements
557 */
558 public Comparator<? super E> comparator() {
559 return comparator;
560 }
561
562 public int size() {
563 final ReentrantLock lock = this.lock;
564 lock.lock();
565 try {
566 return size;
567 } finally {
568 lock.unlock();
569 }
570 }
571
572 /**
573 * Always returns {@code Integer.MAX_VALUE} because
574 * a {@code PriorityBlockingQueue} is not capacity constrained.
575 * @return {@code Integer.MAX_VALUE} always
576 */
577 public int remainingCapacity() {
578 return Integer.MAX_VALUE;
579 }
580
581 private int indexOf(Object o) {
582 if (o != null) {
583 final Object[] es = queue;
584 for (int i = 0, n = size; i < n; i++)
585 if (o.equals(es[i]))
586 return i;
587 }
588 return -1;
589 }
590
591 /**
592 * Removes the ith element from queue.
593 */
594 private void removeAt(int i) {
595 final Object[] es = queue;
596 final int n = size - 1;
597 if (n == i) // removed last element
598 es[i] = null;
599 else {
600 E moved = (E) es[n];
601 es[n] = null;
602 final Comparator<? super E> cmp;
603 if ((cmp = comparator) == null)
604 siftDownComparable(i, moved, es, n);
605 else
606 siftDownUsingComparator(i, moved, es, n, cmp);
607 if (es[i] == moved) {
608 if (cmp == null)
609 siftUpComparable(i, moved, es);
610 else
611 siftUpUsingComparator(i, moved, es, cmp);
612 }
613 }
614 size = n;
615 }
616
617 /**
618 * Removes a single instance of the specified element from this queue,
619 * if it is present. More formally, removes an element {@code e} such
620 * that {@code o.equals(e)}, if this queue contains one or more such
621 * elements. Returns {@code true} if and only if this queue contained
622 * the specified element (or equivalently, if this queue changed as a
623 * result of the call).
624 *
625 * @param o element to be removed from this queue, if present
626 * @return {@code true} if this queue changed as a result of the call
627 */
628 public boolean remove(Object o) {
629 final ReentrantLock lock = this.lock;
630 lock.lock();
631 try {
632 int i = indexOf(o);
633 if (i == -1)
634 return false;
635 removeAt(i);
636 return true;
637 } finally {
638 lock.unlock();
639 }
640 }
641
642 /**
643 * Identity-based version for use in Itr.remove.
644 *
645 * @param o element to be removed from this queue, if present
646 */
647 void removeEq(Object o) {
648 final ReentrantLock lock = this.lock;
649 lock.lock();
650 try {
651 final Object[] es = queue;
652 for (int i = 0, n = size; i < n; i++) {
653 if (o == es[i]) {
654 removeAt(i);
655 break;
656 }
657 }
658 } finally {
659 lock.unlock();
660 }
661 }
662
663 /**
664 * Returns {@code true} if this queue contains the specified element.
665 * More formally, returns {@code true} if and only if this queue contains
666 * at least one element {@code e} such that {@code o.equals(e)}.
667 *
668 * @param o object to be checked for containment in this queue
669 * @return {@code true} if this queue contains the specified element
670 */
671 public boolean contains(Object o) {
672 final ReentrantLock lock = this.lock;
673 lock.lock();
674 try {
675 return indexOf(o) != -1;
676 } finally {
677 lock.unlock();
678 }
679 }
680
681 public String toString() {
682 return Helpers.collectionToString(this);
683 }
684
685 /**
686 * @throws UnsupportedOperationException {@inheritDoc}
687 * @throws ClassCastException {@inheritDoc}
688 * @throws NullPointerException {@inheritDoc}
689 * @throws IllegalArgumentException {@inheritDoc}
690 */
691 public int drainTo(Collection<? super E> c) {
692 return drainTo(c, Integer.MAX_VALUE);
693 }
694
695 /**
696 * @throws UnsupportedOperationException {@inheritDoc}
697 * @throws ClassCastException {@inheritDoc}
698 * @throws NullPointerException {@inheritDoc}
699 * @throws IllegalArgumentException {@inheritDoc}
700 */
701 public int drainTo(Collection<? super E> c, int maxElements) {
702 Objects.requireNonNull(c);
703 if (c == this)
704 throw new IllegalArgumentException();
705 if (maxElements <= 0)
706 return 0;
707 final ReentrantLock lock = this.lock;
708 lock.lock();
709 try {
710 int n = Math.min(size, maxElements);
711 for (int i = 0; i < n; i++) {
712 c.add((E) queue[0]); // In this order, in case add() throws.
713 dequeue();
714 }
715 return n;
716 } finally {
717 lock.unlock();
718 }
719 }
720
721 /**
722 * Atomically removes all of the elements from this queue.
723 * The queue will be empty after this call returns.
724 */
725 public void clear() {
726 final ReentrantLock lock = this.lock;
727 lock.lock();
728 try {
729 final Object[] es = queue;
730 for (int i = 0, n = size; i < n; i++)
731 es[i] = null;
732 size = 0;
733 } finally {
734 lock.unlock();
735 }
736 }
737
738 /**
739 * Returns an array containing all of the elements in this queue.
740 * The returned array elements are in no particular order.
741 *
742 * <p>The returned array will be "safe" in that no references to it are
743 * maintained by this queue. (In other words, this method must allocate
744 * a new array). The caller is thus free to modify the returned array.
745 *
746 * <p>This method acts as bridge between array-based and collection-based
747 * APIs.
748 *
749 * @return an array containing all of the elements in this queue
750 */
751 public Object[] toArray() {
752 final ReentrantLock lock = this.lock;
753 lock.lock();
754 try {
755 return Arrays.copyOf(queue, size);
756 } finally {
757 lock.unlock();
758 }
759 }
760
761 /**
762 * Returns an array containing all of the elements in this queue; the
763 * runtime type of the returned array is that of the specified array.
764 * The returned array elements are in no particular order.
765 * If the queue fits in the specified array, it is returned therein.
766 * Otherwise, a new array is allocated with the runtime type of the
767 * specified array and the size of this queue.
768 *
769 * <p>If this queue fits in the specified array with room to spare
770 * (i.e., the array has more elements than this queue), the element in
771 * the array immediately following the end of the queue is set to
772 * {@code null}.
773 *
774 * <p>Like the {@link #toArray()} method, this method acts as bridge between
775 * array-based and collection-based APIs. Further, this method allows
776 * precise control over the runtime type of the output array, and may,
777 * under certain circumstances, be used to save allocation costs.
778 *
779 * <p>Suppose {@code x} is a queue known to contain only strings.
780 * The following code can be used to dump the queue into a newly
781 * allocated array of {@code String}:
782 *
783 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
784 *
785 * Note that {@code toArray(new Object[0])} is identical in function to
786 * {@code toArray()}.
787 *
788 * @param a the array into which the elements of the queue are to
789 * be stored, if it is big enough; otherwise, a new array of the
790 * same runtime type is allocated for this purpose
791 * @return an array containing all of the elements in this queue
792 * @throws ArrayStoreException if the runtime type of the specified array
793 * is not a supertype of the runtime type of every element in
794 * this queue
795 * @throws NullPointerException if the specified array is null
796 */
797 public <T> T[] toArray(T[] a) {
798 final ReentrantLock lock = this.lock;
799 lock.lock();
800 try {
801 int n = size;
802 if (a.length < n)
803 // Make a new array of a's runtime type, but my contents:
804 return (T[]) Arrays.copyOf(queue, size, a.getClass());
805 System.arraycopy(queue, 0, a, 0, n);
806 if (a.length > n)
807 a[n] = null;
808 return a;
809 } finally {
810 lock.unlock();
811 }
812 }
813
814 /**
815 * Returns an iterator over the elements in this queue. The
816 * iterator does not return the elements in any particular order.
817 *
818 * <p>The returned iterator is
819 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
820 *
821 * @return an iterator over the elements in this queue
822 */
823 public Iterator<E> iterator() {
824 return new Itr(toArray());
825 }
826
827 /**
828 * Snapshot iterator that works off a copy of the underlying queue array.
829 */
830 final class Itr implements Iterator<E> {
831 final Object[] array; // Array of all elements
832 int cursor; // index of next element to return
833 int lastRet = -1; // index of last element, or -1 if no such
834
835 Itr(Object[] array) {
836 this.array = array;
837 }
838
839 public boolean hasNext() {
840 return cursor < array.length;
841 }
842
843 public E next() {
844 if (cursor >= array.length)
845 throw new NoSuchElementException();
846 return (E)array[lastRet = cursor++];
847 }
848
849 public void remove() {
850 if (lastRet < 0)
851 throw new IllegalStateException();
852 removeEq(array[lastRet]);
853 lastRet = -1;
854 }
855
856 public void forEachRemaining(Consumer<? super E> action) {
857 Objects.requireNonNull(action);
858 final Object[] es = array;
859 int i;
860 if ((i = cursor) < es.length) {
861 lastRet = -1;
862 cursor = es.length;
863 for (; i < es.length; i++)
864 action.accept((E) es[i]);
865 lastRet = es.length - 1;
866 }
867 }
868 }
869
870 /**
871 * Saves this queue to a stream (that is, serializes it).
872 *
873 * For compatibility with previous versions of this class, elements
874 * are first copied to a java.util.PriorityQueue, which is then
875 * serialized.
876 *
877 * @param s the stream
878 * @throws java.io.IOException if an I/O error occurs
879 */
880 private void writeObject(java.io.ObjectOutputStream s)
881 throws java.io.IOException {
882 lock.lock();
883 try {
884 // avoid zero capacity argument
885 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
886 q.addAll(this);
887 s.defaultWriteObject();
888 } finally {
889 q = null;
890 lock.unlock();
891 }
892 }
893
894 /**
895 * Reconstitutes this queue from a stream (that is, deserializes it).
896 * @param s the stream
897 * @throws ClassNotFoundException if the class of a serialized object
898 * could not be found
899 * @throws java.io.IOException if an I/O error occurs
900 */
901 private void readObject(java.io.ObjectInputStream s)
902 throws java.io.IOException, ClassNotFoundException {
903 try {
904 s.defaultReadObject();
905 int sz = q.size();
906 SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
907 this.queue = new Object[Math.max(1, sz)];
908 comparator = q.comparator();
909 addAll(q);
910 } finally {
911 q = null;
912 }
913 }
914
915 /**
916 * Immutable snapshot spliterator that binds to elements "late".
917 */
918 final class PBQSpliterator implements Spliterator<E> {
919 Object[] array; // null until late-bound-initialized
920 int index;
921 int fence;
922
923 PBQSpliterator() {}
924
925 PBQSpliterator(Object[] array, int index, int fence) {
926 this.array = array;
927 this.index = index;
928 this.fence = fence;
929 }
930
931 private int getFence() {
932 if (array == null)
933 fence = (array = toArray()).length;
934 return fence;
935 }
936
937 public PBQSpliterator trySplit() {
938 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
939 return (lo >= mid) ? null :
940 new PBQSpliterator(array, lo, index = mid);
941 }
942
943 public void forEachRemaining(Consumer<? super E> action) {
944 Objects.requireNonNull(action);
945 final int hi = getFence(), lo = index;
946 final Object[] es = array;
947 index = hi; // ensure exhaustion
948 for (int i = lo; i < hi; i++)
949 action.accept((E) es[i]);
950 }
951
952 public boolean tryAdvance(Consumer<? super E> action) {
953 Objects.requireNonNull(action);
954 if (getFence() > index && index >= 0) {
955 action.accept((E) array[index++]);
956 return true;
957 }
958 return false;
959 }
960
961 public long estimateSize() { return getFence() - index; }
962
963 public int characteristics() {
964 return (Spliterator.NONNULL |
965 Spliterator.SIZED |
966 Spliterator.SUBSIZED);
967 }
968 }
969
970 /**
971 * Returns a {@link Spliterator} over the elements in this queue.
972 * The spliterator does not traverse elements in any particular order
973 * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
974 *
975 * <p>The returned spliterator is
976 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
977 *
978 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
979 * {@link Spliterator#NONNULL}.
980 *
981 * @implNote
982 * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
983 *
984 * @return a {@code Spliterator} over the elements in this queue
985 * @since 1.8
986 */
987 public Spliterator<E> spliterator() {
988 return new PBQSpliterator();
989 }
990
991 /**
992 * @throws NullPointerException {@inheritDoc}
993 */
994 public boolean removeIf(Predicate<? super E> filter) {
995 Objects.requireNonNull(filter);
996 return bulkRemove(filter);
997 }
998
999 /**
1000 * @throws NullPointerException {@inheritDoc}
1001 */
1002 public boolean removeAll(Collection<?> c) {
1003 Objects.requireNonNull(c);
1004 return bulkRemove(e -> c.contains(e));
1005 }
1006
1007 /**
1008 * @throws NullPointerException {@inheritDoc}
1009 */
1010 public boolean retainAll(Collection<?> c) {
1011 Objects.requireNonNull(c);
1012 return bulkRemove(e -> !c.contains(e));
1013 }
1014
1015 // A tiny bit set implementation
1016
1017 private static long[] nBits(int n) {
1018 return new long[((n - 1) >> 6) + 1];
1019 }
1020 private static void setBit(long[] bits, int i) {
1021 bits[i >> 6] |= 1L << i;
1022 }
1023 private static boolean isClear(long[] bits, int i) {
1024 return (bits[i >> 6] & (1L << i)) == 0;
1025 }
1026
1027 /** Implementation of bulk remove methods. */
1028 private boolean bulkRemove(Predicate<? super E> filter) {
1029 final ReentrantLock lock = this.lock;
1030 lock.lock();
1031 try {
1032 final Object[] es = queue;
1033 final int end = size;
1034 int i;
1035 // Optimize for initial run of survivors
1036 for (i = 0; i < end && !filter.test((E) es[i]); i++)
1037 ;
1038 if (i >= end)
1039 return false;
1040 // Tolerate predicates that reentrantly access the
1041 // collection for read, so traverse once to find elements
1042 // to delete, a second pass to physically expunge.
1043 final int beg = i;
1044 final long[] deathRow = nBits(end - beg);
1045 deathRow[0] = 1L; // set bit 0
1046 for (i = beg + 1; i < end; i++)
1047 if (filter.test((E) es[i]))
1048 setBit(deathRow, i - beg);
1049 int w = beg;
1050 for (i = beg; i < end; i++)
1051 if (isClear(deathRow, i - beg))
1052 es[w++] = es[i];
1053 for (i = size = w; i < end; i++)
1054 es[i] = null;
1055 heapify();
1056 return true;
1057 } finally {
1058 lock.unlock();
1059 }
1060 }
1061
1062 /**
1063 * @throws NullPointerException {@inheritDoc}
1064 */
1065 public void forEach(Consumer<? super E> action) {
1066 Objects.requireNonNull(action);
1067 final ReentrantLock lock = this.lock;
1068 lock.lock();
1069 try {
1070 final Object[] es = queue;
1071 for (int i = 0, n = size; i < n; i++)
1072 action.accept((E) es[i]);
1073 } finally {
1074 lock.unlock();
1075 }
1076 }
1077
1078 // VarHandle mechanics
// VarHandle giving CAS access to the allocationSpinLock int field.
private static final VarHandle ALLOCATIONSPINLOCK;
static {
    try {
        ALLOCATIONSPINLOCK = MethodHandles.lookup().findVarHandle(
            PriorityBlockingQueue.class, "allocationSpinLock", int.class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup becomes a class-initialization failure.
        throw new ExceptionInInitializerError(e);
    }
}
1090 }