ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
Revision: 1.16
Committed: Thu Nov 19 01:08:34 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.15: +2 -2 lines
Log Message:
use spinForTimeoutThreshold consistently

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent.locks;
8
9 import java.util.concurrent.TimeUnit;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Date;
13 import sun.misc.Unsafe;
14
15 /**
16 * A version of {@link AbstractQueuedSynchronizer} in
17 * which synchronization state is maintained as a {@code long}.
18 * This class has exactly the same structure, properties, and methods
19 * as {@code AbstractQueuedSynchronizer} with the exception
20 * that all state-related parameters and results are defined
21 * as {@code long} rather than {@code int}. This class
22 * may be useful when creating synchronizers such as
23 * multilevel locks and barriers that require
24 * 64 bits of state.
25 *
26 * <p>See {@link AbstractQueuedSynchronizer} for usage
27 * notes and examples.
28 *
29 * @since 1.6
30 * @author Doug Lea
31 */
32 public abstract class AbstractQueuedLongSynchronizer
33 extends AbstractOwnableSynchronizer
34 implements java.io.Serializable {
35
36 private static final long serialVersionUID = 7373984972572414692L;
37
38 /*
39 To keep sources in sync, the remainder of this source file is
40 exactly cloned from AbstractQueuedSynchronizer, replacing class
41 name and changing ints related with sync state to longs. Please
42 keep it that way.
43 */
44
45 /**
46 * Creates a new {@code AbstractQueuedLongSynchronizer} instance
47 * with initial synchronization state of zero.
48 */
49 protected AbstractQueuedLongSynchronizer() { }
50
51 /**
52 * Wait queue node class.
53 *
54 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
55 * Hagersten) lock queue. CLH locks are normally used for
56 * spinlocks. We instead use them for blocking synchronizers, but
57 * use the same basic tactic of holding some of the control
58 * information about a thread in the predecessor of its node. A
59 * "status" field in each node keeps track of whether a thread
60 * should block. A node is signalled when its predecessor
61 * releases. Each node of the queue otherwise serves as a
62 * specific-notification-style monitor holding a single waiting
63 * thread. The status field does NOT control whether threads are
64 * granted locks etc though. A thread may try to acquire if it is
65 * first in the queue. But being first does not guarantee success;
66 * it only gives the right to contend. So the currently released
67 * contender thread may need to rewait.
68 *
69 * <p>To enqueue into a CLH lock, you atomically splice it in as new
70 * tail. To dequeue, you just set the head field.
71 * <pre>
72 * +------+ prev +-----+ +-----+
73 * head | | <---- | | <---- | | tail
74 * +------+ +-----+ +-----+
75 * </pre>
76 *
77 * <p>Insertion into a CLH queue requires only a single atomic
78 * operation on "tail", so there is a simple atomic point of
79 * demarcation from unqueued to queued. Similarly, dequeuing
80 * involves only updating the "head". However, it takes a bit
81 * more work for nodes to determine who their successors are,
82 * in part to deal with possible cancellation due to timeouts
83 * and interrupts.
84 *
85 * <p>The "prev" links (not used in original CLH locks), are mainly
86 * needed to handle cancellation. If a node is cancelled, its
87 * successor is (normally) relinked to a non-cancelled
88 * predecessor. For explanation of similar mechanics in the case
89 * of spin locks, see the papers by Scott and Scherer at
90 * http://www.cs.rochester.edu/u/scott/synchronization/
91 *
92 * <p>We also use "next" links to implement blocking mechanics.
93 * The thread id for each node is kept in its own node, so a
94 * predecessor signals the next node to wake up by traversing
95 * next link to determine which thread it is. Determination of
96 * successor must avoid races with newly queued nodes to set
97 * the "next" fields of their predecessors. This is solved
98 * when necessary by checking backwards from the atomically
99 * updated "tail" when a node's successor appears to be null.
100 * (Or, said differently, the next-links are an optimization
101 * so that we don't usually need a backward scan.)
102 *
103 * <p>Cancellation introduces some conservatism to the basic
104 * algorithms. Since we must poll for cancellation of other
105 * nodes, we can miss noticing whether a cancelled node is
106 * ahead or behind us. This is dealt with by always unparking
107 * successors upon cancellation, allowing them to stabilize on
108 * a new predecessor, unless we can identify an uncancelled
109 * predecessor who will carry this responsibility.
110 *
111 * <p>CLH queues need a dummy header node to get started. But
112 * we don't create them on construction, because it would be wasted
113 * effort if there is never contention. Instead, the node
114 * is constructed and head and tail pointers are set upon first
115 * contention.
116 *
117 * <p>Threads waiting on Conditions use the same nodes, but
118 * use an additional link. Conditions only need to link nodes
119 * in simple (non-concurrent) linked queues because they are
120 * only accessed when exclusively held. Upon await, a node is
121 * inserted into a condition queue. Upon signal, the node is
122 * transferred to the main queue. A special value of status
123 * field is used to mark which queue a node is on.
124 *
125 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
126 * Scherer and Michael Scott, along with members of JSR-166
127 * expert group, for helpful ideas, discussions, and critiques
128 * on the design of this class.
129 */
130 static final class Node {
131 /** Marker to indicate a node is waiting in shared mode */
132 static final Node SHARED = new Node();
133 /** Marker to indicate a node is waiting in exclusive mode */
134 static final Node EXCLUSIVE = null;
135
136 /** waitStatus value to indicate thread has cancelled */
137 static final int CANCELLED = 1;
138 /** waitStatus value to indicate successor's thread needs unparking */
139 static final int SIGNAL = -1;
140 /** waitStatus value to indicate thread is waiting on condition */
141 static final int CONDITION = -2;
142 /**
143 * waitStatus value to indicate the next acquireShared should
144 * unconditionally propagate
145 */
146 static final int PROPAGATE = -3;
147
148 /**
149 * Status field, taking on only the values:
150 * SIGNAL: The successor of this node is (or will soon be)
151 * blocked (via park), so the current node must
152 * unpark its successor when it releases or
153 * cancels. To avoid races, acquire methods must
154 * first indicate they need a signal,
155 * then retry the atomic acquire, and then,
156 * on failure, block.
157 * CANCELLED: This node is cancelled due to timeout or interrupt.
158 * Nodes never leave this state. In particular,
159 * a thread with cancelled node never again blocks.
160 * CONDITION: This node is currently on a condition queue.
161 * It will not be used as a sync queue node
162 * until transferred, at which time the status
163 * will be set to 0. (Use of this value here has
164 * nothing to do with the other uses of the
165 * field, but simplifies mechanics.)
166 * PROPAGATE: A releaseShared should be propagated to other
167 * nodes. This is set (for head node only) in
168 * doReleaseShared to ensure propagation
169 * continues, even if other operations have
170 * since intervened.
171 * 0: None of the above
172 *
173 * The values are arranged numerically to simplify use.
174 * Non-negative values mean that a node doesn't need to
175 * signal. So, most code doesn't need to check for particular
176 * values, just for sign.
177 *
178 * The field is initialized to 0 for normal sync nodes, and
179 * CONDITION for condition nodes. It is modified using CAS
180 * (or when possible, unconditional volatile writes).
181 */
182 volatile int waitStatus;
183
184 /**
185 * Link to predecessor node that current node/thread relies on
186 * for checking waitStatus. Assigned during enqueuing, and nulled
187 * out (for sake of GC) only upon dequeuing. Also, upon
188 * cancellation of a predecessor, we short-circuit while
189 * finding a non-cancelled one, which will always exist
190 * because the head node is never cancelled: A node becomes
191 * head only as a result of successful acquire. A
192 * cancelled thread never succeeds in acquiring, and a thread only
193 * cancels itself, not any other node.
194 */
195 volatile Node prev;
196
197 /**
198 * Link to the successor node that the current node/thread
199 * unparks upon release. Assigned during enqueuing, adjusted
200 * when bypassing cancelled predecessors, and nulled out (for
201 * sake of GC) when dequeued. The enq operation does not
202 * assign next field of a predecessor until after attachment,
203 * so seeing a null next field does not necessarily mean that
204 * node is at end of queue. However, if a next field appears
205 * to be null, we can scan prev's from the tail to
206 * double-check. The next field of cancelled nodes is set to
207 * point to the node itself instead of null, to make life
208 * easier for isOnSyncQueue.
209 */
210 volatile Node next;
211
212 /**
213 * The thread that enqueued this node. Initialized on
214 * construction and nulled out after use.
215 */
216 volatile Thread thread;
217
218 /**
219 * Link to next node waiting on condition, or the special
220 * value SHARED. Because condition queues are accessed only
221 * when holding in exclusive mode, we just need a simple
222 * linked queue to hold nodes while they are waiting on
223 * conditions. They are then transferred to the queue to
224 * re-acquire. And because conditions can only be exclusive,
225 * we save a field by using special value to indicate shared
226 * mode.
227 */
228 Node nextWaiter;
229
230 /**
231 * Returns true if node is waiting in shared mode.
232 */
233 final boolean isShared() {
234 return nextWaiter == SHARED;
235 }
236
237 /**
238 * Returns previous node, or throws NullPointerException if null.
239 * Use when predecessor cannot be null. The null check could
240 * be elided, but is present to help the VM.
241 *
242 * @return the predecessor of this node
243 */
244 final Node predecessor() throws NullPointerException {
245 Node p = prev;
246 if (p == null)
247 throw new NullPointerException();
248 else
249 return p;
250 }
251
252 Node() { // Used to establish initial head or SHARED marker
253 }
254
255 Node(Thread thread, Node mode) { // Used by addWaiter
256 this.nextWaiter = mode;
257 this.thread = thread;
258 }
259
260 Node(Thread thread, int waitStatus) { // Used by Condition
261 this.waitStatus = waitStatus;
262 this.thread = thread;
263 }
264 }
265
266 /**
267 * Head of the wait queue, lazily initialized. Except for
268 * initialization, it is modified only via method setHead. Note:
269 * If head exists, its waitStatus is guaranteed not to be
270 * CANCELLED.
271 */
272 private transient volatile Node head;
273
274 /**
275 * Tail of the wait queue, lazily initialized. Modified only via
276 * method enq to add new wait node.
277 */
278 private transient volatile Node tail;
279
280 /**
281 * The synchronization state.
282 */
283 private volatile long state;
284
285 /**
286 * Returns the current value of synchronization state.
287 * This operation has memory semantics of a {@code volatile} read.
288 * @return current state value
289 */
290 protected final long getState() {
291 return state;
292 }
293
294 /**
295 * Sets the value of synchronization state.
296 * This operation has memory semantics of a {@code volatile} write.
297 * @param newState the new state value
298 */
299 protected final void setState(long newState) {
300 state = newState;
301 }
302
303 /**
304 * Atomically sets synchronization state to the given updated
305 * value if the current state value equals the expected value.
306 * This operation has memory semantics of a {@code volatile} read
307 * and write.
308 *
309 * @param expect the expected value
310 * @param update the new value
311 * @return true if successful. False return indicates that the actual
312 * value was not equal to the expected value.
313 */
314 protected final boolean compareAndSetState(long expect, long update) {
315 // See below for intrinsics setup to support this
316 return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
317 }
318
319 // Queuing utilities
320
321 /**
322 * The number of nanoseconds for which it is faster to spin
323 * rather than to use timed park. A rough estimate suffices
324 * to improve responsiveness with very short timeouts.
325 */
326 static final long spinForTimeoutThreshold = 1000L;
327
328 /**
329 * Inserts node into queue, initializing if necessary. See picture above.
330 * @param node the node to insert
331 * @return node's predecessor
332 */
333 private Node enq(final Node node) {
334 for (;;) {
335 Node t = tail;
336 if (t == null) { // Must initialize
337 if (compareAndSetHead(new Node()))
338 tail = head;
339 } else {
340 node.prev = t;
341 if (compareAndSetTail(t, node)) {
342 t.next = node;
343 return t;
344 }
345 }
346 }
347 }
348
349 /**
350 * Creates and enqueues node for current thread and given mode.
351 *
352 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
353 * @return the new node
354 */
355 private Node addWaiter(Node mode) {
356 Node node = new Node(Thread.currentThread(), mode);
357 // Try the fast path of enq; backup to full enq on failure
358 Node pred = tail;
359 if (pred != null) {
360 node.prev = pred;
361 if (compareAndSetTail(pred, node)) {
362 pred.next = node;
363 return node;
364 }
365 }
366 enq(node);
367 return node;
368 }
369
370 /**
371 * Sets head of queue to be node, thus dequeuing. Called only by
372 * acquire methods. Also nulls out unused fields for sake of GC
373 * and to suppress unnecessary signals and traversals.
374 *
375 * @param node the node
376 */
377 private void setHead(Node node) {
378 head = node;
379 node.thread = null;
380 node.prev = null;
381 }
382
383 /**
384 * Wakes up node's successor, if one exists.
385 *
386 * @param node the node
387 */
388 private void unparkSuccessor(Node node) {
389 /*
390 * If status is negative (i.e., possibly needing signal) try
391 * to clear in anticipation of signalling. It is OK if this
392 * fails or if status is changed by waiting thread.
393 */
394 int ws = node.waitStatus;
395 if (ws < 0)
396 compareAndSetWaitStatus(node, ws, 0);
397
398 /*
399 * Thread to unpark is held in successor, which is normally
400 * just the next node. But if cancelled or apparently null,
401 * traverse backwards from tail to find the actual
402 * non-cancelled successor.
403 */
404 Node s = node.next;
405 if (s == null || s.waitStatus > 0) {
406 s = null;
407 for (Node p = tail; p != null && p != node; p = p.prev)
408 if (p.waitStatus <= 0)
409 s = p;
410 }
411 if (s != null)
412 LockSupport.unpark(s.thread);
413 }
414
415 /**
416 * Release action for shared mode -- signals successor and ensures
417 * propagation. (Note: For exclusive mode, release just amounts
418 * to calling unparkSuccessor of head if it needs signal.)
419 */
420 private void doReleaseShared() {
421 /*
422 * Ensure that a release propagates, even if there are other
423 * in-progress acquires/releases. This proceeds in the usual
424 * way of trying to unparkSuccessor of head if it needs
425 * signal. But if it does not, status is set to PROPAGATE to
426 * ensure that upon release, propagation continues.
427 * Additionally, we must loop in case a new node is added
428 * while we are doing this. Also, unlike other uses of
429 * unparkSuccessor, we need to know if CAS to reset status
430 * fails, if so rechecking.
431 */
432 for (;;) {
433 Node h = head;
434 if (h != null && h != tail) {
435 int ws = h.waitStatus;
436 if (ws == Node.SIGNAL) {
437 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
438 continue; // loop to recheck cases
439 unparkSuccessor(h);
440 }
441 else if (ws == 0 &&
442 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
443 continue; // loop on failed CAS
444 }
445 if (h == head) // loop if head changed
446 break;
447 }
448 }
449
450 /**
451 * Sets head of queue, and checks if successor may be waiting
452 * in shared mode, if so propagating if either propagate > 0 or
453 * PROPAGATE status was set.
454 *
455 * @param node the node
456 * @param propagate the return value from a tryAcquireShared
457 */
458 private void setHeadAndPropagate(Node node, long propagate) {
459 Node h = head; // Record old head for check below
460 setHead(node);
461 /*
462 * Try to signal next queued node if:
463 * Propagation was indicated by caller,
464 * or was recorded (as h.waitStatus) by a previous operation
465 * (note: this uses sign-check of waitStatus because
466 * PROPAGATE status may transition to SIGNAL.)
467 * and
468 * The next node is waiting in shared mode,
469 * or we don't know, because it appears null
470 *
471 * The conservatism in both of these checks may cause
472 * unnecessary wake-ups, but only when there are multiple
473 * racing acquires/releases, so most need signals now or soon
474 * anyway.
475 */
476 if (propagate > 0 || h == null || h.waitStatus < 0) {
477 Node s = node.next;
478 if (s == null || s.isShared())
479 doReleaseShared();
480 }
481 }
482
483 // Utilities for various versions of acquire
484
485 /**
486 * Cancels an ongoing attempt to acquire.
487 *
488 * @param node the node
489 */
490 private void cancelAcquire(Node node) {
491 // Ignore if node doesn't exist
492 if (node == null)
493 return;
494
495 node.thread = null;
496
497 // Skip cancelled predecessors
498 Node pred = node.prev;
499 while (pred.waitStatus > 0)
500 node.prev = pred = pred.prev;
501
502 // predNext is the apparent node to unsplice. CASes below will
503 // fail if not, in which case, we lost race vs another cancel
504 // or signal, so no further action is necessary.
505 Node predNext = pred.next;
506
507 // Can use unconditional write instead of CAS here.
508 // After this atomic step, other Nodes can skip past us.
509 // Before, we are free of interference from other threads.
510 node.waitStatus = Node.CANCELLED;
511
512 // If we are the tail, remove ourselves.
513 if (node == tail && compareAndSetTail(node, pred)) {
514 compareAndSetNext(pred, predNext, null);
515 } else {
516 // If successor needs signal, try to set pred's next-link
517 // so it will get one. Otherwise wake it up to propagate.
518 int ws;
519 if (pred != head &&
520 ((ws = pred.waitStatus) == Node.SIGNAL ||
521 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
522 pred.thread != null) {
523 Node next = node.next;
524 if (next != null && next.waitStatus <= 0)
525 compareAndSetNext(pred, predNext, next);
526 } else {
527 unparkSuccessor(node);
528 }
529
530 node.next = node; // help GC
531 }
532 }
533
534 /**
535 * Checks and updates status for a node that failed to acquire.
536 * Returns true if thread should block. This is the main signal
537 * control in all acquire loops. Requires that pred == node.prev.
538 *
539 * @param pred node's predecessor holding status
540 * @param node the node
541 * @return {@code true} if thread should block
542 */
543 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
544 int ws = pred.waitStatus;
545 if (ws == Node.SIGNAL)
546 /*
547 * This node has already set status asking a release
548 * to signal it, so it can safely park.
549 */
550 return true;
551 if (ws > 0) {
552 /*
553 * Predecessor was cancelled. Skip over predecessors and
554 * indicate retry.
555 */
556 do {
557 node.prev = pred = pred.prev;
558 } while (pred.waitStatus > 0);
559 pred.next = node;
560 } else {
561 /*
562 * waitStatus must be 0 or PROPAGATE. Indicate that we
563 * need a signal, but don't park yet. Caller will need to
564 * retry to make sure it cannot acquire before parking.
565 */
566 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
567 }
568 return false;
569 }
570
571 /**
572 * Convenience method to interrupt current thread.
573 */
574 static void selfInterrupt() {
575 Thread.currentThread().interrupt();
576 }
577
578 /**
579 * Convenience method to park and then check if interrupted
580 *
581 * @return {@code true} if interrupted
582 */
583 private final boolean parkAndCheckInterrupt() {
584 LockSupport.park(this);
585 return Thread.interrupted();
586 }
587
588 /*
589 * Various flavors of acquire, varying in exclusive/shared and
590 * control modes. Each is mostly the same, but annoyingly
591 * different. Only a little bit of factoring is possible due to
592 * interactions of exception mechanics (including ensuring that we
593 * cancel if tryAcquire throws exception) and other control, at
594 * least not without hurting performance too much.
595 */
596
597 /**
598 * Acquires in exclusive uninterruptible mode for thread already in
599 * queue. Used by condition wait methods as well as acquire.
600 *
601 * @param node the node
602 * @param arg the acquire argument
603 * @return {@code true} if interrupted while waiting
604 */
605 final boolean acquireQueued(final Node node, long arg) {
606 boolean failed = true;
607 try {
608 boolean interrupted = false;
609 for (;;) {
610 final Node p = node.predecessor();
611 if (p == head && tryAcquire(arg)) {
612 setHead(node);
613 p.next = null; // help GC
614 failed = false;
615 return interrupted;
616 }
617 if (shouldParkAfterFailedAcquire(p, node) &&
618 parkAndCheckInterrupt())
619 interrupted = true;
620 }
621 } finally {
622 if (failed)
623 cancelAcquire(node);
624 }
625 }
626
627 /**
628 * Acquires in exclusive interruptible mode.
629 * @param arg the acquire argument
630 */
631 private void doAcquireInterruptibly(long arg)
632 throws InterruptedException {
633 final Node node = addWaiter(Node.EXCLUSIVE);
634 boolean failed = true;
635 try {
636 for (;;) {
637 final Node p = node.predecessor();
638 if (p == head && tryAcquire(arg)) {
639 setHead(node);
640 p.next = null; // help GC
641 failed = false;
642 return;
643 }
644 if (shouldParkAfterFailedAcquire(p, node) &&
645 parkAndCheckInterrupt())
646 throw new InterruptedException();
647 }
648 } finally {
649 if (failed)
650 cancelAcquire(node);
651 }
652 }
653
654 /**
655 * Acquires in exclusive timed mode.
656 *
657 * @param arg the acquire argument
658 * @param nanosTimeout max wait time
659 * @return {@code true} if acquired
660 */
661 private boolean doAcquireNanos(long arg, long nanosTimeout)
662 throws InterruptedException {
663 if (nanosTimeout <= 0L)
664 return false;
665 final long deadline = System.nanoTime() + nanosTimeout;
666 final Node node = addWaiter(Node.EXCLUSIVE);
667 boolean failed = true;
668 try {
669 for (;;) {
670 final Node p = node.predecessor();
671 if (p == head && tryAcquire(arg)) {
672 setHead(node);
673 p.next = null; // help GC
674 failed = false;
675 return true;
676 }
677 nanosTimeout = deadline - System.nanoTime();
678 if (nanosTimeout <= 0L)
679 return false;
680 if (shouldParkAfterFailedAcquire(p, node) &&
681 nanosTimeout > spinForTimeoutThreshold)
682 LockSupport.parkNanos(this, nanosTimeout);
683 if (Thread.interrupted())
684 throw new InterruptedException();
685 }
686 } finally {
687 if (failed)
688 cancelAcquire(node);
689 }
690 }
691
692 /**
693 * Acquires in shared uninterruptible mode.
694 * @param arg the acquire argument
695 */
696 private void doAcquireShared(long arg) {
697 final Node node = addWaiter(Node.SHARED);
698 boolean failed = true;
699 try {
700 boolean interrupted = false;
701 for (;;) {
702 final Node p = node.predecessor();
703 if (p == head) {
704 long r = tryAcquireShared(arg);
705 if (r >= 0) {
706 setHeadAndPropagate(node, r);
707 p.next = null; // help GC
708 if (interrupted)
709 selfInterrupt();
710 failed = false;
711 return;
712 }
713 }
714 if (shouldParkAfterFailedAcquire(p, node) &&
715 parkAndCheckInterrupt())
716 interrupted = true;
717 }
718 } finally {
719 if (failed)
720 cancelAcquire(node);
721 }
722 }
723
724 /**
725 * Acquires in shared interruptible mode.
726 * @param arg the acquire argument
727 */
728 private void doAcquireSharedInterruptibly(long arg)
729 throws InterruptedException {
730 final Node node = addWaiter(Node.SHARED);
731 boolean failed = true;
732 try {
733 for (;;) {
734 final Node p = node.predecessor();
735 if (p == head) {
736 long r = tryAcquireShared(arg);
737 if (r >= 0) {
738 setHeadAndPropagate(node, r);
739 p.next = null; // help GC
740 failed = false;
741 return;
742 }
743 }
744 if (shouldParkAfterFailedAcquire(p, node) &&
745 parkAndCheckInterrupt())
746 throw new InterruptedException();
747 }
748 } finally {
749 if (failed)
750 cancelAcquire(node);
751 }
752 }
753
754 /**
755 * Acquires in shared timed mode.
756 *
757 * @param arg the acquire argument
758 * @param nanosTimeout max wait time
759 * @return {@code true} if acquired
760 */
761 private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
762 throws InterruptedException {
763 if (nanosTimeout <= 0L)
764 return false;
765 final long deadline = System.nanoTime() + nanosTimeout;
766 final Node node = addWaiter(Node.SHARED);
767 boolean failed = true;
768 try {
769 for (;;) {
770 final Node p = node.predecessor();
771 if (p == head) {
772 long r = tryAcquireShared(arg);
773 if (r >= 0) {
774 setHeadAndPropagate(node, r);
775 p.next = null; // help GC
776 failed = false;
777 return true;
778 }
779 }
780 nanosTimeout = deadline - System.nanoTime();
781 if (nanosTimeout <= 0L)
782 return false;
783 if (shouldParkAfterFailedAcquire(p, node) &&
784 nanosTimeout > spinForTimeoutThreshold)
785 LockSupport.parkNanos(this, nanosTimeout);
786 if (Thread.interrupted())
787 throw new InterruptedException();
788 }
789 } finally {
790 if (failed)
791 cancelAcquire(node);
792 }
793 }
794
795 // Main exported methods
796
797 /**
798 * Attempts to acquire in exclusive mode. This method should query
799 * if the state of the object permits it to be acquired in the
800 * exclusive mode, and if so to acquire it.
801 *
802 * <p>This method is always invoked by the thread performing
803 * acquire. If this method reports failure, the acquire method
804 * may queue the thread, if it is not already queued, until it is
805 * signalled by a release from some other thread. This can be used
806 * to implement method {@link Lock#tryLock()}.
807 *
808 * <p>The default
809 * implementation throws {@link UnsupportedOperationException}.
810 *
811 * @param arg the acquire argument. This value is always the one
812 * passed to an acquire method, or is the value saved on entry
813 * to a condition wait. The value is otherwise uninterpreted
814 * and can represent anything you like.
815 * @return {@code true} if successful. Upon success, this object has
816 * been acquired.
817 * @throws IllegalMonitorStateException if acquiring would place this
818 * synchronizer in an illegal state. This exception must be
819 * thrown in a consistent fashion for synchronization to work
820 * correctly.
821 * @throws UnsupportedOperationException if exclusive mode is not supported
822 */
823 protected boolean tryAcquire(long arg) {
824 throw new UnsupportedOperationException();
825 }
826
827 /**
828 * Attempts to set the state to reflect a release in exclusive
829 * mode.
830 *
831 * <p>This method is always invoked by the thread performing release.
832 *
833 * <p>The default implementation throws
834 * {@link UnsupportedOperationException}.
835 *
836 * @param arg the release argument. This value is always the one
837 * passed to a release method, or the current state value upon
838 * entry to a condition wait. The value is otherwise
839 * uninterpreted and can represent anything you like.
840 * @return {@code true} if this object is now in a fully released
841 * state, so that any waiting threads may attempt to acquire;
842 * and {@code false} otherwise.
843 * @throws IllegalMonitorStateException if releasing would place this
844 * synchronizer in an illegal state. This exception must be
845 * thrown in a consistent fashion for synchronization to work
846 * correctly.
847 * @throws UnsupportedOperationException if exclusive mode is not supported
848 */
849 protected boolean tryRelease(long arg) {
850 throw new UnsupportedOperationException();
851 }
852
853 /**
854 * Attempts to acquire in shared mode. This method should query if
855 * the state of the object permits it to be acquired in the shared
856 * mode, and if so to acquire it.
857 *
858 * <p>This method is always invoked by the thread performing
859 * acquire. If this method reports failure, the acquire method
860 * may queue the thread, if it is not already queued, until it is
861 * signalled by a release from some other thread.
862 *
863 * <p>The default implementation throws {@link
864 * UnsupportedOperationException}.
865 *
866 * @param arg the acquire argument. This value is always the one
867 * passed to an acquire method, or is the value saved on entry
868 * to a condition wait. The value is otherwise uninterpreted
869 * and can represent anything you like.
870 * @return a negative value on failure; zero if acquisition in shared
871 * mode succeeded but no subsequent shared-mode acquire can
872 * succeed; and a positive value if acquisition in shared
873 * mode succeeded and subsequent shared-mode acquires might
874 * also succeed, in which case a subsequent waiting thread
875 * must check availability. (Support for three different
876 * return values enables this method to be used in contexts
877 * where acquires only sometimes act exclusively.) Upon
878 * success, this object has been acquired.
879 * @throws IllegalMonitorStateException if acquiring would place this
880 * synchronizer in an illegal state. This exception must be
881 * thrown in a consistent fashion for synchronization to work
882 * correctly.
883 * @throws UnsupportedOperationException if shared mode is not supported
884 */
885 protected long tryAcquireShared(long arg) {
886 throw new UnsupportedOperationException();
887 }
888
889 /**
890 * Attempts to set the state to reflect a release in shared mode.
891 *
892 * <p>This method is always invoked by the thread performing release.
893 *
894 * <p>The default implementation throws
895 * {@link UnsupportedOperationException}.
896 *
897 * @param arg the release argument. This value is always the one
898 * passed to a release method, or the current state value upon
899 * entry to a condition wait. The value is otherwise
900 * uninterpreted and can represent anything you like.
901 * @return {@code true} if this release of shared mode may permit a
902 * waiting acquire (shared or exclusive) to succeed; and
903 * {@code false} otherwise
904 * @throws IllegalMonitorStateException if releasing would place this
905 * synchronizer in an illegal state. This exception must be
906 * thrown in a consistent fashion for synchronization to work
907 * correctly.
908 * @throws UnsupportedOperationException if shared mode is not supported
909 */
910 protected boolean tryReleaseShared(long arg) {
911 throw new UnsupportedOperationException();
912 }
913
914 /**
915 * Returns {@code true} if synchronization is held exclusively with
916 * respect to the current (calling) thread. This method is invoked
917 * upon each call to a non-waiting {@link ConditionObject} method.
918 * (Waiting methods instead invoke {@link #release}.)
919 *
920 * <p>The default implementation throws {@link
921 * UnsupportedOperationException}. This method is invoked
922 * internally only within {@link ConditionObject} methods, so need
923 * not be defined if conditions are not used.
924 *
925 * @return {@code true} if synchronization is held exclusively;
926 * {@code false} otherwise
927 * @throws UnsupportedOperationException if conditions are not supported
928 */
929 protected boolean isHeldExclusively() {
930 throw new UnsupportedOperationException();
931 }
932
933 /**
934 * Acquires in exclusive mode, ignoring interrupts. Implemented
935 * by invoking at least once {@link #tryAcquire},
936 * returning on success. Otherwise the thread is queued, possibly
937 * repeatedly blocking and unblocking, invoking {@link
938 * #tryAcquire} until success. This method can be used
939 * to implement method {@link Lock#lock}.
940 *
941 * @param arg the acquire argument. This value is conveyed to
942 * {@link #tryAcquire} but is otherwise uninterpreted and
943 * can represent anything you like.
944 */
945 public final void acquire(long arg) {
946 if (!tryAcquire(arg) &&
947 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
948 selfInterrupt();
949 }
950
951 /**
952 * Acquires in exclusive mode, aborting if interrupted.
953 * Implemented by first checking interrupt status, then invoking
954 * at least once {@link #tryAcquire}, returning on
955 * success. Otherwise the thread is queued, possibly repeatedly
956 * blocking and unblocking, invoking {@link #tryAcquire}
957 * until success or the thread is interrupted. This method can be
958 * used to implement method {@link Lock#lockInterruptibly}.
959 *
960 * @param arg the acquire argument. This value is conveyed to
961 * {@link #tryAcquire} but is otherwise uninterpreted and
962 * can represent anything you like.
963 * @throws InterruptedException if the current thread is interrupted
964 */
965 public final void acquireInterruptibly(long arg)
966 throws InterruptedException {
967 if (Thread.interrupted())
968 throw new InterruptedException();
969 if (!tryAcquire(arg))
970 doAcquireInterruptibly(arg);
971 }
972
973 /**
974 * Attempts to acquire in exclusive mode, aborting if interrupted,
975 * and failing if the given timeout elapses. Implemented by first
976 * checking interrupt status, then invoking at least once {@link
977 * #tryAcquire}, returning on success. Otherwise, the thread is
978 * queued, possibly repeatedly blocking and unblocking, invoking
979 * {@link #tryAcquire} until success or the thread is interrupted
980 * or the timeout elapses. This method can be used to implement
981 * method {@link Lock#tryLock(long, TimeUnit)}.
982 *
983 * @param arg the acquire argument. This value is conveyed to
984 * {@link #tryAcquire} but is otherwise uninterpreted and
985 * can represent anything you like.
986 * @param nanosTimeout the maximum number of nanoseconds to wait
987 * @return {@code true} if acquired; {@code false} if timed out
988 * @throws InterruptedException if the current thread is interrupted
989 */
990 public final boolean tryAcquireNanos(long arg, long nanosTimeout)
991 throws InterruptedException {
992 if (Thread.interrupted())
993 throw new InterruptedException();
994 return tryAcquire(arg) ||
995 doAcquireNanos(arg, nanosTimeout);
996 }
997
998 /**
999 * Releases in exclusive mode. Implemented by unblocking one or
1000 * more threads if {@link #tryRelease} returns true.
1001 * This method can be used to implement method {@link Lock#unlock}.
1002 *
1003 * @param arg the release argument. This value is conveyed to
1004 * {@link #tryRelease} but is otherwise uninterpreted and
1005 * can represent anything you like.
1006 * @return the value returned from {@link #tryRelease}
1007 */
1008 public final boolean release(long arg) {
1009 if (tryRelease(arg)) {
1010 Node h = head;
1011 if (h != null && h.waitStatus != 0)
1012 unparkSuccessor(h);
1013 return true;
1014 }
1015 return false;
1016 }
1017
1018 /**
1019 * Acquires in shared mode, ignoring interrupts. Implemented by
1020 * first invoking at least once {@link #tryAcquireShared},
1021 * returning on success. Otherwise the thread is queued, possibly
1022 * repeatedly blocking and unblocking, invoking {@link
1023 * #tryAcquireShared} until success.
1024 *
1025 * @param arg the acquire argument. This value is conveyed to
1026 * {@link #tryAcquireShared} but is otherwise uninterpreted
1027 * and can represent anything you like.
1028 */
1029 public final void acquireShared(long arg) {
1030 if (tryAcquireShared(arg) < 0)
1031 doAcquireShared(arg);
1032 }
1033
1034 /**
1035 * Acquires in shared mode, aborting if interrupted. Implemented
1036 * by first checking interrupt status, then invoking at least once
1037 * {@link #tryAcquireShared}, returning on success. Otherwise the
1038 * thread is queued, possibly repeatedly blocking and unblocking,
1039 * invoking {@link #tryAcquireShared} until success or the thread
1040 * is interrupted.
1041 * @param arg the acquire argument.
1042 * This value is conveyed to {@link #tryAcquireShared} but is
1043 * otherwise uninterpreted and can represent anything
1044 * you like.
1045 * @throws InterruptedException if the current thread is interrupted
1046 */
1047 public final void acquireSharedInterruptibly(long arg)
1048 throws InterruptedException {
1049 if (Thread.interrupted())
1050 throw new InterruptedException();
1051 if (tryAcquireShared(arg) < 0)
1052 doAcquireSharedInterruptibly(arg);
1053 }
1054
1055 /**
1056 * Attempts to acquire in shared mode, aborting if interrupted, and
1057 * failing if the given timeout elapses. Implemented by first
1058 * checking interrupt status, then invoking at least once {@link
1059 * #tryAcquireShared}, returning on success. Otherwise, the
1060 * thread is queued, possibly repeatedly blocking and unblocking,
1061 * invoking {@link #tryAcquireShared} until success or the thread
1062 * is interrupted or the timeout elapses.
1063 *
1064 * @param arg the acquire argument. This value is conveyed to
1065 * {@link #tryAcquireShared} but is otherwise uninterpreted
1066 * and can represent anything you like.
1067 * @param nanosTimeout the maximum number of nanoseconds to wait
1068 * @return {@code true} if acquired; {@code false} if timed out
1069 * @throws InterruptedException if the current thread is interrupted
1070 */
1071 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
1072 throws InterruptedException {
1073 if (Thread.interrupted())
1074 throw new InterruptedException();
1075 return tryAcquireShared(arg) >= 0 ||
1076 doAcquireSharedNanos(arg, nanosTimeout);
1077 }
1078
1079 /**
1080 * Releases in shared mode. Implemented by unblocking one or more
1081 * threads if {@link #tryReleaseShared} returns true.
1082 *
1083 * @param arg the release argument. This value is conveyed to
1084 * {@link #tryReleaseShared} but is otherwise uninterpreted
1085 * and can represent anything you like.
1086 * @return the value returned from {@link #tryReleaseShared}
1087 */
1088 public final boolean releaseShared(long arg) {
1089 if (tryReleaseShared(arg)) {
1090 doReleaseShared();
1091 return true;
1092 }
1093 return false;
1094 }
1095
1096 // Queue inspection methods
1097
1098 /**
1099 * Queries whether any threads are waiting to acquire. Note that
1100 * because cancellations due to interrupts and timeouts may occur
1101 * at any time, a {@code true} return does not guarantee that any
1102 * other thread will ever acquire.
1103 *
1104 * <p>In this implementation, this operation returns in
1105 * constant time.
1106 *
1107 * @return {@code true} if there may be other threads waiting to acquire
1108 */
1109 public final boolean hasQueuedThreads() {
1110 return head != tail;
1111 }
1112
1113 /**
1114 * Queries whether any threads have ever contended to acquire this
1115 * synchronizer; that is, if an acquire method has ever blocked.
1116 *
1117 * <p>In this implementation, this operation returns in
1118 * constant time.
1119 *
1120 * @return {@code true} if there has ever been contention
1121 */
1122 public final boolean hasContended() {
1123 return head != null;
1124 }
1125
1126 /**
1127 * Returns the first (longest-waiting) thread in the queue, or
1128 * {@code null} if no threads are currently queued.
1129 *
1130 * <p>In this implementation, this operation normally returns in
1131 * constant time, but may iterate upon contention if other threads are
1132 * concurrently modifying the queue.
1133 *
1134 * @return the first (longest-waiting) thread in the queue, or
1135 * {@code null} if no threads are currently queued
1136 */
1137 public final Thread getFirstQueuedThread() {
1138 // handle only fast path, else relay
1139 return (head == tail) ? null : fullGetFirstQueuedThread();
1140 }
1141
1142 /**
1143 * Version of getFirstQueuedThread called when fastpath fails
1144 */
1145 private Thread fullGetFirstQueuedThread() {
1146 /*
1147 * The first node is normally head.next. Try to get its
1148 * thread field, ensuring consistent reads: If thread
1149 * field is nulled out or s.prev is no longer head, then
1150 * some other thread(s) concurrently performed setHead in
1151 * between some of our reads. We try this twice before
1152 * resorting to traversal.
1153 */
1154 Node h, s;
1155 Thread st;
1156 if (((h = head) != null && (s = h.next) != null &&
1157 s.prev == head && (st = s.thread) != null) ||
1158 ((h = head) != null && (s = h.next) != null &&
1159 s.prev == head && (st = s.thread) != null))
1160 return st;
1161
1162 /*
1163 * Head's next field might not have been set yet, or may have
1164 * been unset after setHead. So we must check to see if tail
1165 * is actually first node. If not, we continue on, safely
1166 * traversing from tail back to head to find first,
1167 * guaranteeing termination.
1168 */
1169
1170 Node t = tail;
1171 Thread firstThread = null;
1172 while (t != null && t != head) {
1173 Thread tt = t.thread;
1174 if (tt != null)
1175 firstThread = tt;
1176 t = t.prev;
1177 }
1178 return firstThread;
1179 }
1180
1181 /**
1182 * Returns true if the given thread is currently queued.
1183 *
1184 * <p>This implementation traverses the queue to determine
1185 * presence of the given thread.
1186 *
1187 * @param thread the thread
1188 * @return {@code true} if the given thread is on the queue
1189 * @throws NullPointerException if the thread is null
1190 */
1191 public final boolean isQueued(Thread thread) {
1192 if (thread == null)
1193 throw new NullPointerException();
1194 for (Node p = tail; p != null; p = p.prev)
1195 if (p.thread == thread)
1196 return true;
1197 return false;
1198 }
1199
1200 /**
1201 * Returns {@code true} if the apparent first queued thread, if one
1202 * exists, is waiting in exclusive mode. If this method returns
1203 * {@code true}, and the current thread is attempting to acquire in
1204 * shared mode (that is, this method is invoked from {@link
1205 * #tryAcquireShared}) then it is guaranteed that the current thread
1206 * is not the first queued thread. Used only as a heuristic in
1207 * ReentrantReadWriteLock.
1208 */
1209 final boolean apparentlyFirstQueuedIsExclusive() {
1210 Node h, s;
1211 return (h = head) != null &&
1212 (s = h.next) != null &&
1213 !s.isShared() &&
1214 s.thread != null;
1215 }
1216
1217 /**
1218 * Queries whether any threads have been waiting to acquire longer
1219 * than the current thread.
1220 *
1221 * <p>An invocation of this method is equivalent to (but may be
1222 * more efficient than):
1223 * <pre> {@code
1224 * getFirstQueuedThread() != Thread.currentThread() &&
1225 * hasQueuedThreads()}</pre>
1226 *
1227 * <p>Note that because cancellations due to interrupts and
1228 * timeouts may occur at any time, a {@code true} return does not
1229 * guarantee that some other thread will acquire before the current
1230 * thread. Likewise, it is possible for another thread to win a
1231 * race to enqueue after this method has returned {@code false},
1232 * due to the queue being empty.
1233 *
1234 * <p>This method is designed to be used by a fair synchronizer to
1235 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
1236 * Such a synchronizer's {@link #tryAcquire} method should return
1237 * {@code false}, and its {@link #tryAcquireShared} method should
1238 * return a negative value, if this method returns {@code true}
1239 * (unless this is a reentrant acquire). For example, the {@code
1240 * tryAcquire} method for a fair, reentrant, exclusive mode
1241 * synchronizer might look like this:
1242 *
1243 * <pre> {@code
1244 * protected boolean tryAcquire(int arg) {
1245 * if (isHeldExclusively()) {
1246 * // A reentrant acquire; increment hold count
1247 * return true;
1248 * } else if (hasQueuedPredecessors()) {
1249 * return false;
1250 * } else {
1251 * // try to acquire normally
1252 * }
1253 * }}</pre>
1254 *
1255 * @return {@code true} if there is a queued thread preceding the
1256 * current thread, and {@code false} if the current thread
1257 * is at the head of the queue or the queue is empty
1258 * @since 1.7
1259 */
1260 public final boolean hasQueuedPredecessors() {
1261 // The correctness of this depends on head being initialized
1262 // before tail and on head.next being accurate if the current
1263 // thread is first in queue.
1264 Node t = tail; // Read fields in reverse initialization order
1265 Node h = head;
1266 Node s;
1267 return h != t &&
1268 ((s = h.next) == null || s.thread != Thread.currentThread());
1269 }
1270
1271
1272 // Instrumentation and monitoring methods
1273
1274 /**
1275 * Returns an estimate of the number of threads waiting to
1276 * acquire. The value is only an estimate because the number of
1277 * threads may change dynamically while this method traverses
1278 * internal data structures. This method is designed for use in
1279 * monitoring system state, not for synchronization
1280 * control.
1281 *
1282 * @return the estimated number of threads waiting to acquire
1283 */
1284 public final int getQueueLength() {
1285 int n = 0;
1286 for (Node p = tail; p != null; p = p.prev) {
1287 if (p.thread != null)
1288 ++n;
1289 }
1290 return n;
1291 }
1292
1293 /**
1294 * Returns a collection containing threads that may be waiting to
1295 * acquire. Because the actual set of threads may change
1296 * dynamically while constructing this result, the returned
1297 * collection is only a best-effort estimate. The elements of the
1298 * returned collection are in no particular order. This method is
1299 * designed to facilitate construction of subclasses that provide
1300 * more extensive monitoring facilities.
1301 *
1302 * @return the collection of threads
1303 */
1304 public final Collection<Thread> getQueuedThreads() {
1305 ArrayList<Thread> list = new ArrayList<Thread>();
1306 for (Node p = tail; p != null; p = p.prev) {
1307 Thread t = p.thread;
1308 if (t != null)
1309 list.add(t);
1310 }
1311 return list;
1312 }
1313
1314 /**
1315 * Returns a collection containing threads that may be waiting to
1316 * acquire in exclusive mode. This has the same properties
1317 * as {@link #getQueuedThreads} except that it only returns
1318 * those threads waiting due to an exclusive acquire.
1319 *
1320 * @return the collection of threads
1321 */
1322 public final Collection<Thread> getExclusiveQueuedThreads() {
1323 ArrayList<Thread> list = new ArrayList<Thread>();
1324 for (Node p = tail; p != null; p = p.prev) {
1325 if (!p.isShared()) {
1326 Thread t = p.thread;
1327 if (t != null)
1328 list.add(t);
1329 }
1330 }
1331 return list;
1332 }
1333
1334 /**
1335 * Returns a collection containing threads that may be waiting to
1336 * acquire in shared mode. This has the same properties
1337 * as {@link #getQueuedThreads} except that it only returns
1338 * those threads waiting due to a shared acquire.
1339 *
1340 * @return the collection of threads
1341 */
1342 public final Collection<Thread> getSharedQueuedThreads() {
1343 ArrayList<Thread> list = new ArrayList<Thread>();
1344 for (Node p = tail; p != null; p = p.prev) {
1345 if (p.isShared()) {
1346 Thread t = p.thread;
1347 if (t != null)
1348 list.add(t);
1349 }
1350 }
1351 return list;
1352 }
1353
1354 /**
1355 * Returns a string identifying this synchronizer, as well as its state.
1356 * The state, in brackets, includes the String {@code "State ="}
1357 * followed by the current value of {@link #getState}, and either
1358 * {@code "nonempty"} or {@code "empty"} depending on whether the
1359 * queue is empty.
1360 *
1361 * @return a string identifying this synchronizer, as well as its state
1362 */
1363 public String toString() {
1364 return super.toString()
1365 + "[State = " + getState() + ", "
1366 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1367 }
1368
1369
1370 // Internal support methods for Conditions
1371
1372 /**
1373 * Returns true if a node, always one that was initially placed on
1374 * a condition queue, is now waiting to reacquire on sync queue.
1375 * @param node the node
1376 * @return true if is reacquiring
1377 */
1378 final boolean isOnSyncQueue(Node node) {
1379 if (node.waitStatus == Node.CONDITION || node.prev == null)
1380 return false;
1381 if (node.next != null) // If has successor, it must be on queue
1382 return true;
1383 /*
1384 * node.prev can be non-null, but not yet on queue because
1385 * the CAS to place it on queue can fail. So we have to
1386 * traverse from tail to make sure it actually made it. It
1387 * will always be near the tail in calls to this method, and
1388 * unless the CAS failed (which is unlikely), it will be
1389 * there, so we hardly ever traverse much.
1390 */
1391 return findNodeFromTail(node);
1392 }
1393
1394 /**
1395 * Returns true if node is on sync queue by searching backwards from tail.
1396 * Called only when needed by isOnSyncQueue.
1397 * @return true if present
1398 */
1399 private boolean findNodeFromTail(Node node) {
1400 Node p = tail;
1401 for (;;) {
1402 if (p == node)
1403 return true;
1404 if (p == null)
1405 return false;
1406 p = p.prev;
1407 }
1408 }
1409
1410 /**
1411 * Transfers a node from a condition queue onto sync queue.
1412 * Returns true if successful.
1413 * @param node the node
1414 * @return true if successfully transferred (else the node was
1415 * cancelled before signal)
1416 */
1417 final boolean transferForSignal(Node node) {
1418 /*
1419 * If cannot change waitStatus, the node has been cancelled.
1420 */
1421 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1422 return false;
1423
1424 /*
1425 * Splice onto queue and try to set waitStatus of predecessor to
1426 * indicate that thread is (probably) waiting. If cancelled or
1427 * attempt to set waitStatus fails, wake up to resync (in which
1428 * case the waitStatus can be transiently and harmlessly wrong).
1429 */
1430 Node p = enq(node);
1431 int ws = p.waitStatus;
1432 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
1433 LockSupport.unpark(node.thread);
1434 return true;
1435 }
1436
1437 /**
1438 * Transfers node, if necessary, to sync queue after a cancelled wait.
1439 * Returns true if thread was cancelled before being signalled.
1440 *
1441 * @param node the node
1442 * @return true if cancelled before the node was signalled
1443 */
1444 final boolean transferAfterCancelledWait(Node node) {
1445 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1446 enq(node);
1447 return true;
1448 }
1449 /*
1450 * If we lost out to a signal(), then we can't proceed
1451 * until it finishes its enq(). Cancelling during an
1452 * incomplete transfer is both rare and transient, so just
1453 * spin.
1454 */
1455 while (!isOnSyncQueue(node))
1456 Thread.yield();
1457 return false;
1458 }
1459
1460 /**
1461 * Invokes release with current state value; returns saved state.
1462 * Cancels node and throws exception on failure.
1463 * @param node the condition node for this wait
1464 * @return previous sync state
1465 */
1466 final long fullyRelease(Node node) {
1467 boolean failed = true;
1468 try {
1469 long savedState = getState();
1470 if (release(savedState)) {
1471 failed = false;
1472 return savedState;
1473 } else {
1474 throw new IllegalMonitorStateException();
1475 }
1476 } finally {
1477 if (failed)
1478 node.waitStatus = Node.CANCELLED;
1479 }
1480 }
1481
1482 // Instrumentation methods for conditions
1483
1484 /**
1485 * Queries whether the given ConditionObject
1486 * uses this synchronizer as its lock.
1487 *
1488 * @param condition the condition
1489 * @return {@code true} if owned
1490 * @throws NullPointerException if the condition is null
1491 */
1492 public final boolean owns(ConditionObject condition) {
1493 return condition.isOwnedBy(this);
1494 }
1495
1496 /**
1497 * Queries whether any threads are waiting on the given condition
1498 * associated with this synchronizer. Note that because timeouts
1499 * and interrupts may occur at any time, a {@code true} return
1500 * does not guarantee that a future {@code signal} will awaken
1501 * any threads. This method is designed primarily for use in
1502 * monitoring of the system state.
1503 *
1504 * @param condition the condition
1505 * @return {@code true} if there are any waiting threads
1506 * @throws IllegalMonitorStateException if exclusive synchronization
1507 * is not held
1508 * @throws IllegalArgumentException if the given condition is
1509 * not associated with this synchronizer
1510 * @throws NullPointerException if the condition is null
1511 */
1512 public final boolean hasWaiters(ConditionObject condition) {
1513 if (!owns(condition))
1514 throw new IllegalArgumentException("Not owner");
1515 return condition.hasWaiters();
1516 }
1517
1518 /**
1519 * Returns an estimate of the number of threads waiting on the
1520 * given condition associated with this synchronizer. Note that
1521 * because timeouts and interrupts may occur at any time, the
1522 * estimate serves only as an upper bound on the actual number of
1523 * waiters. This method is designed for use in monitoring of the
1524 * system state, not for synchronization control.
1525 *
1526 * @param condition the condition
1527 * @return the estimated number of waiting threads
1528 * @throws IllegalMonitorStateException if exclusive synchronization
1529 * is not held
1530 * @throws IllegalArgumentException if the given condition is
1531 * not associated with this synchronizer
1532 * @throws NullPointerException if the condition is null
1533 */
1534 public final int getWaitQueueLength(ConditionObject condition) {
1535 if (!owns(condition))
1536 throw new IllegalArgumentException("Not owner");
1537 return condition.getWaitQueueLength();
1538 }
1539
1540 /**
1541 * Returns a collection containing those threads that may be
1542 * waiting on the given condition associated with this
1543 * synchronizer. Because the actual set of threads may change
1544 * dynamically while constructing this result, the returned
1545 * collection is only a best-effort estimate. The elements of the
1546 * returned collection are in no particular order.
1547 *
1548 * @param condition the condition
1549 * @return the collection of threads
1550 * @throws IllegalMonitorStateException if exclusive synchronization
1551 * is not held
1552 * @throws IllegalArgumentException if the given condition is
1553 * not associated with this synchronizer
1554 * @throws NullPointerException if the condition is null
1555 */
1556 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1557 if (!owns(condition))
1558 throw new IllegalArgumentException("Not owner");
1559 return condition.getWaitingThreads();
1560 }
1561
1562 /**
1563 * Condition implementation for a {@link
1564 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
1565 * Lock} implementation.
1566 *
1567 * <p>Method documentation for this class describes mechanics,
1568 * not behavioral specifications from the point of view of Lock
1569 * and Condition users. Exported versions of this class will in
1570 * general need to be accompanied by documentation describing
1571 * condition semantics that rely on those of the associated
1572 * {@code AbstractQueuedLongSynchronizer}.
1573 *
1574 * <p>This class is Serializable, but all fields are transient,
1575 * so deserialized conditions have no waiters.
1576 *
1577 * @since 1.6
1578 */
1579 public class ConditionObject implements Condition, java.io.Serializable {
1580 private static final long serialVersionUID = 1173984872572414699L;
1581 /** First node of condition queue. */
1582 private transient Node firstWaiter;
1583 /** Last node of condition queue. */
1584 private transient Node lastWaiter;
1585
1586 /**
1587 * Creates a new {@code ConditionObject} instance.
1588 */
1589 public ConditionObject() { }
1590
1591 // Internal methods
1592
1593 /**
1594 * Adds a new waiter to wait queue.
1595 * @return its new wait node
1596 */
1597 private Node addConditionWaiter() {
1598 Node t = lastWaiter;
1599 // If lastWaiter is cancelled, clean out.
1600 if (t != null && t.waitStatus != Node.CONDITION) {
1601 unlinkCancelledWaiters();
1602 t = lastWaiter;
1603 }
1604 Node node = new Node(Thread.currentThread(), Node.CONDITION);
1605 if (t == null)
1606 firstWaiter = node;
1607 else
1608 t.nextWaiter = node;
1609 lastWaiter = node;
1610 return node;
1611 }
1612
1613 /**
1614 * Removes and transfers nodes until hit non-cancelled one or
1615 * null. Split out from signal in part to encourage compilers
1616 * to inline the case of no waiters.
1617 * @param first (non-null) the first node on condition queue
1618 */
1619 private void doSignal(Node first) {
1620 do {
1621 if ( (firstWaiter = first.nextWaiter) == null)
1622 lastWaiter = null;
1623 first.nextWaiter = null;
1624 } while (!transferForSignal(first) &&
1625 (first = firstWaiter) != null);
1626 }
1627
1628 /**
1629 * Removes and transfers all nodes.
1630 * @param first (non-null) the first node on condition queue
1631 */
1632 private void doSignalAll(Node first) {
1633 lastWaiter = firstWaiter = null;
1634 do {
1635 Node next = first.nextWaiter;
1636 first.nextWaiter = null;
1637 transferForSignal(first);
1638 first = next;
1639 } while (first != null);
1640 }
1641
1642 /**
1643 * Unlinks cancelled waiter nodes from condition queue.
1644 * Called only while holding lock. This is called when
1645 * cancellation occurred during condition wait, and upon
1646 * insertion of a new waiter when lastWaiter is seen to have
1647 * been cancelled. This method is needed to avoid garbage
1648 * retention in the absence of signals. So even though it may
1649 * require a full traversal, it comes into play only when
1650 * timeouts or cancellations occur in the absence of
1651 * signals. It traverses all nodes rather than stopping at a
1652 * particular target to unlink all pointers to garbage nodes
1653 * without requiring many re-traversals during cancellation
1654 * storms.
1655 */
1656 private void unlinkCancelledWaiters() {
1657 Node t = firstWaiter;
1658 Node trail = null;
1659 while (t != null) {
1660 Node next = t.nextWaiter;
1661 if (t.waitStatus != Node.CONDITION) {
1662 t.nextWaiter = null;
1663 if (trail == null)
1664 firstWaiter = next;
1665 else
1666 trail.nextWaiter = next;
1667 if (next == null)
1668 lastWaiter = trail;
1669 }
1670 else
1671 trail = t;
1672 t = next;
1673 }
1674 }
1675
1676 // public methods
1677
1678 /**
1679 * Moves the longest-waiting thread, if one exists, from the
1680 * wait queue for this condition to the wait queue for the
1681 * owning lock.
1682 *
1683 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1684 * returns {@code false}
1685 */
1686 public final void signal() {
1687 if (!isHeldExclusively())
1688 throw new IllegalMonitorStateException();
1689 Node first = firstWaiter;
1690 if (first != null)
1691 doSignal(first);
1692 }
1693
1694 /**
1695 * Moves all threads from the wait queue for this condition to
1696 * the wait queue for the owning lock.
1697 *
1698 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1699 * returns {@code false}
1700 */
1701 public final void signalAll() {
1702 if (!isHeldExclusively())
1703 throw new IllegalMonitorStateException();
1704 Node first = firstWaiter;
1705 if (first != null)
1706 doSignalAll(first);
1707 }
1708
1709 /**
1710 * Implements uninterruptible condition wait.
1711 * <ol>
1712 * <li>Save lock state returned by {@link #getState}.
1713 * <li>Invoke {@link #release} with saved state as argument,
1714 * throwing IllegalMonitorStateException if it fails.
1715 * <li>Block until signalled.
1716 * <li>Reacquire by invoking specialized version of
1717 * {@link #acquire} with saved state as argument.
1718 * </ol>
1719 */
1720 public final void awaitUninterruptibly() {
1721 Node node = addConditionWaiter();
1722 long savedState = fullyRelease(node);
1723 boolean interrupted = false;
1724 while (!isOnSyncQueue(node)) {
1725 LockSupport.park(this);
1726 if (Thread.interrupted())
1727 interrupted = true;
1728 }
1729 if (acquireQueued(node, savedState) || interrupted)
1730 selfInterrupt();
1731 }
1732
1733 /*
1734 * For interruptible waits, we need to track whether to throw
1735 * InterruptedException, if interrupted while blocked on
1736 * condition, versus reinterrupt current thread, if
1737 * interrupted while blocked waiting to re-acquire.
1738 */
1739
1740 /** Mode meaning to reinterrupt on exit from wait */
1741 private static final int REINTERRUPT = 1;
1742 /** Mode meaning to throw InterruptedException on exit from wait */
1743 private static final int THROW_IE = -1;
1744
1745 /**
1746 * Checks for interrupt, returning THROW_IE if interrupted
1747 * before signalled, REINTERRUPT if after signalled, or
1748 * 0 if not interrupted.
1749 */
1750 private int checkInterruptWhileWaiting(Node node) {
1751 return Thread.interrupted() ?
1752 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1753 0;
1754 }
1755
1756 /**
1757 * Throws InterruptedException, reinterrupts current thread, or
1758 * does nothing, depending on mode.
1759 */
1760 private void reportInterruptAfterWait(int interruptMode)
1761 throws InterruptedException {
1762 if (interruptMode == THROW_IE)
1763 throw new InterruptedException();
1764 else if (interruptMode == REINTERRUPT)
1765 selfInterrupt();
1766 }
1767
1768 /**
1769 * Implements interruptible condition wait.
1770 * <ol>
1771 * <li>If current thread is interrupted, throw InterruptedException.
1772 * <li>Save lock state returned by {@link #getState}.
1773 * <li>Invoke {@link #release} with saved state as argument,
1774 * throwing IllegalMonitorStateException if it fails.
1775 * <li>Block until signalled or interrupted.
1776 * <li>Reacquire by invoking specialized version of
1777 * {@link #acquire} with saved state as argument.
1778 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1779 * </ol>
1780 */
1781 public final void await() throws InterruptedException {
1782 if (Thread.interrupted())
1783 throw new InterruptedException();
1784 Node node = addConditionWaiter();
1785 long savedState = fullyRelease(node);
1786 int interruptMode = 0;
1787 while (!isOnSyncQueue(node)) {
1788 LockSupport.park(this);
1789 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1790 break;
1791 }
1792 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1793 interruptMode = REINTERRUPT;
1794 if (node.nextWaiter != null) // clean up if cancelled
1795 unlinkCancelledWaiters();
1796 if (interruptMode != 0)
1797 reportInterruptAfterWait(interruptMode);
1798 }
1799
1800 /**
1801 * Implements timed condition wait.
1802 * <ol>
1803 * <li>If current thread is interrupted, throw InterruptedException.
1804 * <li>Save lock state returned by {@link #getState}.
1805 * <li>Invoke {@link #release} with saved state as argument,
1806 * throwing IllegalMonitorStateException if it fails.
1807 * <li>Block until signalled, interrupted, or timed out.
1808 * <li>Reacquire by invoking specialized version of
1809 * {@link #acquire} with saved state as argument.
1810 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1811 * </ol>
1812 */
1813 public final long awaitNanos(long nanosTimeout)
1814 throws InterruptedException {
1815 if (Thread.interrupted())
1816 throw new InterruptedException();
1817 long initialNanos = nanosTimeout;
1818 Node node = addConditionWaiter();
1819 long savedState = fullyRelease(node);
1820 final long deadline = System.nanoTime() + nanosTimeout;
1821 int interruptMode = 0;
1822 while (!isOnSyncQueue(node)) {
1823 if (nanosTimeout <= 0L) {
1824 transferAfterCancelledWait(node);
1825 break;
1826 }
1827 if (nanosTimeout > spinForTimeoutThreshold)
1828 LockSupport.parkNanos(this, nanosTimeout);
1829 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1830 break;
1831 nanosTimeout = deadline - System.nanoTime();
1832 }
1833 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1834 interruptMode = REINTERRUPT;
1835 if (node.nextWaiter != null)
1836 unlinkCancelledWaiters();
1837 if (interruptMode != 0)
1838 reportInterruptAfterWait(interruptMode);
1839 long remaining = deadline - System.nanoTime(); // avoid overflow
1840 return (remaining < initialNanos) ? remaining : Long.MIN_VALUE;
1841 }
1842
1843 /**
1844 * Implements absolute timed condition wait.
1845 * <ol>
1846 * <li>If current thread is interrupted, throw InterruptedException.
1847 * <li>Save lock state returned by {@link #getState}.
1848 * <li>Invoke {@link #release} with saved state as argument,
1849 * throwing IllegalMonitorStateException if it fails.
1850 * <li>Block until signalled, interrupted, or timed out.
1851 * <li>Reacquire by invoking specialized version of
1852 * {@link #acquire} with saved state as argument.
1853 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1854 * <li>If timed out while blocked in step 4, return false, else true.
1855 * </ol>
1856 */
1857 public final boolean awaitUntil(Date deadline)
1858 throws InterruptedException {
1859 long abstime = deadline.getTime();
1860 if (Thread.interrupted())
1861 throw new InterruptedException();
1862 Node node = addConditionWaiter();
1863 long savedState = fullyRelease(node);
1864 boolean timedout = false;
1865 int interruptMode = 0;
1866 while (!isOnSyncQueue(node)) {
1867 if (System.currentTimeMillis() >= abstime) {
1868 timedout = transferAfterCancelledWait(node);
1869 break;
1870 }
1871 LockSupport.parkUntil(this, abstime);
1872 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1873 break;
1874 }
1875 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1876 interruptMode = REINTERRUPT;
1877 if (node.nextWaiter != null)
1878 unlinkCancelledWaiters();
1879 if (interruptMode != 0)
1880 reportInterruptAfterWait(interruptMode);
1881 return !timedout;
1882 }
1883
1884 /**
1885 * Implements timed condition wait.
1886 * <ol>
1887 * <li>If current thread is interrupted, throw InterruptedException.
1888 * <li>Save lock state returned by {@link #getState}.
1889 * <li>Invoke {@link #release} with saved state as argument,
1890 * throwing IllegalMonitorStateException if it fails.
1891 * <li>Block until signalled, interrupted, or timed out.
1892 * <li>Reacquire by invoking specialized version of
1893 * {@link #acquire} with saved state as argument.
1894 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1895 * <li>If timed out while blocked in step 4, return false, else true.
1896 * </ol>
1897 */
1898 public final boolean await(long time, TimeUnit unit)
1899 throws InterruptedException {
1900 long nanosTimeout = unit.toNanos(time);
1901 if (Thread.interrupted())
1902 throw new InterruptedException();
1903 Node node = addConditionWaiter();
1904 long savedState = fullyRelease(node);
1905 final long deadline = System.nanoTime() + nanosTimeout;
1906 boolean timedout = false;
1907 int interruptMode = 0;
1908 while (!isOnSyncQueue(node)) {
1909 if (nanosTimeout <= 0L) {
1910 timedout = transferAfterCancelledWait(node);
1911 break;
1912 }
1913 if (nanosTimeout > spinForTimeoutThreshold)
1914 LockSupport.parkNanos(this, nanosTimeout);
1915 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1916 break;
1917 nanosTimeout = deadline - System.nanoTime();
1918 }
1919 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1920 interruptMode = REINTERRUPT;
1921 if (node.nextWaiter != null)
1922 unlinkCancelledWaiters();
1923 if (interruptMode != 0)
1924 reportInterruptAfterWait(interruptMode);
1925 return !timedout;
1926 }
1927
1928 // support for instrumentation
1929
1930 /**
1931 * Returns true if this condition was created by the given
1932 * synchronization object.
1933 *
1934 * @return {@code true} if owned
1935 */
1936 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1937 return sync == AbstractQueuedLongSynchronizer.this;
1938 }
1939
1940 /**
1941 * Queries whether any threads are waiting on this condition.
1942 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}.
1943 *
1944 * @return {@code true} if there are any waiting threads
1945 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1946 * returns {@code false}
1947 */
1948 protected final boolean hasWaiters() {
1949 if (!isHeldExclusively())
1950 throw new IllegalMonitorStateException();
1951 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1952 if (w.waitStatus == Node.CONDITION)
1953 return true;
1954 }
1955 return false;
1956 }
1957
1958 /**
1959 * Returns an estimate of the number of threads waiting on
1960 * this condition.
1961 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}.
1962 *
1963 * @return the estimated number of waiting threads
1964 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1965 * returns {@code false}
1966 */
1967 protected final int getWaitQueueLength() {
1968 if (!isHeldExclusively())
1969 throw new IllegalMonitorStateException();
1970 int n = 0;
1971 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1972 if (w.waitStatus == Node.CONDITION)
1973 ++n;
1974 }
1975 return n;
1976 }
1977
1978 /**
1979 * Returns a collection containing those threads that may be
1980 * waiting on this Condition.
1981 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}.
1982 *
1983 * @return the collection of threads
1984 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1985 * returns {@code false}
1986 */
1987 protected final Collection<Thread> getWaitingThreads() {
1988 if (!isHeldExclusively())
1989 throw new IllegalMonitorStateException();
1990 ArrayList<Thread> list = new ArrayList<Thread>();
1991 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1992 if (w.waitStatus == Node.CONDITION) {
1993 Thread t = w.thread;
1994 if (t != null)
1995 list.add(t);
1996 }
1997 }
1998 return list;
1999 }
2000 }
2001
2002 /**
2003 * Setup to support compareAndSet. We need to natively implement
2004 * this here: For the sake of permitting future enhancements, we
2005 * cannot explicitly subclass AtomicLong, which would be
2006 * efficient and useful otherwise. So, as the lesser of evils, we
2007 * natively implement using hotspot intrinsics API. And while we
2008 * are at it, we do the same for other CASable fields (which could
2009 * otherwise be done with atomic field updaters).
2010 */
2011 private static final Unsafe unsafe = Unsafe.getUnsafe();
2012 private static final long stateOffset;
2013 private static final long headOffset;
2014 private static final long tailOffset;
2015 private static final long waitStatusOffset;
2016 private static final long nextOffset;
2017
2018 static {
2019 try {
2020 stateOffset = unsafe.objectFieldOffset
2021 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state"));
2022 headOffset = unsafe.objectFieldOffset
2023 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head"));
2024 tailOffset = unsafe.objectFieldOffset
2025 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail"));
2026 waitStatusOffset = unsafe.objectFieldOffset
2027 (Node.class.getDeclaredField("waitStatus"));
2028 nextOffset = unsafe.objectFieldOffset
2029 (Node.class.getDeclaredField("next"));
2030
2031 } catch (Exception ex) { throw new Error(ex); }
2032
2033 // Reduce the risk of rare disastrous classloading in first call to
2034 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
2035 Class<?> ensureLoaded = LockSupport.class;
2036 }
2037
2038 /**
2039 * CAS head field. Used only by enq.
2040 */
2041 private final boolean compareAndSetHead(Node update) {
2042 return unsafe.compareAndSwapObject(this, headOffset, null, update);
2043 }
2044
2045 /**
2046 * CAS tail field. Used only by enq.
2047 */
2048 private final boolean compareAndSetTail(Node expect, Node update) {
2049 return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
2050 }
2051
2052 /**
2053 * CAS waitStatus field of a node.
2054 */
2055 private static final boolean compareAndSetWaitStatus(Node node,
2056 int expect,
2057 int update) {
2058 return unsafe.compareAndSwapInt(node, waitStatusOffset,
2059 expect, update);
2060 }
2061
2062 /**
2063 * CAS next field of a node.
2064 */
2065 private static final boolean compareAndSetNext(Node node,
2066 Node expect,
2067 Node update) {
2068 return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
2069 }
2070 }