ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.143
Committed: Thu Jun 11 17:04:16 2020 UTC (3 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.142: +4 -18 lines
Log Message:
8230744: Several classes throw OutOfMemoryError without message

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.AbstractQueue;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Comparator;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Objects;
18 import java.util.PriorityQueue;
19 import java.util.Queue;
20 import java.util.SortedSet;
21 import java.util.Spliterator;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import java.util.function.Predicate;
26 // OPENJDK import jdk.internal.access.SharedSecrets;
27 import jdk.internal.util.ArraysSupport;
28
29 /**
30 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
31 * the same ordering rules as class {@link PriorityQueue} and supplies
32 * blocking retrieval operations. While this queue is logically
33 * unbounded, attempted additions may fail due to resource exhaustion
34 * (causing {@code OutOfMemoryError}). This class does not permit
35 * {@code null} elements. A priority queue relying on {@linkplain
36 * Comparable natural ordering} also does not permit insertion of
37 * non-comparable objects (doing so results in
38 * {@code ClassCastException}).
39 *
40 * <p>This class and its iterator implement all of the <em>optional</em>
41 * methods of the {@link Collection} and {@link Iterator} interfaces.
42 * The Iterator provided in method {@link #iterator()} and the
43 * Spliterator provided in method {@link #spliterator()} are <em>not</em>
44 * guaranteed to traverse the elements of the PriorityBlockingQueue in
45 * any particular order. If you need ordered traversal, consider using
46 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
47 * be used to <em>remove</em> some or all elements in priority order and
48 * place them in another collection.
49 *
50 * <p>Operations on this class make no guarantees about the ordering
51 * of elements with equal priority. If you need to enforce an
52 * ordering, you can define custom classes or comparators that use a
53 * secondary key to break ties in primary priority values. For
54 * example, here is a class that applies first-in-first-out
55 * tie-breaking to comparable elements. To use it, you would insert a
56 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
57 *
58 * <pre> {@code
59 * class FIFOEntry<E extends Comparable<? super E>>
60 * implements Comparable<FIFOEntry<E>> {
61 * static final AtomicLong seq = new AtomicLong(0);
62 * final long seqNum;
63 * final E entry;
64 * public FIFOEntry(E entry) {
65 * seqNum = seq.getAndIncrement();
66 * this.entry = entry;
67 * }
68 * public E getEntry() { return entry; }
69 * public int compareTo(FIFOEntry<E> other) {
70 * int res = entry.compareTo(other.entry);
71 * if (res == 0 && other.entry != this.entry)
72 * res = (seqNum < other.seqNum ? -1 : 1);
73 * return res;
74 * }
75 * }}</pre>
76 *
77 * <p>This class is a member of the
78 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
79 * Java Collections Framework</a>.
80 *
81 * @since 1.5
82 * @author Doug Lea
83 * @param <E> the type of elements held in this queue
84 */
85 @SuppressWarnings("unchecked")
86 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
87 implements BlockingQueue<E>, java.io.Serializable {
88 private static final long serialVersionUID = 5595510919245408276L;
89
90 /*
91 * The implementation uses an array-based binary heap, with public
92 * operations protected with a single lock. However, allocation
93 * during resizing uses a simple spinlock (used only while not
94 * holding main lock) in order to allow takes to operate
95 * concurrently with allocation. This avoids repeated
96 * postponement of waiting consumers and consequent element
97 * build-up. The need to back away from lock during allocation
98 * makes it impossible to simply wrap delegated
99 * java.util.PriorityQueue operations within a lock, as was done
100 * in a previous version of this class. To maintain
101 * interoperability, a plain PriorityQueue is still used during
102 * serialization, which maintains compatibility at the expense of
103 * transiently doubling overhead.
104 */
105
106 /**
107 * Default array capacity.
108 */
109 private static final int DEFAULT_INITIAL_CAPACITY = 11;
110
111 /**
112 * Priority queue represented as a balanced binary heap: the two
113 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
114 * priority queue is ordered by comparator, or by the elements'
115 * natural ordering, if comparator is null: For each node n in the
116 * heap and each descendant d of n, n <= d. The element with the
117 * lowest value is in queue[0], assuming the queue is nonempty.
118 */
119 private transient Object[] queue;
120
121 /**
122 * The number of elements in the priority queue.
123 */
124 private transient int size;
125
126 /**
127 * The comparator, or null if priority queue uses elements'
128 * natural ordering.
129 */
130 private transient Comparator<? super E> comparator;
131
132 /**
133 * Lock used for all public operations.
134 */
135 private final ReentrantLock lock = new ReentrantLock();
136
137 /**
138 * Condition for blocking when empty.
139 */
140 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
141 private final Condition notEmpty = lock.newCondition();
142
143 /**
144 * Spinlock for allocation, acquired via CAS.
145 */
146 private transient volatile int allocationSpinLock;
147
148 /**
149 * A plain PriorityQueue used only for serialization,
150 * to maintain compatibility with previous versions
151 * of this class. Non-null only during serialization/deserialization.
152 */
153 private PriorityQueue<E> q;
154
/**
 * Creates an empty {@code PriorityBlockingQueue} whose elements are
 * ordered by their {@linkplain Comparable natural ordering} and whose
 * backing array starts at the default initial capacity (11).
 */
public PriorityBlockingQueue() {
    // Delegate to the two-argument form; a null comparator selects
    // natural ordering.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
163
/**
 * Creates an empty {@code PriorityBlockingQueue} with the given
 * initial capacity whose elements are ordered by their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    // A null comparator selects natural ordering; the capacity check
    // is performed by the delegated constructor.
    this(initialCapacity, null);
}
176
/**
 * Creates a {@code PriorityBlockingQueue} with the specified initial
 * capacity that orders its elements according to the specified
 * comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param comparator the comparator that will be used to order this
 *        priority queue.  If {@code null}, the {@linkplain Comparable
 *        natural ordering} of the elements will be used.
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException(
            "initialCapacity must be at least 1: " + initialCapacity);
    this.comparator = comparator;
    // initialCapacity >= 1 is guaranteed by the check above, so
    // queue[0] always exists (peek() and dequeue() rely on that).
    this.queue = new Object[initialCapacity];
}
196
197 /**
198 * Creates a {@code PriorityBlockingQueue} containing the elements
199 * in the specified collection. If the specified collection is a
200 * {@link SortedSet} or a {@link PriorityQueue}, this
201 * priority queue will be ordered according to the same ordering.
202 * Otherwise, this priority queue will be ordered according to the
203 * {@linkplain Comparable natural ordering} of its elements.
204 *
205 * @param c the collection whose elements are to be placed
206 * into this priority queue
207 * @throws ClassCastException if elements of the specified collection
208 * cannot be compared to one another according to the priority
209 * queue's ordering
210 * @throws NullPointerException if the specified collection or any
211 * of its elements are null
212 */
213 public PriorityBlockingQueue(Collection<? extends E> c) {
214 boolean heapify = true; // true if not known to be in heap order
215 boolean screen = true; // true if must screen for nulls
216 if (c instanceof SortedSet<?>) {
217 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
218 this.comparator = (Comparator<? super E>) ss.comparator();
219 heapify = false;
220 }
221 else if (c instanceof PriorityBlockingQueue<?>) {
222 PriorityBlockingQueue<? extends E> pq =
223 (PriorityBlockingQueue<? extends E>) c;
224 this.comparator = (Comparator<? super E>) pq.comparator();
225 screen = false;
226 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
227 heapify = false;
228 }
229 Object[] es = c.toArray();
230 int n = es.length;
231 // If c.toArray incorrectly doesn't return Object[], copy it.
232 if (es.getClass() != Object[].class)
233 es = Arrays.copyOf(es, n, Object[].class);
234 if (screen && (n == 1 || this.comparator != null)) {
235 for (Object e : es)
236 if (e == null)
237 throw new NullPointerException();
238 }
239 this.queue = ensureNonEmpty(es);
240 this.size = n;
241 if (heapify)
242 heapify();
243 }
244
245 /** Ensures that queue[0] exists, helping peek() and poll(). */
246 private static Object[] ensureNonEmpty(Object[] es) {
247 return (es.length > 0) ? es : new Object[1];
248 }
249
250 /**
251 * Tries to grow array to accommodate at least one more element
252 * (but normally expand by about 50%), giving up (allowing retry)
253 * on contention (which we expect to be rare). Call only while
254 * holding lock.
255 *
256 * @param array the heap array
257 * @param oldCap the length of the array
258 */
259 private void tryGrow(Object[] array, int oldCap) {
260 lock.unlock(); // must release and then re-acquire main lock
261 Object[] newArray = null;
262 if (allocationSpinLock == 0 &&
263 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
264 try {
265 int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
266 int newCap = ArraysSupport.newLength(oldCap, 1, growth);
267 if (queue == array)
268 newArray = new Object[newCap];
269 } finally {
270 allocationSpinLock = 0;
271 }
272 }
273 if (newArray == null) // back off if another thread is allocating
274 Thread.yield();
275 lock.lock();
276 if (newArray != null && queue == array) {
277 queue = newArray;
278 System.arraycopy(array, 0, newArray, 0, oldCap);
279 }
280 }
281
282 /**
283 * Mechanics for poll(). Call only while holding lock.
284 */
285 private E dequeue() {
286 // assert lock.isHeldByCurrentThread();
287 final Object[] es;
288 final E result;
289
290 if ((result = (E) ((es = queue)[0])) != null) {
291 final int n;
292 final E x = (E) es[(n = --size)];
293 es[n] = null;
294 if (n > 0) {
295 final Comparator<? super E> cmp;
296 if ((cmp = comparator) == null)
297 siftDownComparable(0, x, es, n);
298 else
299 siftDownUsingComparator(0, x, es, n, cmp);
300 }
301 }
302 return result;
303 }
304
305 /**
306 * Inserts item x at position k, maintaining heap invariant by
307 * promoting x up the tree until it is greater than or equal to
308 * its parent, or is the root.
309 *
310 * To simplify and speed up coercions and comparisons, the
311 * Comparable and Comparator versions are separated into different
312 * methods that are otherwise identical. (Similarly for siftDown.)
313 *
314 * @param k the position to fill
315 * @param x the item to insert
316 * @param es the heap array
317 */
318 private static <T> void siftUpComparable(int k, T x, Object[] es) {
319 Comparable<? super T> key = (Comparable<? super T>) x;
320 while (k > 0) {
321 int parent = (k - 1) >>> 1;
322 Object e = es[parent];
323 if (key.compareTo((T) e) >= 0)
324 break;
325 es[k] = e;
326 k = parent;
327 }
328 es[k] = key;
329 }
330
331 private static <T> void siftUpUsingComparator(
332 int k, T x, Object[] es, Comparator<? super T> cmp) {
333 while (k > 0) {
334 int parent = (k - 1) >>> 1;
335 Object e = es[parent];
336 if (cmp.compare(x, (T) e) >= 0)
337 break;
338 es[k] = e;
339 k = parent;
340 }
341 es[k] = x;
342 }
343
344 /**
345 * Inserts item x at position k, maintaining heap invariant by
346 * demoting x down the tree repeatedly until it is less than or
347 * equal to its children or is a leaf.
348 *
349 * @param k the position to fill
350 * @param x the item to insert
351 * @param es the heap array
352 * @param n heap size
353 */
354 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
355 // assert n > 0;
356 Comparable<? super T> key = (Comparable<? super T>)x;
357 int half = n >>> 1; // loop while a non-leaf
358 while (k < half) {
359 int child = (k << 1) + 1; // assume left child is least
360 Object c = es[child];
361 int right = child + 1;
362 if (right < n &&
363 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
364 c = es[child = right];
365 if (key.compareTo((T) c) <= 0)
366 break;
367 es[k] = c;
368 k = child;
369 }
370 es[k] = key;
371 }
372
373 private static <T> void siftDownUsingComparator(
374 int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
375 // assert n > 0;
376 int half = n >>> 1;
377 while (k < half) {
378 int child = (k << 1) + 1;
379 Object c = es[child];
380 int right = child + 1;
381 if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
382 c = es[child = right];
383 if (cmp.compare(x, (T) c) <= 0)
384 break;
385 es[k] = c;
386 k = child;
387 }
388 es[k] = x;
389 }
390
391 /**
392 * Establishes the heap invariant (described above) in the entire tree,
393 * assuming nothing about the order of the elements prior to the call.
394 * This classic algorithm due to Floyd (1964) is known to be O(size).
395 */
396 private void heapify() {
397 final Object[] es = queue;
398 int n = size, i = (n >>> 1) - 1;
399 final Comparator<? super E> cmp;
400 if ((cmp = comparator) == null)
401 for (; i >= 0; i--)
402 siftDownComparable(i, (E) es[i], es, n);
403 else
404 for (; i >= 0; i--)
405 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
406 }
407
408 /**
409 * Inserts the specified element into this priority queue.
410 *
411 * @param e the element to add
412 * @return {@code true} (as specified by {@link Collection#add})
413 * @throws ClassCastException if the specified element cannot be compared
414 * with elements currently in the priority queue according to the
415 * priority queue's ordering
416 * @throws NullPointerException if the specified element is null
417 */
418 public boolean add(E e) {
419 return offer(e);
420 }
421
422 /**
423 * Inserts the specified element into this priority queue.
424 * As the queue is unbounded, this method will never return {@code false}.
425 *
426 * @param e the element to add
427 * @return {@code true} (as specified by {@link Queue#offer})
428 * @throws ClassCastException if the specified element cannot be compared
429 * with elements currently in the priority queue according to the
430 * priority queue's ordering
431 * @throws NullPointerException if the specified element is null
432 */
433 public boolean offer(E e) {
434 if (e == null)
435 throw new NullPointerException();
436 final ReentrantLock lock = this.lock;
437 lock.lock();
438 int n, cap;
439 Object[] es;
440 while ((n = size) >= (cap = (es = queue).length))
441 tryGrow(es, cap);
442 try {
443 final Comparator<? super E> cmp;
444 if ((cmp = comparator) == null)
445 siftUpComparable(n, e, es);
446 else
447 siftUpUsingComparator(n, e, es, cmp);
448 size = n + 1;
449 notEmpty.signal();
450 } finally {
451 lock.unlock();
452 }
453 return true;
454 }
455
456 /**
457 * Inserts the specified element into this priority queue.
458 * As the queue is unbounded, this method will never block.
459 *
460 * @param e the element to add
461 * @throws ClassCastException if the specified element cannot be compared
462 * with elements currently in the priority queue according to the
463 * priority queue's ordering
464 * @throws NullPointerException if the specified element is null
465 */
466 public void put(E e) {
467 offer(e); // never need to block
468 }
469
470 /**
471 * Inserts the specified element into this priority queue.
472 * As the queue is unbounded, this method will never block or
473 * return {@code false}.
474 *
475 * @param e the element to add
476 * @param timeout This parameter is ignored as the method never blocks
477 * @param unit This parameter is ignored as the method never blocks
478 * @return {@code true} (as specified by
479 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
480 * @throws ClassCastException if the specified element cannot be compared
481 * with elements currently in the priority queue according to the
482 * priority queue's ordering
483 * @throws NullPointerException if the specified element is null
484 */
485 public boolean offer(E e, long timeout, TimeUnit unit) {
486 return offer(e); // never need to block
487 }
488
489 public E poll() {
490 final ReentrantLock lock = this.lock;
491 lock.lock();
492 try {
493 return dequeue();
494 } finally {
495 lock.unlock();
496 }
497 }
498
499 public E take() throws InterruptedException {
500 final ReentrantLock lock = this.lock;
501 lock.lockInterruptibly();
502 E result;
503 try {
504 while ( (result = dequeue()) == null)
505 notEmpty.await();
506 } finally {
507 lock.unlock();
508 }
509 return result;
510 }
511
512 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
513 long nanos = unit.toNanos(timeout);
514 final ReentrantLock lock = this.lock;
515 lock.lockInterruptibly();
516 E result;
517 try {
518 while ( (result = dequeue()) == null && nanos > 0)
519 nanos = notEmpty.awaitNanos(nanos);
520 } finally {
521 lock.unlock();
522 }
523 return result;
524 }
525
526 public E peek() {
527 final ReentrantLock lock = this.lock;
528 lock.lock();
529 try {
530 return (E) queue[0];
531 } finally {
532 lock.unlock();
533 }
534 }
535
536 /**
537 * Returns the comparator used to order the elements in this queue,
538 * or {@code null} if this queue uses the {@linkplain Comparable
539 * natural ordering} of its elements.
540 *
541 * @return the comparator used to order the elements in this queue,
542 * or {@code null} if this queue uses the natural
543 * ordering of its elements
544 */
545 public Comparator<? super E> comparator() {
546 return comparator;
547 }
548
549 public int size() {
550 final ReentrantLock lock = this.lock;
551 lock.lock();
552 try {
553 return size;
554 } finally {
555 lock.unlock();
556 }
557 }
558
559 /**
560 * Always returns {@code Integer.MAX_VALUE} because
561 * a {@code PriorityBlockingQueue} is not capacity constrained.
562 * @return {@code Integer.MAX_VALUE} always
563 */
564 public int remainingCapacity() {
565 return Integer.MAX_VALUE;
566 }
567
568 private int indexOf(Object o) {
569 if (o != null) {
570 final Object[] es = queue;
571 for (int i = 0, n = size; i < n; i++)
572 if (o.equals(es[i]))
573 return i;
574 }
575 return -1;
576 }
577
578 /**
579 * Removes the ith element from queue.
580 */
581 private void removeAt(int i) {
582 final Object[] es = queue;
583 final int n = size - 1;
584 if (n == i) // removed last element
585 es[i] = null;
586 else {
587 E moved = (E) es[n];
588 es[n] = null;
589 final Comparator<? super E> cmp;
590 if ((cmp = comparator) == null)
591 siftDownComparable(i, moved, es, n);
592 else
593 siftDownUsingComparator(i, moved, es, n, cmp);
594 if (es[i] == moved) {
595 if (cmp == null)
596 siftUpComparable(i, moved, es);
597 else
598 siftUpUsingComparator(i, moved, es, cmp);
599 }
600 }
601 size = n;
602 }
603
604 /**
605 * Removes a single instance of the specified element from this queue,
606 * if it is present. More formally, removes an element {@code e} such
607 * that {@code o.equals(e)}, if this queue contains one or more such
608 * elements. Returns {@code true} if and only if this queue contained
609 * the specified element (or equivalently, if this queue changed as a
610 * result of the call).
611 *
612 * @param o element to be removed from this queue, if present
613 * @return {@code true} if this queue changed as a result of the call
614 */
615 public boolean remove(Object o) {
616 final ReentrantLock lock = this.lock;
617 lock.lock();
618 try {
619 int i = indexOf(o);
620 if (i == -1)
621 return false;
622 removeAt(i);
623 return true;
624 } finally {
625 lock.unlock();
626 }
627 }
628
629 /**
630 * Identity-based version for use in Itr.remove.
631 *
632 * @param o element to be removed from this queue, if present
633 */
634 void removeEq(Object o) {
635 final ReentrantLock lock = this.lock;
636 lock.lock();
637 try {
638 final Object[] es = queue;
639 for (int i = 0, n = size; i < n; i++) {
640 if (o == es[i]) {
641 removeAt(i);
642 break;
643 }
644 }
645 } finally {
646 lock.unlock();
647 }
648 }
649
650 /**
651 * Returns {@code true} if this queue contains the specified element.
652 * More formally, returns {@code true} if and only if this queue contains
653 * at least one element {@code e} such that {@code o.equals(e)}.
654 *
655 * @param o object to be checked for containment in this queue
656 * @return {@code true} if this queue contains the specified element
657 */
658 public boolean contains(Object o) {
659 final ReentrantLock lock = this.lock;
660 lock.lock();
661 try {
662 return indexOf(o) != -1;
663 } finally {
664 lock.unlock();
665 }
666 }
667
668 public String toString() {
669 return Helpers.collectionToString(this);
670 }
671
672 /**
673 * @throws UnsupportedOperationException {@inheritDoc}
674 * @throws ClassCastException {@inheritDoc}
675 * @throws NullPointerException {@inheritDoc}
676 * @throws IllegalArgumentException {@inheritDoc}
677 */
678 public int drainTo(Collection<? super E> c) {
679 return drainTo(c, Integer.MAX_VALUE);
680 }
681
682 /**
683 * @throws UnsupportedOperationException {@inheritDoc}
684 * @throws ClassCastException {@inheritDoc}
685 * @throws NullPointerException {@inheritDoc}
686 * @throws IllegalArgumentException {@inheritDoc}
687 */
688 public int drainTo(Collection<? super E> c, int maxElements) {
689 Objects.requireNonNull(c);
690 if (c == this)
691 throw new IllegalArgumentException();
692 if (maxElements <= 0)
693 return 0;
694 final ReentrantLock lock = this.lock;
695 lock.lock();
696 try {
697 int n = Math.min(size, maxElements);
698 for (int i = 0; i < n; i++) {
699 c.add((E) queue[0]); // In this order, in case add() throws.
700 dequeue();
701 }
702 return n;
703 } finally {
704 lock.unlock();
705 }
706 }
707
708 /**
709 * Atomically removes all of the elements from this queue.
710 * The queue will be empty after this call returns.
711 */
712 public void clear() {
713 final ReentrantLock lock = this.lock;
714 lock.lock();
715 try {
716 final Object[] es = queue;
717 for (int i = 0, n = size; i < n; i++)
718 es[i] = null;
719 size = 0;
720 } finally {
721 lock.unlock();
722 }
723 }
724
725 /**
726 * Returns an array containing all of the elements in this queue.
727 * The returned array elements are in no particular order.
728 *
729 * <p>The returned array will be "safe" in that no references to it are
730 * maintained by this queue. (In other words, this method must allocate
731 * a new array). The caller is thus free to modify the returned array.
732 *
733 * <p>This method acts as bridge between array-based and collection-based
734 * APIs.
735 *
736 * @return an array containing all of the elements in this queue
737 */
738 public Object[] toArray() {
739 final ReentrantLock lock = this.lock;
740 lock.lock();
741 try {
742 return Arrays.copyOf(queue, size);
743 } finally {
744 lock.unlock();
745 }
746 }
747
748 /**
749 * Returns an array containing all of the elements in this queue; the
750 * runtime type of the returned array is that of the specified array.
751 * The returned array elements are in no particular order.
752 * If the queue fits in the specified array, it is returned therein.
753 * Otherwise, a new array is allocated with the runtime type of the
754 * specified array and the size of this queue.
755 *
756 * <p>If this queue fits in the specified array with room to spare
757 * (i.e., the array has more elements than this queue), the element in
758 * the array immediately following the end of the queue is set to
759 * {@code null}.
760 *
761 * <p>Like the {@link #toArray()} method, this method acts as bridge between
762 * array-based and collection-based APIs. Further, this method allows
763 * precise control over the runtime type of the output array, and may,
764 * under certain circumstances, be used to save allocation costs.
765 *
766 * <p>Suppose {@code x} is a queue known to contain only strings.
767 * The following code can be used to dump the queue into a newly
768 * allocated array of {@code String}:
769 *
770 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
771 *
772 * Note that {@code toArray(new Object[0])} is identical in function to
773 * {@code toArray()}.
774 *
775 * @param a the array into which the elements of the queue are to
776 * be stored, if it is big enough; otherwise, a new array of the
777 * same runtime type is allocated for this purpose
778 * @return an array containing all of the elements in this queue
779 * @throws ArrayStoreException if the runtime type of the specified array
780 * is not a supertype of the runtime type of every element in
781 * this queue
782 * @throws NullPointerException if the specified array is null
783 */
784 public <T> T[] toArray(T[] a) {
785 final ReentrantLock lock = this.lock;
786 lock.lock();
787 try {
788 int n = size;
789 if (a.length < n)
790 // Make a new array of a's runtime type, but my contents:
791 return (T[]) Arrays.copyOf(queue, size, a.getClass());
792 System.arraycopy(queue, 0, a, 0, n);
793 if (a.length > n)
794 a[n] = null;
795 return a;
796 } finally {
797 lock.unlock();
798 }
799 }
800
801 /**
802 * Returns an iterator over the elements in this queue. The
803 * iterator does not return the elements in any particular order.
804 *
805 * <p>The returned iterator is
806 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
807 *
808 * @return an iterator over the elements in this queue
809 */
810 public Iterator<E> iterator() {
811 return new Itr(toArray());
812 }
813
814 /**
815 * Snapshot iterator that works off copy of underlying q array.
816 */
817 final class Itr implements Iterator<E> {
818 final Object[] array; // Array of all elements
819 int cursor; // index of next element to return
820 int lastRet = -1; // index of last element, or -1 if no such
821
822 Itr(Object[] array) {
823 this.array = array;
824 }
825
826 public boolean hasNext() {
827 return cursor < array.length;
828 }
829
830 public E next() {
831 if (cursor >= array.length)
832 throw new NoSuchElementException();
833 return (E)array[lastRet = cursor++];
834 }
835
836 public void remove() {
837 if (lastRet < 0)
838 throw new IllegalStateException();
839 removeEq(array[lastRet]);
840 lastRet = -1;
841 }
842
843 public void forEachRemaining(Consumer<? super E> action) {
844 Objects.requireNonNull(action);
845 final Object[] es = array;
846 int i;
847 if ((i = cursor) < es.length) {
848 lastRet = -1;
849 cursor = es.length;
850 for (; i < es.length; i++)
851 action.accept((E) es[i]);
852 lastRet = es.length - 1;
853 }
854 }
855 }
856
857 /**
858 * Saves this queue to a stream (that is, serializes it).
859 *
860 * For compatibility with previous version of this class, elements
861 * are first copied to a java.util.PriorityQueue, which is then
862 * serialized.
863 *
864 * @param s the stream
865 * @throws java.io.IOException if an I/O error occurs
866 */
867 private void writeObject(java.io.ObjectOutputStream s)
868 throws java.io.IOException {
869 lock.lock();
870 try {
871 // avoid zero capacity argument
872 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
873 q.addAll(this);
874 s.defaultWriteObject();
875 } finally {
876 q = null;
877 lock.unlock();
878 }
879 }
880
881 /**
882 * Reconstitutes this queue from a stream (that is, deserializes it).
883 * @param s the stream
884 * @throws ClassNotFoundException if the class of a serialized object
885 * could not be found
886 * @throws java.io.IOException if an I/O error occurs
887 */
888 private void readObject(java.io.ObjectInputStream s)
889 throws java.io.IOException, ClassNotFoundException {
890 try {
891 s.defaultReadObject();
892 int sz = q.size();
893 jsr166.Platform.checkArray(s, Object[].class, sz);
894 this.queue = new Object[Math.max(1, sz)];
895 comparator = q.comparator();
896 addAll(q);
897 } finally {
898 q = null;
899 }
900 }
901
902 /**
903 * Immutable snapshot spliterator that binds to elements "late".
904 */
905 final class PBQSpliterator implements Spliterator<E> {
906 Object[] array; // null until late-bound-initialized
907 int index;
908 int fence;
909
910 PBQSpliterator() {}
911
912 PBQSpliterator(Object[] array, int index, int fence) {
913 this.array = array;
914 this.index = index;
915 this.fence = fence;
916 }
917
918 private int getFence() {
919 if (array == null)
920 fence = (array = toArray()).length;
921 return fence;
922 }
923
924 public PBQSpliterator trySplit() {
925 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
926 return (lo >= mid) ? null :
927 new PBQSpliterator(array, lo, index = mid);
928 }
929
930 public void forEachRemaining(Consumer<? super E> action) {
931 Objects.requireNonNull(action);
932 final int hi = getFence(), lo = index;
933 final Object[] es = array;
934 index = hi; // ensure exhaustion
935 for (int i = lo; i < hi; i++)
936 action.accept((E) es[i]);
937 }
938
939 public boolean tryAdvance(Consumer<? super E> action) {
940 Objects.requireNonNull(action);
941 if (getFence() > index && index >= 0) {
942 action.accept((E) array[index++]);
943 return true;
944 }
945 return false;
946 }
947
948 public long estimateSize() { return getFence() - index; }
949
950 public int characteristics() {
951 return (Spliterator.NONNULL |
952 Spliterator.SIZED |
953 Spliterator.SUBSIZED);
954 }
955 }
956
957 /**
958 * Returns a {@link Spliterator} over the elements in this queue.
959 * The spliterator does not traverse elements in any particular order
960 * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
961 *
962 * <p>The returned spliterator is
963 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
964 *
965 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
966 * {@link Spliterator#NONNULL}.
967 *
968 * @implNote
969 * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
970 *
971 * @return a {@code Spliterator} over the elements in this queue
972 * @since 1.8
973 */
974 public Spliterator<E> spliterator() {
975 return new PBQSpliterator();
976 }
977
978 /**
979 * @throws NullPointerException {@inheritDoc}
980 */
981 public boolean removeIf(Predicate<? super E> filter) {
982 Objects.requireNonNull(filter);
983 return bulkRemove(filter);
984 }
985
986 /**
987 * @throws NullPointerException {@inheritDoc}
988 */
989 public boolean removeAll(Collection<?> c) {
990 Objects.requireNonNull(c);
991 return bulkRemove(e -> c.contains(e));
992 }
993
994 /**
995 * @throws NullPointerException {@inheritDoc}
996 */
997 public boolean retainAll(Collection<?> c) {
998 Objects.requireNonNull(c);
999 return bulkRemove(e -> !c.contains(e));
1000 }
1001
1002 // A tiny bit set implementation
1003
1004 private static long[] nBits(int n) {
1005 return new long[((n - 1) >> 6) + 1];
1006 }
1007 private static void setBit(long[] bits, int i) {
1008 bits[i >> 6] |= 1L << i;
1009 }
1010 private static boolean isClear(long[] bits, int i) {
1011 return (bits[i >> 6] & (1L << i)) == 0;
1012 }
1013
1014 /** Implementation of bulk remove methods. */
1015 private boolean bulkRemove(Predicate<? super E> filter) {
1016 final ReentrantLock lock = this.lock;
1017 lock.lock();
1018 try {
1019 final Object[] es = queue;
1020 final int end = size;
1021 int i;
1022 // Optimize for initial run of survivors
1023 for (i = 0; i < end && !filter.test((E) es[i]); i++)
1024 ;
1025 if (i >= end)
1026 return false;
1027 // Tolerate predicates that reentrantly access the
1028 // collection for read, so traverse once to find elements
1029 // to delete, a second pass to physically expunge.
1030 final int beg = i;
1031 final long[] deathRow = nBits(end - beg);
1032 deathRow[0] = 1L; // set bit 0
1033 for (i = beg + 1; i < end; i++)
1034 if (filter.test((E) es[i]))
1035 setBit(deathRow, i - beg);
1036 int w = beg;
1037 for (i = beg; i < end; i++)
1038 if (isClear(deathRow, i - beg))
1039 es[w++] = es[i];
1040 for (i = size = w; i < end; i++)
1041 es[i] = null;
1042 heapify();
1043 return true;
1044 } finally {
1045 lock.unlock();
1046 }
1047 }
1048
1049 /**
1050 * @throws NullPointerException {@inheritDoc}
1051 */
1052 public void forEach(Consumer<? super E> action) {
1053 Objects.requireNonNull(action);
1054 final ReentrantLock lock = this.lock;
1055 lock.lock();
1056 try {
1057 final Object[] es = queue;
1058 for (int i = 0, n = size; i < n; i++)
1059 action.accept((E) es[i]);
1060 } finally {
1061 lock.unlock();
1062 }
1063 }
1064
// VarHandle mechanics: handle on the int field "allocationSpinLock",
// used by tryGrow to acquire the allocation spinlock via CAS.
private static final VarHandle ALLOCATIONSPINLOCK;
static {
    try {
        ALLOCATIONSPINLOCK = MethodHandles.lookup().findVarHandle(
            PriorityBlockingQueue.class, "allocationSpinLock", int.class);
    } catch (ReflectiveOperationException e) {
        // Surface lookup failures as a class-initialization error.
        throw new ExceptionInInitializerError(e);
    }
}
1077 }