36 |
|
* <p> This class supports an optional fairness policy for ordering |
37 |
|
* waiting producer and consumer threads. By default, this ordering |
38 |
|
* is not guaranteed. However, a queue constructed with fairness set |
39 |
< |
* to <tt>true</tt> grants threads access in FIFO order. |
39 |
> |
* to <tt>true</tt> grants threads access in FIFO order. |
40 |
|
* |
41 |
|
* <p>This class and its iterator implement all of the |
42 |
|
* <em>optional</em> methods of the {@link Collection} and {@link |
98 |
|
* of further adaptations. |
99 |
|
* 2. SynchronousQueues must block threads waiting to become |
100 |
|
* fulfilled. |
101 |
< |
* 3. Support for cancellation via timeout and interrupts, |
102 |
< |
* including cleaning out cancelled nodes/threads |
101 |
> |
* 3. Support for cancellation via timeout and interrupts, |
102 |
> |
* including cleaning out cancelled nodes/threads |
103 |
|
* from lists to avoid garbage retention and memory depletion. |
104 |
|
* |
105 |
|
* Blocking is mainly accomplished using LockSupport park/unpark, |
247 |
|
} |
248 |
|
|
249 |
|
/** |
250 |
< |
* Try to cancel a wait by matching node to itself. |
250 |
> |
* Try to cancel a wait by matching node to itself. |
251 |
|
*/ |
252 |
|
void tryCancel() { |
253 |
|
matchUpdater.compareAndSet(this, null, this); |
315 |
|
SNode h = head; |
316 |
|
if (h == null || h.mode == mode) { // empty or same-mode |
317 |
|
if (timed && nanos <= 0) { // can't wait |
318 |
< |
if (h != null && h.isCancelled()) |
318 |
> |
if (h != null && h.isCancelled()) |
319 |
|
casHead(h, h.next); // pop cancelled node |
320 |
|
else |
321 |
< |
return null; |
321 |
> |
return null; |
322 |
|
} else if (casHead(h, s = snode(s, e, h, mode))) { |
323 |
|
SNode m = awaitFulfill(s, timed, nanos); |
324 |
|
if (m == s) { // wait was cancelled |
406 |
|
return m; |
407 |
|
if (timed) { |
408 |
|
long now = System.nanoTime(); |
409 |
< |
nanos -= now - lastTime; |
409 |
> |
nanos -= now - lastTime; |
410 |
|
lastTime = now; |
411 |
|
if (nanos <= 0) { |
412 |
|
s.tryCancel(); |
437 |
|
* Unlinks s from the stack. |
438 |
|
*/ |
439 |
|
void clean(SNode s) { |
440 |
< |
s.item = null; // forget item |
440 |
> |
s.item = null; // forget item |
441 |
|
s.waiter = null; // forget thread |
442 |
|
|
443 |
|
/* |
487 |
|
volatile QNode next; // next node in queue |
488 |
|
volatile Object item; // CAS'ed to or from null |
489 |
|
volatile Thread waiter; // to control park/unpark |
490 |
< |
final boolean isData; |
490 |
> |
final boolean isData; |
491 |
|
|
492 |
|
QNode(Object item, boolean isData) { |
493 |
|
this.item = item; |
513 |
|
} |
514 |
|
|
515 |
|
/** |
516 |
< |
* Try to cancel by CAS'ing ref to this as item. |
516 |
> |
* Try to cancel by CAS'ing ref to this as item. |
517 |
|
*/ |
518 |
|
void tryCancel(Object cmp) { |
519 |
|
itemUpdater.compareAndSet(this, cmp, this); |
523 |
|
return item == this; |
524 |
|
} |
525 |
|
|
526 |
< |
/** |
526 |
> |
/** |
527 |
|
* Returns true if this node is known to be off the queue |
528 |
|
* because its next pointer has been forgotten due to |
529 |
|
* an advanceHead operation. |
591 |
|
* Puts or takes an item. |
592 |
|
*/ |
593 |
|
Object transfer(Object e, boolean timed, long nanos) { |
594 |
< |
/* Basic algorithm is to loop trying to take either of |
594 |
> |
/* Basic algorithm is to loop trying to take either of |
595 |
|
* two actions: |
596 |
|
* |
597 |
< |
* 1. If queue apparently empty or holding same-mode nodes, |
597 |
> |
* 1. If queue apparently empty or holding same-mode nodes, |
598 |
|
* try to add node to queue of waiters, wait to be |
599 |
|
* fulfilled (or cancelled) and return matching item. |
600 |
|
* |
697 |
|
return x; |
698 |
|
if (timed) { |
699 |
|
long now = System.nanoTime(); |
700 |
< |
nanos -= now - lastTime; |
700 |
> |
nanos -= now - lastTime; |
701 |
|
lastTime = now; |
702 |
|
if (nanos <= 0) { |
703 |
|
s.tryCancel(e); |
761 |
|
(dn = d.next) != null && // has successor |
762 |
|
dn != d && // that is on list |
763 |
|
dp.casNext(d, dn))) // d unspliced |
764 |
< |
casCleanMe(dp, null); |
765 |
< |
if (dp == pred) |
764 |
> |
casCleanMe(dp, null); |
765 |
> |
if (dp == pred) |
766 |
|
return; // s is already saved node |
767 |
< |
} else if (casCleanMe(null, pred)) |
767 |
> |
} else if (casCleanMe(null, pred)) |
768 |
|
return; // Postpone cleaning s |
769 |
|
} |
770 |
|
} |
817 |
|
* @throws InterruptedException {@inheritDoc} |
818 |
|
* @throws NullPointerException {@inheritDoc} |
819 |
|
*/ |
820 |
< |
public boolean offer(E o, long timeout, TimeUnit unit) |
820 |
> |
public boolean offer(E o, long timeout, TimeUnit unit) |
821 |
|
throws InterruptedException { |
822 |
|
if (o == null) throw new NullPointerException(); |
823 |
|
if (transferer.transfer(o, true, unit.toNanos(timeout)) != null) |