ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.60
Committed: Mon Oct 11 10:24:02 2010 UTC (13 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.59: +2 -1 lines
Log Message:
Avoid zero capacity argument in serialize

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19 * Comparable natural ordering} also does not permit insertion of
20 * non-comparable objects (doing so results in
21 * <tt>ClassCastException</tt>).
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces. The Iterator provided in method {@link
26 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27 * the PriorityBlockingQueue in any particular order. If you need
28 * ordered traversal, consider using
29 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30 * can be used to <em>remove</em> some or all elements in priority
31 * order and place them in another collection.
32 *
33 * <p>Operations on this class make no guarantees about the ordering
34 * of elements with equal priority. If you need to enforce an
35 * ordering, you can define custom classes or comparators that use a
36 * secondary key to break ties in primary priority values. For
37 * example, here is a class that applies first-in-first-out
38 * tie-breaking to comparable elements. To use it, you would insert a
39 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40 *
41 * <pre> {@code
42 * class FIFOEntry<E extends Comparable<? super E>>
43 * implements Comparable<FIFOEntry<E>> {
44 * static final AtomicLong seq = new AtomicLong(0);
45 * final long seqNum;
46 * final E entry;
47 * public FIFOEntry(E entry) {
48 * seqNum = seq.getAndIncrement();
49 * this.entry = entry;
50 * }
51 * public E getEntry() { return entry; }
52 * public int compareTo(FIFOEntry<E> other) {
53 * int res = entry.compareTo(other.entry);
54 * if (res == 0 && other.entry != this.entry)
55 * res = (seqNum < other.seqNum ? -1 : 1);
56 * return res;
57 * }
58 * }}</pre>
59 *
60 * <p>This class is a member of the
61 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
62 * Java Collections Framework</a>.
63 *
64 * @since 1.5
65 * @author Doug Lea
66 * @param <E> the type of elements held in this collection
67 */
68 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69 implements BlockingQueue<E>, java.io.Serializable {
70 private static final long serialVersionUID = 5595510919245408276L;
71
72 /*
73 * This implementation is a variant of the one in
74 * java.util.PriorityQueue, with public operations protected with
75 * a single lock. However, allocation during resizing uses a
76 * simple spinlock (used only while not holding main lock) in
77 * order to allow takes to operate concurrently with allocation.
78 * This avoids repeated postponement of waiting consumers and
79 * consequent build-up. The need to back away from lock during
80 * allocation makes it impossible to simply wrap delegated
81 * java.util.PriorityQueue operations within a lock; hence code
82 * duplication.
83 */
84
85 /**
86 * Default array capacity.
87 */
88 private static final int DEFAULT_INITIAL_CAPACITY = 11;
89
90 /**
91 * Priority queue represented as a balanced binary heap: the two
92 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
93 * priority queue is ordered by comparator, or by the elements'
94 * natural ordering, if comparator is null: For each node n in the
95 * heap and each descendant d of n, n <= d. The element with the
96 * lowest value is in queue[0], assuming the queue is nonempty.
97 */
98 private transient Object[] queue;
99
100 /**
101 * The number of elements in the priority queue.
102 */
103 private transient int size = 0;
104
105 /**
106 * The comparator, or null if priority queue uses elements'
107 * natural ordering.
108 */
109 private transient Comparator<? super E> comparator;
110
111 /**
112 * A plain PriorityQueue used only for serialization,
113 * to maintain compatibility with previous versions
114 * of this class. Non-null only during serialization/deserialization
115 */
116 private PriorityQueue q;
117
118 /**
119 * Lock used for all public operations
120 */
121 final ReentrantLock lock = new ReentrantLock();
122 private final Condition notEmpty = lock.newCondition();
123
124 /**
125 * Spinlock for allocation, acquired via CAS.
126 */
127 private transient volatile int allocationSpinLock;
128
/**
 * Creates a {@code PriorityBlockingQueue} whose backing array starts
 * with the default capacity (11) and whose elements are ordered by
 * their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
137
/**
 * Creates a {@code PriorityBlockingQueue} with the given initial
 * capacity, ordering elements by their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
150
/**
 * Creates a {@code PriorityBlockingQueue} with the given initial
 * capacity whose elements are ordered by the given comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param comparator the comparator used to order this priority queue;
 *        if {@code null}, the {@linkplain Comparable natural ordering}
 *        of the elements is used
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1) {
        throw new IllegalArgumentException();
    }
    this.queue = new Object[initialCapacity];
    this.comparator = comparator;
}
170
/**
 * Creates a {@code PriorityBlockingQueue} containing the elements
 * in the specified collection.  If the specified collection is a
 * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
 * priority queue will be ordered according to the same ordering.
 * Otherwise (including when the collection is a plain
 * {@link PriorityQueue}), this priority queue will be ordered
 * according to the {@linkplain Comparable natural ordering} of its
 * elements.
 *
 * @param c the collection whose elements are to be placed
 *        into this priority queue
 * @throws ClassCastException if elements of the specified collection
 *         cannot be compared to one another according to the priority
 *         queue's ordering
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
@SuppressWarnings("unchecked") // comparator casts mirror the source collection's own ordering
public PriorityBlockingQueue(Collection<? extends E> c) {
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        // Elements are copied as returned by toArray(); no heapify pass is run.
        initElementsFromCollection(ss);
    } else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        initFromPriorityBlockingQueue(pq);
    } else {
        this.comparator = null;
        initFromCollection(c);
    }
}
204
205 private void initFromPriorityBlockingQueue(PriorityBlockingQueue<? extends E> c) {
206 if (c.getClass() == PriorityBlockingQueue.class) {
207 this.queue = c.toArray();
208 this.size = c.size();
209 } else {
210 initFromCollection(c);
211 }
212 }
213
214 private void initElementsFromCollection(Collection<? extends E> c) {
215 Object[] a = c.toArray();
216 // If c.toArray incorrectly doesn't return Object[], copy it.
217 if (a.getClass() != Object[].class)
218 a = Arrays.copyOf(a, a.length, Object[].class);
219 int len = a.length;
220 if (len == 1 || this.comparator != null)
221 for (int i = 0; i < len; i++)
222 if (a[i] == null)
223 throw new NullPointerException();
224 this.queue = a;
225 this.size = a.length;
226 }
227
228 /**
229 * Initializes queue array with elements from the given Collection.
230 *
231 * @param c the collection
232 */
233 private void initFromCollection(Collection<? extends E> c) {
234 initElementsFromCollection(c);
235 heapify();
236 }
237
238 /**
239 * Tries to grow array to at least minCap, giving up (allowing
240 * retry) on contention. Call only while holding lock.
241 */
242 private void tryGrow(int minCap, Object[] array, int oldCap) {
243 lock.unlock(); // must release and then re-acquire main lock
244 Object[] newArray = null;
245 if (allocationSpinLock == 0 &&
246 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
247 0, 1)) {
248 try {
249 int newCap = oldCap + ((oldCap < 64) ?
250 (oldCap + 2) :
251 (oldCap >> 1));
252 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
253 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
254 throw new OutOfMemoryError();
255 newCap = MAX_ARRAY_SIZE;
256 }
257 if (queue == array && newCap > array.length)
258 newArray = new Object[newCap];
259 } finally {
260 allocationSpinLock = 0;
261 }
262 }
263 else
264 Thread.yield();
265 lock.lock();
266 if (newArray != null && queue == array) {
267 System.arraycopy(array, 0, newArray, 0, minCap);
268 queue = newArray;
269 }
270 }
271
272 /**
273 * Mechanics for poll(), call only while holding lock
274 */
275 private E internalPoll() {
276 int s = size - 1;
277 if (s >= 0) {
278 size = s;
279 E result = (E) queue[0];
280 E x = (E) queue[s];
281 queue[s] = null;
282 if (s != 0)
283 siftDown(0, x);
284 return result;
285 }
286 else
287 return null;
288 }
289
290 /**
291 * Inserts item x at position k, maintaining heap invariant by
292 * promoting x up the tree until it is greater than or equal to
293 * its parent, or is the root.
294 *
295 * To simplify and speed up coercions and comparisons. the
296 * Comparable and Comparator versions are separated into different
297 * methods that are otherwise identical. (Similarly for siftDown.)
298 *
299 * @param k the position to fill
300 * @param x the item to insert
301 */
302 private void siftUp(int k, E x) {
303 if (comparator != null)
304 siftUpUsingComparator(k, x);
305 else
306 siftUpComparable(k, x);
307 }
308
309 private void siftUpComparable(int k, E x) {
310 Comparable<? super E> key = (Comparable<? super E>) x;
311 while (k > 0) {
312 int parent = (k - 1) >>> 1;
313 Object e = queue[parent];
314 if (key.compareTo((E) e) >= 0)
315 break;
316 queue[k] = e;
317 k = parent;
318 }
319 queue[k] = key;
320 }
321
322 private void siftUpUsingComparator(int k, E x) {
323 while (k > 0) {
324 int parent = (k - 1) >>> 1;
325 Object e = queue[parent];
326 if (comparator.compare(x, (E) e) >= 0)
327 break;
328 queue[k] = e;
329 k = parent;
330 }
331 queue[k] = x;
332 }
333
334 /**
335 * Inserts item x at position k, maintaining heap invariant by
336 * demoting x down the tree repeatedly until it is less than or
337 * equal to its children or is a leaf.
338 *
339 * @param k the position to fill
340 * @param x the item to insert
341 */
342 private void siftDown(int k, E x) {
343 if (comparator != null)
344 siftDownUsingComparator(k, x);
345 else
346 siftDownComparable(k, x);
347 }
348
349 private void siftDownComparable(int k, E x) {
350 Comparable<? super E> key = (Comparable<? super E>)x;
351 int half = size >>> 1; // loop while a non-leaf
352 while (k < half) {
353 int child = (k << 1) + 1; // assume left child is least
354 Object c = queue[child];
355 int right = child + 1;
356 if (right < size &&
357 ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
358 c = queue[child = right];
359 if (key.compareTo((E) c) <= 0)
360 break;
361 queue[k] = c;
362 k = child;
363 }
364 queue[k] = key;
365 }
366
367 private void siftDownUsingComparator(int k, E x) {
368 int half = size >>> 1;
369 while (k < half) {
370 int child = (k << 1) + 1;
371 Object c = queue[child];
372 int right = child + 1;
373 if (right < size &&
374 comparator.compare((E) c, (E) queue[right]) > 0)
375 c = queue[child = right];
376 if (comparator.compare(x, (E) c) <= 0)
377 break;
378 queue[k] = c;
379 k = child;
380 }
381 queue[k] = x;
382 }
383
384 /**
385 * Establishes the heap invariant (described above) in the entire tree,
386 * assuming nothing about the order of the elements prior to the call.
387 */
388 private void heapify() {
389 for (int i = (size >>> 1) - 1; i >= 0; i--)
390 siftDown(i, (E) queue[i]);
391 }
392
393 /**
394 * The maximum size of array to allocate.
395 * Some VMs reserve some header words in an array.
396 * Attempts to allocate larger arrays may result in
397 * OutOfMemoryError: Requested array size exceeds VM limit
398 */
399 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
400
401 /**
402 * Inserts the specified element into this priority queue.
403 *
404 * @param e the element to add
405 * @return <tt>true</tt> (as specified by {@link Collection#add})
406 * @throws ClassCastException if the specified element cannot be compared
407 * with elements currently in the priority queue according to the
408 * priority queue's ordering
409 * @throws NullPointerException if the specified element is null
410 */
411 public boolean add(E e) {
412 return offer(e);
413 }
414
415 /**
416 * Inserts the specified element into this priority queue.
417 *
418 * @param e the element to add
419 * @return <tt>true</tt> (as specified by {@link Queue#offer})
420 * @throws ClassCastException if the specified element cannot be compared
421 * with elements currently in the priority queue according to the
422 * priority queue's ordering
423 * @throws NullPointerException if the specified element is null
424 */
425 public boolean offer(E e) {
426 if (e == null)
427 throw new NullPointerException();
428 final ReentrantLock lock = this.lock;
429 lock.lock();
430 int len, cap;
431 Object[] array;
432 while ((len = size) >= (cap = (array = queue).length))
433 tryGrow(len, array, cap);
434 try {
435 size = len + 1;
436 if (len == 0)
437 array[0] = e;
438 else
439 siftUp(len, e);
440 notEmpty.signal();
441 } finally {
442 lock.unlock();
443 }
444 return true;
445 }
446
447 /**
448 * Inserts the specified element into this priority queue. As the queue is
449 * unbounded this method will never block.
450 *
451 * @param e the element to add
452 * @throws ClassCastException if the specified element cannot be compared
453 * with elements currently in the priority queue according to the
454 * priority queue's ordering
455 * @throws NullPointerException if the specified element is null
456 */
457 public void put(E e) {
458 offer(e); // never need to block
459 }
460
461 /**
462 * Inserts the specified element into this priority queue. As the queue is
463 * unbounded this method will never block.
464 *
465 * @param e the element to add
466 * @param timeout This parameter is ignored as the method never blocks
467 * @param unit This parameter is ignored as the method never blocks
468 * @return <tt>true</tt>
469 * @throws ClassCastException if the specified element cannot be compared
470 * with elements currently in the priority queue according to the
471 * priority queue's ordering
472 * @throws NullPointerException if the specified element is null
473 */
474 public boolean offer(E e, long timeout, TimeUnit unit) {
475 return offer(e); // never need to block
476 }
477
478 public E poll() {
479 final ReentrantLock lock = this.lock;
480 lock.lock();
481 try {
482 return internalPoll();
483 } finally {
484 lock.unlock();
485 }
486 }
487
488 public E take() throws InterruptedException {
489 E result = null;
490 final ReentrantLock lock = this.lock;
491 lock.lockInterruptibly();
492 try {
493 while ( (result = internalPoll()) == null)
494 notEmpty.await();
495 } finally {
496 lock.unlock();
497 }
498 return result;
499 }
500
501 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
502 long nanos = unit.toNanos(timeout);
503 E result = null;
504 final ReentrantLock lock = this.lock;
505 lock.lockInterruptibly();
506 try {
507 while ( (result = internalPoll()) == null && nanos > 0)
508 nanos = notEmpty.awaitNanos(nanos);
509 } finally {
510 lock.unlock();
511 }
512 return result;
513 }
514
515 public E peek() {
516 E result = null;
517 final ReentrantLock lock = this.lock;
518 lock.lock();
519 try {
520 if (size >= 0)
521 result = (E) queue[0];
522 } finally {
523 lock.unlock();
524 }
525 return result;
526 }
527
528 /**
529 * Returns the comparator used to order the elements in this queue,
530 * or <tt>null</tt> if this queue uses the {@linkplain Comparable
531 * natural ordering} of its elements.
532 *
533 * @return the comparator used to order the elements in this queue,
534 * or <tt>null</tt> if this queue uses the natural
535 * ordering of its elements
536 */
537 public Comparator<? super E> comparator() {
538 return comparator;
539 }
540
541 public int size() {
542 int n;
543 final ReentrantLock lock = this.lock;
544 lock.lock();
545 try {
546 n = size;
547 } finally {
548 lock.unlock();
549 }
550 return n;
551 }
552
553 /**
554 * Always returns <tt>Integer.MAX_VALUE</tt> because
555 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
556 * @return <tt>Integer.MAX_VALUE</tt>
557 */
558 public int remainingCapacity() {
559 return Integer.MAX_VALUE;
560 }
561
562 private int indexOf(Object o) {
563 if (o != null) {
564 for (int i = 0; i < size; i++)
565 if (o.equals(queue[i]))
566 return i;
567 }
568 return -1;
569 }
570
571 /**
572 * Removes the ith element from queue.
573 */
574 private void removeAt(int i) {
575 int s = --size;
576 if (s == i) // removed last element
577 queue[i] = null;
578 else {
579 E moved = (E) queue[s];
580 queue[s] = null;
581 siftDown(i, moved);
582 if (queue[i] == moved)
583 siftUp(i, moved);
584 }
585 }
586
587 /**
588 * Removes a single instance of the specified element from this queue,
589 * if it is present. More formally, removes an element {@code e} such
590 * that {@code o.equals(e)}, if this queue contains one or more such
591 * elements. Returns {@code true} if and only if this queue contained
592 * the specified element (or equivalently, if this queue changed as a
593 * result of the call).
594 *
595 * @param o element to be removed from this queue, if present
596 * @return <tt>true</tt> if this queue changed as a result of the call
597 */
598 public boolean remove(Object o) {
599 boolean removed = false;
600 final ReentrantLock lock = this.lock;
601 lock.lock();
602 try {
603 int i = indexOf(o);
604 if (i != -1) {
605 removeAt(i);
606 removed = true;
607 }
608 } finally {
609 lock.unlock();
610 }
611 return removed;
612 }
613
614
615 /**
616 * Identity-based version for use in Itr.remove
617 */
618 private void removeEQ(Object o) {
619 final ReentrantLock lock = this.lock;
620 lock.lock();
621 try {
622 for (int i = 0; i < size; i++) {
623 if (o == queue[i]) {
624 removeAt(i);
625 break;
626 }
627 }
628 } finally {
629 lock.unlock();
630 }
631 }
632
633 /**
634 * Returns {@code true} if this queue contains the specified element.
635 * More formally, returns {@code true} if and only if this queue contains
636 * at least one element {@code e} such that {@code o.equals(e)}.
637 *
638 * @param o object to be checked for containment in this queue
639 * @return <tt>true</tt> if this queue contains the specified element
640 */
641 public boolean contains(Object o) {
642 int index;
643 final ReentrantLock lock = this.lock;
644 lock.lock();
645 try {
646 index = indexOf(o);
647 } finally {
648 lock.unlock();
649 }
650 return index != -1;
651 }
652
653 /**
654 * Returns an array containing all of the elements in this queue.
655 * The returned array elements are in no particular order.
656 *
657 * <p>The returned array will be "safe" in that no references to it are
658 * maintained by this queue. (In other words, this method must allocate
659 * a new array). The caller is thus free to modify the returned array.
660 *
661 * <p>This method acts as bridge between array-based and collection-based
662 * APIs.
663 *
664 * @return an array containing all of the elements in this queue
665 */
666 public Object[] toArray() {
667 final ReentrantLock lock = this.lock;
668 lock.lock();
669 try {
670 return Arrays.copyOf(queue, size);
671 } finally {
672 lock.unlock();
673 }
674 }
675
676
677 public String toString() {
678 final ReentrantLock lock = this.lock;
679 lock.lock();
680 try {
681 int n = size;
682 if (n == 0)
683 return "[]";
684 StringBuilder sb = new StringBuilder();
685 sb.append('[');
686 for (int i = 0; i < n; ++i) {
687 E e = (E)queue[i];
688 sb.append(e == this ? "(this Collection)" : e);
689 if (i != n - 1)
690 sb.append(',').append(' ');
691 }
692 return sb.append(']').toString();
693 } finally {
694 lock.unlock();
695 }
696 }
697
698 /**
699 * @throws UnsupportedOperationException {@inheritDoc}
700 * @throws ClassCastException {@inheritDoc}
701 * @throws NullPointerException {@inheritDoc}
702 * @throws IllegalArgumentException {@inheritDoc}
703 */
704 public int drainTo(Collection<? super E> c) {
705 if (c == null)
706 throw new NullPointerException();
707 if (c == this)
708 throw new IllegalArgumentException();
709 final ReentrantLock lock = this.lock;
710 lock.lock();
711 try {
712 int n = 0;
713 E e;
714 while ( (e = internalPoll()) != null) {
715 c.add(e);
716 ++n;
717 }
718 return n;
719 } finally {
720 lock.unlock();
721 }
722 }
723
724 /**
725 * @throws UnsupportedOperationException {@inheritDoc}
726 * @throws ClassCastException {@inheritDoc}
727 * @throws NullPointerException {@inheritDoc}
728 * @throws IllegalArgumentException {@inheritDoc}
729 */
730 public int drainTo(Collection<? super E> c, int maxElements) {
731 if (c == null)
732 throw new NullPointerException();
733 if (c == this)
734 throw new IllegalArgumentException();
735 if (maxElements <= 0)
736 return 0;
737 final ReentrantLock lock = this.lock;
738 lock.lock();
739 try {
740 int n = 0;
741 E e;
742 while (n < maxElements && (e = internalPoll()) != null) {
743 c.add(e);
744 ++n;
745 }
746 return n;
747 } finally {
748 lock.unlock();
749 }
750 }
751
752 /**
753 * Atomically removes all of the elements from this queue.
754 * The queue will be empty after this call returns.
755 */
756 public void clear() {
757 final ReentrantLock lock = this.lock;
758 lock.lock();
759 try {
760 for (int i = 0; i < size; i++)
761 queue[i] = null;
762 size = 0;
763 } finally {
764 lock.unlock();
765 }
766 }
767
768 /**
769 * Returns an array containing all of the elements in this queue; the
770 * runtime type of the returned array is that of the specified array.
771 * The returned array elements are in no particular order.
772 * If the queue fits in the specified array, it is returned therein.
773 * Otherwise, a new array is allocated with the runtime type of the
774 * specified array and the size of this queue.
775 *
776 * <p>If this queue fits in the specified array with room to spare
777 * (i.e., the array has more elements than this queue), the element in
778 * the array immediately following the end of the queue is set to
779 * <tt>null</tt>.
780 *
781 * <p>Like the {@link #toArray()} method, this method acts as bridge between
782 * array-based and collection-based APIs. Further, this method allows
783 * precise control over the runtime type of the output array, and may,
784 * under certain circumstances, be used to save allocation costs.
785 *
786 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
787 * The following code can be used to dump the queue into a newly
788 * allocated array of <tt>String</tt>:
789 *
790 * <pre>
791 * String[] y = x.toArray(new String[0]);</pre>
792 *
793 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
794 * <tt>toArray()</tt>.
795 *
796 * @param a the array into which the elements of the queue are to
797 * be stored, if it is big enough; otherwise, a new array of the
798 * same runtime type is allocated for this purpose
799 * @return an array containing all of the elements in this queue
800 * @throws ArrayStoreException if the runtime type of the specified array
801 * is not a supertype of the runtime type of every element in
802 * this queue
803 * @throws NullPointerException if the specified array is null
804 */
805 public <T> T[] toArray(T[] a) {
806 final ReentrantLock lock = this.lock;
807 lock.lock();
808 try {
809 if (a.length < size)
810 // Make a new array of a's runtime type, but my contents:
811 return (T[]) Arrays.copyOf(queue, size, a.getClass());
812 System.arraycopy(queue, 0, a, 0, size);
813 if (a.length > size)
814 a[size] = null;
815 return a;
816 } finally {
817 lock.unlock();
818 }
819 }
820
821 /**
822 * Returns an iterator over the elements in this queue. The
823 * iterator does not return the elements in any particular order.
824 * The returned <tt>Iterator</tt> is a "weakly consistent"
825 * iterator that will never throw {@link
826 * ConcurrentModificationException}, and guarantees to traverse
827 * elements as they existed upon construction of the iterator, and
828 * may (but is not guaranteed to) reflect any modifications
829 * subsequent to construction.
830 *
831 * @return an iterator over the elements in this queue
832 */
833 public Iterator<E> iterator() {
834 return new Itr(toArray());
835 }
836
837 /**
838 * Snapshot iterator that works off copy of underlying q array.
839 */
840 final class Itr implements Iterator<E> {
841 final Object[] array; // Array of all elements
842 int cursor; // index of next element to return;
843 int lastRet; // index of last element, or -1 if no such
844
845 Itr(Object[] array) {
846 lastRet = -1;
847 this.array = array;
848 }
849
850 public boolean hasNext() {
851 return cursor < array.length;
852 }
853
854 public E next() {
855 if (cursor >= array.length)
856 throw new NoSuchElementException();
857 lastRet = cursor;
858 return (E)array[cursor++];
859 }
860
861 public void remove() {
862 if (lastRet < 0)
863 throw new IllegalStateException();
864 removeEQ(array[lastRet]);
865 lastRet = -1;
866 }
867 }
868
869 /**
870 * Saves the state to a stream (that is, serializes it). For
871 * compatibility with previous version of this class,
872 * elements are first copied to a java.util.PriorityQueue,
873 * which is then serialized.
874 */
875 private void writeObject(java.io.ObjectOutputStream s)
876 throws java.io.IOException {
877 lock.lock();
878 try {
879 int n = size; // avoid zero capacity argument
880 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
881 q.addAll(this);
882 s.defaultWriteObject();
883 q = null;
884 } finally {
885 lock.unlock();
886 }
887 }
888
889 /**
890 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
891 * (that is, deserializes it).
892 *
893 * @param s the stream
894 */
895 private void readObject(java.io.ObjectInputStream s)
896 throws java.io.IOException, ClassNotFoundException {
897 s.defaultReadObject();
898 this.queue = new Object[q.size()];
899 comparator = q.comparator();
900 addAll(q);
901 q = null;
902 }
903
904 // Unsafe mechanics
905 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
906 private static final long allocationSpinLockOffset =
907 objectFieldOffset(UNSAFE, "allocationSpinLock",
908 PriorityBlockingQueue.class);
909
910 static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
911 String field, Class<?> klazz) {
912 try {
913 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
914 } catch (NoSuchFieldException e) {
915 // Convert Exception to corresponding Error
916 NoSuchFieldError error = new NoSuchFieldError(field);
917 error.initCause(e);
918 throw error;
919 }
920 }
921
922 }