ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
Revision: 1.4
Committed: Wed Jan 17 06:11:59 2018 UTC (6 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +20 -14 lines
Log Message:
backport 8191483: AbstractQueuedSynchronizer cancel/cancel race

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent.locks;
8
9 import java.util.ArrayList;
10 import java.util.Collection;
11 import java.util.Date;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
14
15 /**
16 * A version of {@link AbstractQueuedSynchronizer} in
17 * which synchronization state is maintained as a {@code long}.
18 * This class has exactly the same structure, properties, and methods
19 * as {@code AbstractQueuedSynchronizer} with the exception
20 * that all state-related parameters and results are defined
21 * as {@code long} rather than {@code int}. This class
22 * may be useful when creating synchronizers such as
23 * multilevel locks and barriers that require
24 * 64 bits of state.
25 *
26 * <p>See {@link AbstractQueuedSynchronizer} for usage
27 * notes and examples.
28 *
29 * @since 1.6
30 * @author Doug Lea
31 */
32 public abstract class AbstractQueuedLongSynchronizer
33 extends AbstractOwnableSynchronizer
34 implements java.io.Serializable {
35
36 private static final long serialVersionUID = 7373984972572414692L;
37
38 /*
39 * To keep sources in sync, the remainder of this source file is
40 * exactly cloned from AbstractQueuedSynchronizer, replacing class
41 * name and changing ints related to sync state to longs. Please
42 * keep it that way.
43 */
44
/**
 * Constructs a new {@code AbstractQueuedLongSynchronizer} whose
 * synchronization state starts out as zero.
 */
protected AbstractQueuedLongSynchronizer() {
}
50
51 /**
52 * Head of the wait queue, lazily initialized. Except for
53 * initialization, it is modified only via method setHead. Note:
54 * If head exists, its waitStatus is guaranteed not to be
55 * CANCELLED.
56 */
57 private transient volatile Node head;
58
59 /**
60 * Tail of the wait queue, lazily initialized. Modified only via
61 * method enq to add new wait node.
62 */
63 private transient volatile Node tail;
64
65 /**
66 * The synchronization state.
67 */
68 private volatile long state;
69
70 /**
71 * Returns the current value of synchronization state.
72 * This operation has memory semantics of a {@code volatile} read.
73 * @return current state value
74 */
75 protected final long getState() {
76 return state;
77 }
78
79 /**
80 * Sets the value of synchronization state.
81 * This operation has memory semantics of a {@code volatile} write.
82 * @param newState the new state value
83 */
84 protected final void setState(long newState) {
85 // Use putLongVolatile instead of ordinary volatile store when
86 // using compareAndSwapLong, for sake of some 32bit systems.
87 U.putLongVolatile(this, STATE, newState);
88 }
89
90 /**
91 * Atomically sets synchronization state to the given updated
92 * value if the current state value equals the expected value.
93 * This operation has memory semantics of a {@code volatile} read
94 * and write.
95 *
96 * @param expect the expected value
97 * @param update the new value
98 * @return {@code true} if successful. False return indicates that the actual
99 * value was not equal to the expected value.
100 */
101 protected final boolean compareAndSetState(long expect, long update) {
102 return U.compareAndSwapLong(this, STATE, expect, update);
103 }
104
105 // Queuing utilities
106
107 /**
108 * The number of nanoseconds for which it is faster to spin
109 * rather than to use timed park. A rough estimate suffices
110 * to improve responsiveness with very short timeouts.
111 */
112 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
113
114 /**
115 * Inserts node into queue, initializing if necessary. See picture above.
116 * @param node the node to insert
117 * @return node's predecessor
118 */
119 private Node enq(Node node) {
120 for (;;) {
121 Node oldTail = tail;
122 if (oldTail != null) {
123 U.putObject(node, Node.PREV, oldTail);
124 if (compareAndSetTail(oldTail, node)) {
125 oldTail.next = node;
126 return oldTail;
127 }
128 } else {
129 initializeSyncQueue();
130 }
131 }
132 }
133
134 /**
135 * Creates and enqueues node for current thread and given mode.
136 *
137 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
138 * @return the new node
139 */
140 private Node addWaiter(Node mode) {
141 Node node = new Node(mode);
142
143 for (;;) {
144 Node oldTail = tail;
145 if (oldTail != null) {
146 U.putObject(node, Node.PREV, oldTail);
147 if (compareAndSetTail(oldTail, node)) {
148 oldTail.next = node;
149 return node;
150 }
151 } else {
152 initializeSyncQueue();
153 }
154 }
155 }
156
157 /**
158 * Sets head of queue to be node, thus dequeuing. Called only by
159 * acquire methods. Also nulls out unused fields for sake of GC
160 * and to suppress unnecessary signals and traversals.
161 *
162 * @param node the node
163 */
164 private void setHead(Node node) {
165 head = node;
166 node.thread = null;
167 node.prev = null;
168 }
169
170 /**
171 * Wakes up node's successor, if one exists.
172 *
173 * @param node the node
174 */
175 private void unparkSuccessor(Node node) {
176 /*
177 * If status is negative (i.e., possibly needing signal) try
178 * to clear in anticipation of signalling. It is OK if this
179 * fails or if status is changed by waiting thread.
180 */
181 int ws = node.waitStatus;
182 if (ws < 0)
183 node.compareAndSetWaitStatus(ws, 0);
184
185 /*
186 * Thread to unpark is held in successor, which is normally
187 * just the next node. But if cancelled or apparently null,
188 * traverse backwards from tail to find the actual
189 * non-cancelled successor.
190 */
191 Node s = node.next;
192 if (s == null || s.waitStatus > 0) {
193 s = null;
194 for (Node p = tail; p != node && p != null; p = p.prev)
195 if (p.waitStatus <= 0)
196 s = p;
197 }
198 if (s != null)
199 LockSupport.unpark(s.thread);
200 }
201
202 /**
203 * Release action for shared mode -- signals successor and ensures
204 * propagation. (Note: For exclusive mode, release just amounts
205 * to calling unparkSuccessor of head if it needs signal.)
206 */
207 private void doReleaseShared() {
208 /*
209 * Ensure that a release propagates, even if there are other
210 * in-progress acquires/releases. This proceeds in the usual
211 * way of trying to unparkSuccessor of head if it needs
212 * signal. But if it does not, status is set to PROPAGATE to
213 * ensure that upon release, propagation continues.
214 * Additionally, we must loop in case a new node is added
215 * while we are doing this. Also, unlike other uses of
216 * unparkSuccessor, we need to know if CAS to reset status
217 * fails, if so rechecking.
218 */
219 for (;;) {
220 Node h = head;
221 if (h != null && h != tail) {
222 int ws = h.waitStatus;
223 if (ws == Node.SIGNAL) {
224 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
225 continue; // loop to recheck cases
226 unparkSuccessor(h);
227 }
228 else if (ws == 0 &&
229 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
230 continue; // loop on failed CAS
231 }
232 if (h == head) // loop if head changed
233 break;
234 }
235 }
236
237 /**
238 * Sets head of queue, and checks if successor may be waiting
239 * in shared mode, if so propagating if either propagate > 0 or
240 * PROPAGATE status was set.
241 *
242 * @param node the node
243 * @param propagate the return value from a tryAcquireShared
244 */
245 private void setHeadAndPropagate(Node node, long propagate) {
246 Node h = head; // Record old head for check below
247 setHead(node);
248 /*
249 * Try to signal next queued node if:
250 * Propagation was indicated by caller,
251 * or was recorded (as h.waitStatus either before
252 * or after setHead) by a previous operation
253 * (note: this uses sign-check of waitStatus because
254 * PROPAGATE status may transition to SIGNAL.)
255 * and
256 * The next node is waiting in shared mode,
257 * or we don't know, because it appears null
258 *
259 * The conservatism in both of these checks may cause
260 * unnecessary wake-ups, but only when there are multiple
261 * racing acquires/releases, so most need signals now or soon
262 * anyway.
263 */
264 if (propagate > 0 || h == null || h.waitStatus < 0 ||
265 (h = head) == null || h.waitStatus < 0) {
266 Node s = node.next;
267 if (s == null || s.isShared())
268 doReleaseShared();
269 }
270 }
271
272 // Utilities for various versions of acquire
273
274 /**
275 * Cancels an ongoing attempt to acquire.
276 *
277 * @param node the node
278 */
279 private void cancelAcquire(Node node) {
280 // Ignore if node doesn't exist
281 if (node == null)
282 return;
283
284 node.thread = null;
285
286 // Skip cancelled predecessors
287 Node pred = node.prev;
288 while (pred.waitStatus > 0)
289 node.prev = pred = pred.prev;
290
291 // predNext is the apparent node to unsplice. CASes below will
292 // fail if not, in which case, we lost race vs another cancel
293 // or signal, so no further action is necessary, although with
294 // a possibility that a cancelled node may transiently remain
295 // reachable.
296 Node predNext = pred.next;
297
298 // Can use unconditional write instead of CAS here.
299 // After this atomic step, other Nodes can skip past us.
300 // Before, we are free of interference from other threads.
301 node.waitStatus = Node.CANCELLED;
302
303 // If we are the tail, remove ourselves.
304 if (node == tail && compareAndSetTail(node, pred)) {
305 pred.compareAndSetNext(predNext, null);
306 } else {
307 // If successor needs signal, try to set pred's next-link
308 // so it will get one. Otherwise wake it up to propagate.
309 int ws;
310 if (pred != head &&
311 ((ws = pred.waitStatus) == Node.SIGNAL ||
312 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
313 pred.thread != null) {
314 Node next = node.next;
315 if (next != null && next.waitStatus <= 0)
316 pred.compareAndSetNext(predNext, next);
317 } else {
318 unparkSuccessor(node);
319 }
320
321 node.next = node; // help GC
322 }
323 }
324
325 /**
326 * Checks and updates status for a node that failed to acquire.
327 * Returns true if thread should block. This is the main signal
328 * control in all acquire loops. Requires that pred == node.prev.
329 *
330 * @param pred node's predecessor holding status
331 * @param node the node
332 * @return {@code true} if thread should block
333 */
334 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
335 int ws = pred.waitStatus;
336 if (ws == Node.SIGNAL)
337 /*
338 * This node has already set status asking a release
339 * to signal it, so it can safely park.
340 */
341 return true;
342 if (ws > 0) {
343 /*
344 * Predecessor was cancelled. Skip over predecessors and
345 * indicate retry.
346 */
347 do {
348 node.prev = pred = pred.prev;
349 } while (pred.waitStatus > 0);
350 pred.next = node;
351 } else {
352 /*
353 * waitStatus must be 0 or PROPAGATE. Indicate that we
354 * need a signal, but don't park yet. Caller will need to
355 * retry to make sure it cannot acquire before parking.
356 */
357 pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
358 }
359 return false;
360 }
361
362 /**
363 * Convenience method to interrupt current thread.
364 */
365 static void selfInterrupt() {
366 Thread.currentThread().interrupt();
367 }
368
369 /**
370 * Convenience method to park and then check if interrupted.
371 *
372 * @return {@code true} if interrupted
373 */
374 private final boolean parkAndCheckInterrupt() {
375 LockSupport.park(this);
376 return Thread.interrupted();
377 }
378
379 /*
380 * Various flavors of acquire, varying in exclusive/shared and
381 * control modes. Each is mostly the same, but annoyingly
382 * different. Only a little bit of factoring is possible due to
383 * interactions of exception mechanics (including ensuring that we
384 * cancel if tryAcquire throws exception) and other control, at
385 * least not without hurting performance too much.
386 */
387
388 /**
389 * Acquires in exclusive uninterruptible mode for thread already in
390 * queue. Used by condition wait methods as well as acquire.
391 *
392 * @param node the node
393 * @param arg the acquire argument
394 * @return {@code true} if interrupted while waiting
395 */
396 final boolean acquireQueued(final Node node, long arg) {
397 boolean interrupted = false;
398 try {
399 for (;;) {
400 final Node p = node.predecessor();
401 if (p == head && tryAcquire(arg)) {
402 setHead(node);
403 p.next = null; // help GC
404 return interrupted;
405 }
406 if (shouldParkAfterFailedAcquire(p, node))
407 interrupted |= parkAndCheckInterrupt();
408 }
409 } catch (Throwable t) {
410 cancelAcquire(node);
411 if (interrupted)
412 selfInterrupt();
413 throw t;
414 }
415 }
416
417 /**
418 * Acquires in exclusive interruptible mode.
419 * @param arg the acquire argument
420 */
421 private void doAcquireInterruptibly(long arg)
422 throws InterruptedException {
423 final Node node = addWaiter(Node.EXCLUSIVE);
424 try {
425 for (;;) {
426 final Node p = node.predecessor();
427 if (p == head && tryAcquire(arg)) {
428 setHead(node);
429 p.next = null; // help GC
430 return;
431 }
432 if (shouldParkAfterFailedAcquire(p, node) &&
433 parkAndCheckInterrupt())
434 throw new InterruptedException();
435 }
436 } catch (Throwable t) {
437 cancelAcquire(node);
438 throw t;
439 }
440 }
441
442 /**
443 * Acquires in exclusive timed mode.
444 *
445 * @param arg the acquire argument
446 * @param nanosTimeout max wait time
447 * @return {@code true} if acquired
448 */
449 private boolean doAcquireNanos(long arg, long nanosTimeout)
450 throws InterruptedException {
451 if (nanosTimeout <= 0L)
452 return false;
453 final long deadline = System.nanoTime() + nanosTimeout;
454 final Node node = addWaiter(Node.EXCLUSIVE);
455 try {
456 for (;;) {
457 final Node p = node.predecessor();
458 if (p == head && tryAcquire(arg)) {
459 setHead(node);
460 p.next = null; // help GC
461 return true;
462 }
463 nanosTimeout = deadline - System.nanoTime();
464 if (nanosTimeout <= 0L) {
465 cancelAcquire(node);
466 return false;
467 }
468 if (shouldParkAfterFailedAcquire(p, node) &&
469 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
470 LockSupport.parkNanos(this, nanosTimeout);
471 if (Thread.interrupted())
472 throw new InterruptedException();
473 }
474 } catch (Throwable t) {
475 cancelAcquire(node);
476 throw t;
477 }
478 }
479
480 /**
481 * Acquires in shared uninterruptible mode.
482 * @param arg the acquire argument
483 */
484 private void doAcquireShared(long arg) {
485 final Node node = addWaiter(Node.SHARED);
486 boolean interrupted = false;
487 try {
488 for (;;) {
489 final Node p = node.predecessor();
490 if (p == head) {
491 long r = tryAcquireShared(arg);
492 if (r >= 0) {
493 setHeadAndPropagate(node, r);
494 p.next = null; // help GC
495 return;
496 }
497 }
498 if (shouldParkAfterFailedAcquire(p, node))
499 interrupted |= parkAndCheckInterrupt();
500 }
501 } catch (Throwable t) {
502 cancelAcquire(node);
503 throw t;
504 } finally {
505 if (interrupted)
506 selfInterrupt();
507 }
508 }
509
510 /**
511 * Acquires in shared interruptible mode.
512 * @param arg the acquire argument
513 */
514 private void doAcquireSharedInterruptibly(long arg)
515 throws InterruptedException {
516 final Node node = addWaiter(Node.SHARED);
517 try {
518 for (;;) {
519 final Node p = node.predecessor();
520 if (p == head) {
521 long r = tryAcquireShared(arg);
522 if (r >= 0) {
523 setHeadAndPropagate(node, r);
524 p.next = null; // help GC
525 return;
526 }
527 }
528 if (shouldParkAfterFailedAcquire(p, node) &&
529 parkAndCheckInterrupt())
530 throw new InterruptedException();
531 }
532 } catch (Throwable t) {
533 cancelAcquire(node);
534 throw t;
535 }
536 }
537
538 /**
539 * Acquires in shared timed mode.
540 *
541 * @param arg the acquire argument
542 * @param nanosTimeout max wait time
543 * @return {@code true} if acquired
544 */
545 private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
546 throws InterruptedException {
547 if (nanosTimeout <= 0L)
548 return false;
549 final long deadline = System.nanoTime() + nanosTimeout;
550 final Node node = addWaiter(Node.SHARED);
551 try {
552 for (;;) {
553 final Node p = node.predecessor();
554 if (p == head) {
555 long r = tryAcquireShared(arg);
556 if (r >= 0) {
557 setHeadAndPropagate(node, r);
558 p.next = null; // help GC
559 return true;
560 }
561 }
562 nanosTimeout = deadline - System.nanoTime();
563 if (nanosTimeout <= 0L) {
564 cancelAcquire(node);
565 return false;
566 }
567 if (shouldParkAfterFailedAcquire(p, node) &&
568 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
569 LockSupport.parkNanos(this, nanosTimeout);
570 if (Thread.interrupted())
571 throw new InterruptedException();
572 }
573 } catch (Throwable t) {
574 cancelAcquire(node);
575 throw t;
576 }
577 }
578
579 // Main exported methods
580
581 /**
582 * Attempts to acquire in exclusive mode. This method should query
583 * if the state of the object permits it to be acquired in the
584 * exclusive mode, and if so to acquire it.
585 *
586 * <p>This method is always invoked by the thread performing
587 * acquire. If this method reports failure, the acquire method
588 * may queue the thread, if it is not already queued, until it is
589 * signalled by a release from some other thread. This can be used
590 * to implement method {@link Lock#tryLock()}.
591 *
592 * <p>The default
593 * implementation throws {@link UnsupportedOperationException}.
594 *
595 * @param arg the acquire argument. This value is always the one
596 * passed to an acquire method, or is the value saved on entry
597 * to a condition wait. The value is otherwise uninterpreted
598 * and can represent anything you like.
599 * @return {@code true} if successful. Upon success, this object has
600 * been acquired.
601 * @throws IllegalMonitorStateException if acquiring would place this
602 * synchronizer in an illegal state. This exception must be
603 * thrown in a consistent fashion for synchronization to work
604 * correctly.
605 * @throws UnsupportedOperationException if exclusive mode is not supported
606 */
607 protected boolean tryAcquire(long arg) {
608 throw new UnsupportedOperationException();
609 }
610
611 /**
612 * Attempts to set the state to reflect a release in exclusive
613 * mode.
614 *
615 * <p>This method is always invoked by the thread performing release.
616 *
617 * <p>The default implementation throws
618 * {@link UnsupportedOperationException}.
619 *
620 * @param arg the release argument. This value is always the one
621 * passed to a release method, or the current state value upon
622 * entry to a condition wait. The value is otherwise
623 * uninterpreted and can represent anything you like.
624 * @return {@code true} if this object is now in a fully released
625 * state, so that any waiting threads may attempt to acquire;
626 * and {@code false} otherwise.
627 * @throws IllegalMonitorStateException if releasing would place this
628 * synchronizer in an illegal state. This exception must be
629 * thrown in a consistent fashion for synchronization to work
630 * correctly.
631 * @throws UnsupportedOperationException if exclusive mode is not supported
632 */
633 protected boolean tryRelease(long arg) {
634 throw new UnsupportedOperationException();
635 }
636
637 /**
638 * Attempts to acquire in shared mode. This method should query if
639 * the state of the object permits it to be acquired in the shared
640 * mode, and if so to acquire it.
641 *
642 * <p>This method is always invoked by the thread performing
643 * acquire. If this method reports failure, the acquire method
644 * may queue the thread, if it is not already queued, until it is
645 * signalled by a release from some other thread.
646 *
647 * <p>The default implementation throws {@link
648 * UnsupportedOperationException}.
649 *
650 * @param arg the acquire argument. This value is always the one
651 * passed to an acquire method, or is the value saved on entry
652 * to a condition wait. The value is otherwise uninterpreted
653 * and can represent anything you like.
654 * @return a negative value on failure; zero if acquisition in shared
655 * mode succeeded but no subsequent shared-mode acquire can
656 * succeed; and a positive value if acquisition in shared
657 * mode succeeded and subsequent shared-mode acquires might
658 * also succeed, in which case a subsequent waiting thread
659 * must check availability. (Support for three different
660 * return values enables this method to be used in contexts
661 * where acquires only sometimes act exclusively.) Upon
662 * success, this object has been acquired.
663 * @throws IllegalMonitorStateException if acquiring would place this
664 * synchronizer in an illegal state. This exception must be
665 * thrown in a consistent fashion for synchronization to work
666 * correctly.
667 * @throws UnsupportedOperationException if shared mode is not supported
668 */
669 protected long tryAcquireShared(long arg) {
670 throw new UnsupportedOperationException();
671 }
672
673 /**
674 * Attempts to set the state to reflect a release in shared mode.
675 *
676 * <p>This method is always invoked by the thread performing release.
677 *
678 * <p>The default implementation throws
679 * {@link UnsupportedOperationException}.
680 *
681 * @param arg the release argument. This value is always the one
682 * passed to a release method, or the current state value upon
683 * entry to a condition wait. The value is otherwise
684 * uninterpreted and can represent anything you like.
685 * @return {@code true} if this release of shared mode may permit a
686 * waiting acquire (shared or exclusive) to succeed; and
687 * {@code false} otherwise
688 * @throws IllegalMonitorStateException if releasing would place this
689 * synchronizer in an illegal state. This exception must be
690 * thrown in a consistent fashion for synchronization to work
691 * correctly.
692 * @throws UnsupportedOperationException if shared mode is not supported
693 */
694 protected boolean tryReleaseShared(long arg) {
695 throw new UnsupportedOperationException();
696 }
697
698 /**
699 * Returns {@code true} if synchronization is held exclusively with
700 * respect to the current (calling) thread. This method is invoked
701 * upon each call to a {@link ConditionObject} method.
702 *
703 * <p>The default implementation throws {@link
704 * UnsupportedOperationException}. This method is invoked
705 * internally only within {@link ConditionObject} methods, so need
706 * not be defined if conditions are not used.
707 *
708 * @return {@code true} if synchronization is held exclusively;
709 * {@code false} otherwise
710 * @throws UnsupportedOperationException if conditions are not supported
711 */
712 protected boolean isHeldExclusively() {
713 throw new UnsupportedOperationException();
714 }
715
716 /**
717 * Acquires in exclusive mode, ignoring interrupts. Implemented
718 * by invoking at least once {@link #tryAcquire},
719 * returning on success. Otherwise the thread is queued, possibly
720 * repeatedly blocking and unblocking, invoking {@link
721 * #tryAcquire} until success. This method can be used
722 * to implement method {@link Lock#lock}.
723 *
724 * @param arg the acquire argument. This value is conveyed to
725 * {@link #tryAcquire} but is otherwise uninterpreted and
726 * can represent anything you like.
727 */
728 public final void acquire(long arg) {
729 if (!tryAcquire(arg) &&
730 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
731 selfInterrupt();
732 }
733
734 /**
735 * Acquires in exclusive mode, aborting if interrupted.
736 * Implemented by first checking interrupt status, then invoking
737 * at least once {@link #tryAcquire}, returning on
738 * success. Otherwise the thread is queued, possibly repeatedly
739 * blocking and unblocking, invoking {@link #tryAcquire}
740 * until success or the thread is interrupted. This method can be
741 * used to implement method {@link Lock#lockInterruptibly}.
742 *
743 * @param arg the acquire argument. This value is conveyed to
744 * {@link #tryAcquire} but is otherwise uninterpreted and
745 * can represent anything you like.
746 * @throws InterruptedException if the current thread is interrupted
747 */
748 public final void acquireInterruptibly(long arg)
749 throws InterruptedException {
750 if (Thread.interrupted())
751 throw new InterruptedException();
752 if (!tryAcquire(arg))
753 doAcquireInterruptibly(arg);
754 }
755
756 /**
757 * Attempts to acquire in exclusive mode, aborting if interrupted,
758 * and failing if the given timeout elapses. Implemented by first
759 * checking interrupt status, then invoking at least once {@link
760 * #tryAcquire}, returning on success. Otherwise, the thread is
761 * queued, possibly repeatedly blocking and unblocking, invoking
762 * {@link #tryAcquire} until success or the thread is interrupted
763 * or the timeout elapses. This method can be used to implement
764 * method {@link Lock#tryLock(long, TimeUnit)}.
765 *
766 * @param arg the acquire argument. This value is conveyed to
767 * {@link #tryAcquire} but is otherwise uninterpreted and
768 * can represent anything you like.
769 * @param nanosTimeout the maximum number of nanoseconds to wait
770 * @return {@code true} if acquired; {@code false} if timed out
771 * @throws InterruptedException if the current thread is interrupted
772 */
773 public final boolean tryAcquireNanos(long arg, long nanosTimeout)
774 throws InterruptedException {
775 if (Thread.interrupted())
776 throw new InterruptedException();
777 return tryAcquire(arg) ||
778 doAcquireNanos(arg, nanosTimeout);
779 }
780
781 /**
782 * Releases in exclusive mode. Implemented by unblocking one or
783 * more threads if {@link #tryRelease} returns true.
784 * This method can be used to implement method {@link Lock#unlock}.
785 *
786 * @param arg the release argument. This value is conveyed to
787 * {@link #tryRelease} but is otherwise uninterpreted and
788 * can represent anything you like.
789 * @return the value returned from {@link #tryRelease}
790 */
791 public final boolean release(long arg) {
792 if (tryRelease(arg)) {
793 Node h = head;
794 if (h != null && h.waitStatus != 0)
795 unparkSuccessor(h);
796 return true;
797 }
798 return false;
799 }
800
801 /**
802 * Acquires in shared mode, ignoring interrupts. Implemented by
803 * first invoking at least once {@link #tryAcquireShared},
804 * returning on success. Otherwise the thread is queued, possibly
805 * repeatedly blocking and unblocking, invoking {@link
806 * #tryAcquireShared} until success.
807 *
808 * @param arg the acquire argument. This value is conveyed to
809 * {@link #tryAcquireShared} but is otherwise uninterpreted
810 * and can represent anything you like.
811 */
812 public final void acquireShared(long arg) {
813 if (tryAcquireShared(arg) < 0)
814 doAcquireShared(arg);
815 }
816
817 /**
818 * Acquires in shared mode, aborting if interrupted. Implemented
819 * by first checking interrupt status, then invoking at least once
820 * {@link #tryAcquireShared}, returning on success. Otherwise the
821 * thread is queued, possibly repeatedly blocking and unblocking,
822 * invoking {@link #tryAcquireShared} until success or the thread
823 * is interrupted.
824 * @param arg the acquire argument.
825 * This value is conveyed to {@link #tryAcquireShared} but is
826 * otherwise uninterpreted and can represent anything
827 * you like.
828 * @throws InterruptedException if the current thread is interrupted
829 */
830 public final void acquireSharedInterruptibly(long arg)
831 throws InterruptedException {
832 if (Thread.interrupted())
833 throw new InterruptedException();
834 if (tryAcquireShared(arg) < 0)
835 doAcquireSharedInterruptibly(arg);
836 }
837
838 /**
839 * Attempts to acquire in shared mode, aborting if interrupted, and
840 * failing if the given timeout elapses. Implemented by first
841 * checking interrupt status, then invoking at least once {@link
842 * #tryAcquireShared}, returning on success. Otherwise, the
843 * thread is queued, possibly repeatedly blocking and unblocking,
844 * invoking {@link #tryAcquireShared} until success or the thread
845 * is interrupted or the timeout elapses.
846 *
847 * @param arg the acquire argument. This value is conveyed to
848 * {@link #tryAcquireShared} but is otherwise uninterpreted
849 * and can represent anything you like.
850 * @param nanosTimeout the maximum number of nanoseconds to wait
851 * @return {@code true} if acquired; {@code false} if timed out
852 * @throws InterruptedException if the current thread is interrupted
853 */
854 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
855 throws InterruptedException {
856 if (Thread.interrupted())
857 throw new InterruptedException();
858 return tryAcquireShared(arg) >= 0 ||
859 doAcquireSharedNanos(arg, nanosTimeout);
860 }
861
862 /**
863 * Releases in shared mode. Implemented by unblocking one or more
864 * threads if {@link #tryReleaseShared} returns true.
865 *
866 * @param arg the release argument. This value is conveyed to
867 * {@link #tryReleaseShared} but is otherwise uninterpreted
868 * and can represent anything you like.
869 * @return the value returned from {@link #tryReleaseShared}
870 */
871 public final boolean releaseShared(long arg) {
872 if (tryReleaseShared(arg)) {
873 doReleaseShared();
874 return true;
875 }
876 return false;
877 }
878
879 // Queue inspection methods
880
881 /**
882 * Queries whether any threads are waiting to acquire. Note that
883 * because cancellations due to interrupts and timeouts may occur
884 * at any time, a {@code true} return does not guarantee that any
885 * other thread will ever acquire.
886 *
887 * @return {@code true} if there may be other threads waiting to acquire
888 */
889 public final boolean hasQueuedThreads() {
890 for (Node p = tail, h = head; p != h && p != null; p = p.prev)
891 if (p.waitStatus <= 0)
892 return true;
893 return false;
894 }
895
896 /**
897 * Queries whether any threads have ever contended to acquire this
898 * synchronizer; that is, if an acquire method has ever blocked.
899 *
900 * <p>In this implementation, this operation returns in
901 * constant time.
902 *
903 * @return {@code true} if there has ever been contention
904 */
905 public final boolean hasContended() {
906 return head != null;
907 }
908
909 /**
910 * Returns the first (longest-waiting) thread in the queue, or
911 * {@code null} if no threads are currently queued.
912 *
913 * <p>In this implementation, this operation normally returns in
914 * constant time, but may iterate upon contention if other threads are
915 * concurrently modifying the queue.
916 *
917 * @return the first (longest-waiting) thread in the queue, or
918 * {@code null} if no threads are currently queued
919 */
920 public final Thread getFirstQueuedThread() {
921 // handle only fast path, else relay
922 return (head == tail) ? null : fullGetFirstQueuedThread();
923 }
924
925 /**
926 * Version of getFirstQueuedThread called when fastpath fails.
927 */
928 private Thread fullGetFirstQueuedThread() {
929 /*
930 * The first node is normally head.next. Try to get its
931 * thread field, ensuring consistent reads: If thread
932 * field is nulled out or s.prev is no longer head, then
933 * some other thread(s) concurrently performed setHead in
934 * between some of our reads. We try this twice before
935 * resorting to traversal.
936 */
937 Node h, s;
938 Thread st;
939 if (((h = head) != null && (s = h.next) != null &&
940 s.prev == head && (st = s.thread) != null) ||
941 ((h = head) != null && (s = h.next) != null &&
942 s.prev == head && (st = s.thread) != null))
943 return st;
944
945 /*
946 * Head's next field might not have been set yet, or may have
947 * been unset after setHead. So we must check to see if tail
948 * is actually first node. If not, we continue on, safely
949 * traversing from tail back to head to find first,
950 * guaranteeing termination.
951 */
952
953 Thread firstThread = null;
954 for (Node p = tail; p != null && p != head; p = p.prev) {
955 Thread t = p.thread;
956 if (t != null)
957 firstThread = t;
958 }
959 return firstThread;
960 }
961
962 /**
963 * Returns true if the given thread is currently queued.
964 *
965 * <p>This implementation traverses the queue to determine
966 * presence of the given thread.
967 *
968 * @param thread the thread
969 * @return {@code true} if the given thread is on the queue
970 * @throws NullPointerException if the thread is null
971 */
972 public final boolean isQueued(Thread thread) {
973 if (thread == null)
974 throw new NullPointerException();
975 for (Node p = tail; p != null; p = p.prev)
976 if (p.thread == thread)
977 return true;
978 return false;
979 }
980
981 /**
982 * Returns {@code true} if the apparent first queued thread, if one
983 * exists, is waiting in exclusive mode. If this method returns
984 * {@code true}, and the current thread is attempting to acquire in
985 * shared mode (that is, this method is invoked from {@link
986 * #tryAcquireShared}) then it is guaranteed that the current thread
987 * is not the first queued thread. Used only as a heuristic in
988 * ReentrantReadWriteLock.
989 */
990 final boolean apparentlyFirstQueuedIsExclusive() {
991 Node h, s;
992 return (h = head) != null &&
993 (s = h.next) != null &&
994 !s.isShared() &&
995 s.thread != null;
996 }
997
998 /**
999 * Queries whether any threads have been waiting to acquire longer
1000 * than the current thread.
1001 *
1002 * <p>An invocation of this method is equivalent to (but may be
1003 * more efficient than):
1004 * <pre> {@code
1005 * getFirstQueuedThread() != Thread.currentThread()
1006 * && hasQueuedThreads()}</pre>
1007 *
1008 * <p>Note that because cancellations due to interrupts and
1009 * timeouts may occur at any time, a {@code true} return does not
1010 * guarantee that some other thread will acquire before the current
1011 * thread. Likewise, it is possible for another thread to win a
1012 * race to enqueue after this method has returned {@code false},
1013 * due to the queue being empty.
1014 *
1015 * <p>This method is designed to be used by a fair synchronizer to
1016 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
1017 * Such a synchronizer's {@link #tryAcquire} method should return
1018 * {@code false}, and its {@link #tryAcquireShared} method should
1019 * return a negative value, if this method returns {@code true}
1020 * (unless this is a reentrant acquire). For example, the {@code
1021 * tryAcquire} method for a fair, reentrant, exclusive mode
1022 * synchronizer might look like this:
1023 *
1024 * <pre> {@code
1025 * protected boolean tryAcquire(long arg) {
1026 * if (isHeldExclusively()) {
1027 * // A reentrant acquire; increment hold count
1028 * return true;
1029 * } else if (hasQueuedPredecessors()) {
1030 * return false;
1031 * } else {
1032 * // try to acquire normally
1033 * }
1034 * }}</pre>
1035 *
1036 * @return {@code true} if there is a queued thread preceding the
1037 * current thread, and {@code false} if the current thread
1038 * is at the head of the queue or the queue is empty
1039 * @since 1.7
1040 */
1041 public final boolean hasQueuedPredecessors() {
1042 Node h, s;
1043 if ((h = head) != null) {
1044 if ((s = h.next) == null || s.waitStatus > 0) {
1045 s = null; // traverse in case of concurrent cancellation
1046 for (Node p = tail; p != h && p != null; p = p.prev) {
1047 if (p.waitStatus <= 0)
1048 s = p;
1049 }
1050 }
1051 if (s != null && s.thread != Thread.currentThread())
1052 return true;
1053 }
1054 return false;
1055 }
1056
1057 // Instrumentation and monitoring methods
1058
1059 /**
1060 * Returns an estimate of the number of threads waiting to
1061 * acquire. The value is only an estimate because the number of
1062 * threads may change dynamically while this method traverses
1063 * internal data structures. This method is designed for use in
1064 * monitoring system state, not for synchronization control.
1065 *
1066 * @return the estimated number of threads waiting to acquire
1067 */
1068 public final int getQueueLength() {
1069 int n = 0;
1070 for (Node p = tail; p != null; p = p.prev) {
1071 if (p.thread != null)
1072 ++n;
1073 }
1074 return n;
1075 }
1076
1077 /**
1078 * Returns a collection containing threads that may be waiting to
1079 * acquire. Because the actual set of threads may change
1080 * dynamically while constructing this result, the returned
1081 * collection is only a best-effort estimate. The elements of the
1082 * returned collection are in no particular order. This method is
1083 * designed to facilitate construction of subclasses that provide
1084 * more extensive monitoring facilities.
1085 *
1086 * @return the collection of threads
1087 */
1088 public final Collection<Thread> getQueuedThreads() {
1089 ArrayList<Thread> list = new ArrayList<>();
1090 for (Node p = tail; p != null; p = p.prev) {
1091 Thread t = p.thread;
1092 if (t != null)
1093 list.add(t);
1094 }
1095 return list;
1096 }
1097
1098 /**
1099 * Returns a collection containing threads that may be waiting to
1100 * acquire in exclusive mode. This has the same properties
1101 * as {@link #getQueuedThreads} except that it only returns
1102 * those threads waiting due to an exclusive acquire.
1103 *
1104 * @return the collection of threads
1105 */
1106 public final Collection<Thread> getExclusiveQueuedThreads() {
1107 ArrayList<Thread> list = new ArrayList<>();
1108 for (Node p = tail; p != null; p = p.prev) {
1109 if (!p.isShared()) {
1110 Thread t = p.thread;
1111 if (t != null)
1112 list.add(t);
1113 }
1114 }
1115 return list;
1116 }
1117
1118 /**
1119 * Returns a collection containing threads that may be waiting to
1120 * acquire in shared mode. This has the same properties
1121 * as {@link #getQueuedThreads} except that it only returns
1122 * those threads waiting due to a shared acquire.
1123 *
1124 * @return the collection of threads
1125 */
1126 public final Collection<Thread> getSharedQueuedThreads() {
1127 ArrayList<Thread> list = new ArrayList<>();
1128 for (Node p = tail; p != null; p = p.prev) {
1129 if (p.isShared()) {
1130 Thread t = p.thread;
1131 if (t != null)
1132 list.add(t);
1133 }
1134 }
1135 return list;
1136 }
1137
1138 /**
1139 * Returns a string identifying this synchronizer, as well as its state.
1140 * The state, in brackets, includes the String {@code "State ="}
1141 * followed by the current value of {@link #getState}, and either
1142 * {@code "nonempty"} or {@code "empty"} depending on whether the
1143 * queue is empty.
1144 *
1145 * @return a string identifying this synchronizer, as well as its state
1146 */
1147 public String toString() {
1148 return super.toString()
1149 + "[State = " + getState() + ", "
1150 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1151 }
1152
1153
1154 // Internal support methods for Conditions
1155
1156 /**
1157 * Returns true if a node, always one that was initially placed on
1158 * a condition queue, is now waiting to reacquire on sync queue.
1159 * @param node the node
1160 * @return true if is reacquiring
1161 */
1162 final boolean isOnSyncQueue(Node node) {
1163 if (node.waitStatus == Node.CONDITION || node.prev == null)
1164 return false;
1165 if (node.next != null) // If has successor, it must be on queue
1166 return true;
1167 /*
1168 * node.prev can be non-null, but not yet on queue because
1169 * the CAS to place it on queue can fail. So we have to
1170 * traverse from tail to make sure it actually made it. It
1171 * will always be near the tail in calls to this method, and
1172 * unless the CAS failed (which is unlikely), it will be
1173 * there, so we hardly ever traverse much.
1174 */
1175 return findNodeFromTail(node);
1176 }
1177
1178 /**
1179 * Returns true if node is on sync queue by searching backwards from tail.
1180 * Called only when needed by isOnSyncQueue.
1181 * @return true if present
1182 */
1183 private boolean findNodeFromTail(Node node) {
1184 // We check for node first, since it's likely to be at or near tail.
1185 // tail is known to be non-null, so we could re-order to "save"
1186 // one null check, but we leave it this way to help the VM.
1187 for (Node p = tail;;) {
1188 if (p == node)
1189 return true;
1190 if (p == null)
1191 return false;
1192 p = p.prev;
1193 }
1194 }
1195
1196 /**
1197 * Transfers a node from a condition queue onto sync queue.
1198 * Returns true if successful.
1199 * @param node the node
1200 * @return true if successfully transferred (else the node was
1201 * cancelled before signal)
1202 */
1203 final boolean transferForSignal(Node node) {
1204 /*
1205 * If cannot change waitStatus, the node has been cancelled.
1206 */
1207 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1208 return false;
1209
1210 /*
1211 * Splice onto queue and try to set waitStatus of predecessor to
1212 * indicate that thread is (probably) waiting. If cancelled or
1213 * attempt to set waitStatus fails, wake up to resync (in which
1214 * case the waitStatus can be transiently and harmlessly wrong).
1215 */
1216 Node p = enq(node);
1217 int ws = p.waitStatus;
1218 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1219 LockSupport.unpark(node.thread);
1220 return true;
1221 }
1222
1223 /**
1224 * Transfers node, if necessary, to sync queue after a cancelled wait.
1225 * Returns true if thread was cancelled before being signalled.
1226 *
1227 * @param node the node
1228 * @return true if cancelled before the node was signalled
1229 */
1230 final boolean transferAfterCancelledWait(Node node) {
1231 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1232 enq(node);
1233 return true;
1234 }
1235 /*
1236 * If we lost out to a signal(), then we can't proceed
1237 * until it finishes its enq(). Cancelling during an
1238 * incomplete transfer is both rare and transient, so just
1239 * spin.
1240 */
1241 while (!isOnSyncQueue(node))
1242 Thread.yield();
1243 return false;
1244 }
1245
1246 /**
1247 * Invokes release with current state value; returns saved state.
1248 * Cancels node and throws exception on failure.
1249 * @param node the condition node for this wait
1250 * @return previous sync state
1251 */
1252 final long fullyRelease(Node node) {
1253 try {
1254 long savedState = getState();
1255 if (release(savedState))
1256 return savedState;
1257 throw new IllegalMonitorStateException();
1258 } catch (Throwable t) {
1259 node.waitStatus = Node.CANCELLED;
1260 throw t;
1261 }
1262 }
1263
1264 // Instrumentation methods for conditions
1265
1266 /**
1267 * Queries whether the given ConditionObject
1268 * uses this synchronizer as its lock.
1269 *
1270 * @param condition the condition
1271 * @return {@code true} if owned
1272 * @throws NullPointerException if the condition is null
1273 */
1274 public final boolean owns(ConditionObject condition) {
1275 return condition.isOwnedBy(this);
1276 }
1277
1278 /**
1279 * Queries whether any threads are waiting on the given condition
1280 * associated with this synchronizer. Note that because timeouts
1281 * and interrupts may occur at any time, a {@code true} return
1282 * does not guarantee that a future {@code signal} will awaken
1283 * any threads. This method is designed primarily for use in
1284 * monitoring of the system state.
1285 *
1286 * @param condition the condition
1287 * @return {@code true} if there are any waiting threads
1288 * @throws IllegalMonitorStateException if exclusive synchronization
1289 * is not held
1290 * @throws IllegalArgumentException if the given condition is
1291 * not associated with this synchronizer
1292 * @throws NullPointerException if the condition is null
1293 */
1294 public final boolean hasWaiters(ConditionObject condition) {
1295 if (!owns(condition))
1296 throw new IllegalArgumentException("Not owner");
1297 return condition.hasWaiters();
1298 }
1299
1300 /**
1301 * Returns an estimate of the number of threads waiting on the
1302 * given condition associated with this synchronizer. Note that
1303 * because timeouts and interrupts may occur at any time, the
1304 * estimate serves only as an upper bound on the actual number of
1305 * waiters. This method is designed for use in monitoring system
1306 * state, not for synchronization control.
1307 *
1308 * @param condition the condition
1309 * @return the estimated number of waiting threads
1310 * @throws IllegalMonitorStateException if exclusive synchronization
1311 * is not held
1312 * @throws IllegalArgumentException if the given condition is
1313 * not associated with this synchronizer
1314 * @throws NullPointerException if the condition is null
1315 */
1316 public final int getWaitQueueLength(ConditionObject condition) {
1317 if (!owns(condition))
1318 throw new IllegalArgumentException("Not owner");
1319 return condition.getWaitQueueLength();
1320 }
1321
1322 /**
1323 * Returns a collection containing those threads that may be
1324 * waiting on the given condition associated with this
1325 * synchronizer. Because the actual set of threads may change
1326 * dynamically while constructing this result, the returned
1327 * collection is only a best-effort estimate. The elements of the
1328 * returned collection are in no particular order.
1329 *
1330 * @param condition the condition
1331 * @return the collection of threads
1332 * @throws IllegalMonitorStateException if exclusive synchronization
1333 * is not held
1334 * @throws IllegalArgumentException if the given condition is
1335 * not associated with this synchronizer
1336 * @throws NullPointerException if the condition is null
1337 */
1338 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1339 if (!owns(condition))
1340 throw new IllegalArgumentException("Not owner");
1341 return condition.getWaitingThreads();
1342 }
1343
1344 /**
1345 * Condition implementation for a {@link AbstractQueuedLongSynchronizer}
1346 * serving as the basis of a {@link Lock} implementation.
1347 *
1348 * <p>Method documentation for this class describes mechanics,
1349 * not behavioral specifications from the point of view of Lock
1350 * and Condition users. Exported versions of this class will in
1351 * general need to be accompanied by documentation describing
1352 * condition semantics that rely on those of the associated
1353 * {@code AbstractQueuedLongSynchronizer}.
1354 *
1355 * <p>This class is Serializable, but all fields are transient,
1356 * so deserialized conditions have no waiters.
1357 *
1358 * @since 1.6
1359 */
1360 public class ConditionObject implements Condition, java.io.Serializable {
1361 private static final long serialVersionUID = 1173984872572414699L;
1362 /** First node of condition queue. */
1363 private transient Node firstWaiter;
1364 /** Last node of condition queue. */
1365 private transient Node lastWaiter;
1366
1367 /**
1368 * Creates a new {@code ConditionObject} instance.
1369 */
1370 public ConditionObject() { }
1371
1372 // Internal methods
1373
1374 /**
1375 * Adds a new waiter to wait queue.
1376 * @return its new wait node
1377 */
1378 private Node addConditionWaiter() {
1379 if (!isHeldExclusively())
1380 throw new IllegalMonitorStateException();
1381 Node t = lastWaiter;
1382 // If lastWaiter is cancelled, clean out.
1383 if (t != null && t.waitStatus != Node.CONDITION) {
1384 unlinkCancelledWaiters();
1385 t = lastWaiter;
1386 }
1387
1388 Node node = new Node(Node.CONDITION);
1389
1390 if (t == null)
1391 firstWaiter = node;
1392 else
1393 t.nextWaiter = node;
1394 lastWaiter = node;
1395 return node;
1396 }
1397
1398 /**
1399 * Removes and transfers nodes until hit non-cancelled one or
1400 * null. Split out from signal in part to encourage compilers
1401 * to inline the case of no waiters.
1402 * @param first (non-null) the first node on condition queue
1403 */
1404 private void doSignal(Node first) {
1405 do {
1406 if ( (firstWaiter = first.nextWaiter) == null)
1407 lastWaiter = null;
1408 first.nextWaiter = null;
1409 } while (!transferForSignal(first) &&
1410 (first = firstWaiter) != null);
1411 }
1412
1413 /**
1414 * Removes and transfers all nodes.
1415 * @param first (non-null) the first node on condition queue
1416 */
1417 private void doSignalAll(Node first) {
1418 lastWaiter = firstWaiter = null;
1419 do {
1420 Node next = first.nextWaiter;
1421 first.nextWaiter = null;
1422 transferForSignal(first);
1423 first = next;
1424 } while (first != null);
1425 }
1426
1427 /**
1428 * Unlinks cancelled waiter nodes from condition queue.
1429 * Called only while holding lock. This is called when
1430 * cancellation occurred during condition wait, and upon
1431 * insertion of a new waiter when lastWaiter is seen to have
1432 * been cancelled. This method is needed to avoid garbage
1433 * retention in the absence of signals. So even though it may
1434 * require a full traversal, it comes into play only when
1435 * timeouts or cancellations occur in the absence of
1436 * signals. It traverses all nodes rather than stopping at a
1437 * particular target to unlink all pointers to garbage nodes
1438 * without requiring many re-traversals during cancellation
1439 * storms.
1440 */
1441 private void unlinkCancelledWaiters() {
1442 Node t = firstWaiter;
1443 Node trail = null;
1444 while (t != null) {
1445 Node next = t.nextWaiter;
1446 if (t.waitStatus != Node.CONDITION) {
1447 t.nextWaiter = null;
1448 if (trail == null)
1449 firstWaiter = next;
1450 else
1451 trail.nextWaiter = next;
1452 if (next == null)
1453 lastWaiter = trail;
1454 }
1455 else
1456 trail = t;
1457 t = next;
1458 }
1459 }
1460
1461 // public methods
1462
1463 /**
1464 * Moves the longest-waiting thread, if one exists, from the
1465 * wait queue for this condition to the wait queue for the
1466 * owning lock.
1467 *
1468 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1469 * returns {@code false}
1470 */
1471 public final void signal() {
1472 if (!isHeldExclusively())
1473 throw new IllegalMonitorStateException();
1474 Node first = firstWaiter;
1475 if (first != null)
1476 doSignal(first);
1477 }
1478
1479 /**
1480 * Moves all threads from the wait queue for this condition to
1481 * the wait queue for the owning lock.
1482 *
1483 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1484 * returns {@code false}
1485 */
1486 public final void signalAll() {
1487 if (!isHeldExclusively())
1488 throw new IllegalMonitorStateException();
1489 Node first = firstWaiter;
1490 if (first != null)
1491 doSignalAll(first);
1492 }
1493
1494 /**
1495 * Implements uninterruptible condition wait.
1496 * <ol>
1497 * <li>Save lock state returned by {@link #getState}.
1498 * <li>Invoke {@link #release} with saved state as argument,
1499 * throwing IllegalMonitorStateException if it fails.
1500 * <li>Block until signalled.
1501 * <li>Reacquire by invoking specialized version of
1502 * {@link #acquire} with saved state as argument.
1503 * </ol>
1504 */
1505 public final void awaitUninterruptibly() {
1506 Node node = addConditionWaiter();
1507 long savedState = fullyRelease(node);
1508 boolean interrupted = false;
1509 while (!isOnSyncQueue(node)) {
1510 LockSupport.park(this);
1511 if (Thread.interrupted())
1512 interrupted = true;
1513 }
1514 if (acquireQueued(node, savedState) || interrupted)
1515 selfInterrupt();
1516 }
1517
1518 /*
1519 * For interruptible waits, we need to track whether to throw
1520 * InterruptedException, if interrupted while blocked on
1521 * condition, versus reinterrupt current thread, if
1522 * interrupted while blocked waiting to re-acquire.
1523 */
1524
1525 /** Mode meaning to reinterrupt on exit from wait */
1526 private static final int REINTERRUPT = 1;
1527 /** Mode meaning to throw InterruptedException on exit from wait */
1528 private static final int THROW_IE = -1;
1529
1530 /**
1531 * Checks for interrupt, returning THROW_IE if interrupted
1532 * before signalled, REINTERRUPT if after signalled, or
1533 * 0 if not interrupted.
1534 */
1535 private int checkInterruptWhileWaiting(Node node) {
1536 return Thread.interrupted() ?
1537 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1538 0;
1539 }
1540
1541 /**
1542 * Throws InterruptedException, reinterrupts current thread, or
1543 * does nothing, depending on mode.
1544 */
1545 private void reportInterruptAfterWait(int interruptMode)
1546 throws InterruptedException {
1547 if (interruptMode == THROW_IE)
1548 throw new InterruptedException();
1549 else if (interruptMode == REINTERRUPT)
1550 selfInterrupt();
1551 }
1552
1553 /**
1554 * Implements interruptible condition wait.
1555 * <ol>
1556 * <li>If current thread is interrupted, throw InterruptedException.
1557 * <li>Save lock state returned by {@link #getState}.
1558 * <li>Invoke {@link #release} with saved state as argument,
1559 * throwing IllegalMonitorStateException if it fails.
1560 * <li>Block until signalled or interrupted.
1561 * <li>Reacquire by invoking specialized version of
1562 * {@link #acquire} with saved state as argument.
1563 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1564 * </ol>
1565 */
1566 public final void await() throws InterruptedException {
1567 if (Thread.interrupted())
1568 throw new InterruptedException();
1569 Node node = addConditionWaiter();
1570 long savedState = fullyRelease(node);
1571 int interruptMode = 0;
1572 while (!isOnSyncQueue(node)) {
1573 LockSupport.park(this);
1574 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1575 break;
1576 }
1577 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1578 interruptMode = REINTERRUPT;
1579 if (node.nextWaiter != null) // clean up if cancelled
1580 unlinkCancelledWaiters();
1581 if (interruptMode != 0)
1582 reportInterruptAfterWait(interruptMode);
1583 }
1584
1585 /**
1586 * Implements timed condition wait.
1587 * <ol>
1588 * <li>If current thread is interrupted, throw InterruptedException.
1589 * <li>Save lock state returned by {@link #getState}.
1590 * <li>Invoke {@link #release} with saved state as argument,
1591 * throwing IllegalMonitorStateException if it fails.
1592 * <li>Block until signalled, interrupted, or timed out.
1593 * <li>Reacquire by invoking specialized version of
1594 * {@link #acquire} with saved state as argument.
1595 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1596 * </ol>
1597 */
1598 public final long awaitNanos(long nanosTimeout)
1599 throws InterruptedException {
1600 if (Thread.interrupted())
1601 throw new InterruptedException();
1602 // We don't check for nanosTimeout <= 0L here, to allow
1603 // awaitNanos(0) as a way to "yield the lock".
1604 final long deadline = System.nanoTime() + nanosTimeout;
1605 long initialNanos = nanosTimeout;
1606 Node node = addConditionWaiter();
1607 long savedState = fullyRelease(node);
1608 int interruptMode = 0;
1609 while (!isOnSyncQueue(node)) {
1610 if (nanosTimeout <= 0L) {
1611 transferAfterCancelledWait(node);
1612 break;
1613 }
1614 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1615 LockSupport.parkNanos(this, nanosTimeout);
1616 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1617 break;
1618 nanosTimeout = deadline - System.nanoTime();
1619 }
1620 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1621 interruptMode = REINTERRUPT;
1622 if (node.nextWaiter != null)
1623 unlinkCancelledWaiters();
1624 if (interruptMode != 0)
1625 reportInterruptAfterWait(interruptMode);
1626 long remaining = deadline - System.nanoTime(); // avoid overflow
1627 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
1628 }
1629
1630 /**
1631 * Implements absolute timed condition wait.
1632 * <ol>
1633 * <li>If current thread is interrupted, throw InterruptedException.
1634 * <li>Save lock state returned by {@link #getState}.
1635 * <li>Invoke {@link #release} with saved state as argument,
1636 * throwing IllegalMonitorStateException if it fails.
1637 * <li>Block until signalled, interrupted, or timed out.
1638 * <li>Reacquire by invoking specialized version of
1639 * {@link #acquire} with saved state as argument.
1640 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1641 * <li>If timed out while blocked in step 4, return false, else true.
1642 * </ol>
1643 */
1644 public final boolean awaitUntil(Date deadline)
1645 throws InterruptedException {
1646 long abstime = deadline.getTime();
1647 if (Thread.interrupted())
1648 throw new InterruptedException();
1649 Node node = addConditionWaiter();
1650 long savedState = fullyRelease(node);
1651 boolean timedout = false;
1652 int interruptMode = 0;
1653 while (!isOnSyncQueue(node)) {
1654 if (System.currentTimeMillis() >= abstime) {
1655 timedout = transferAfterCancelledWait(node);
1656 break;
1657 }
1658 LockSupport.parkUntil(this, abstime);
1659 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1660 break;
1661 }
1662 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1663 interruptMode = REINTERRUPT;
1664 if (node.nextWaiter != null)
1665 unlinkCancelledWaiters();
1666 if (interruptMode != 0)
1667 reportInterruptAfterWait(interruptMode);
1668 return !timedout;
1669 }
1670
1671 /**
1672 * Implements timed condition wait.
1673 * <ol>
1674 * <li>If current thread is interrupted, throw InterruptedException.
1675 * <li>Save lock state returned by {@link #getState}.
1676 * <li>Invoke {@link #release} with saved state as argument,
1677 * throwing IllegalMonitorStateException if it fails.
1678 * <li>Block until signalled, interrupted, or timed out.
1679 * <li>Reacquire by invoking specialized version of
1680 * {@link #acquire} with saved state as argument.
1681 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1682 * <li>If timed out while blocked in step 4, return false, else true.
1683 * </ol>
1684 */
1685 public final boolean await(long time, TimeUnit unit)
1686 throws InterruptedException {
1687 long nanosTimeout = unit.toNanos(time);
1688 if (Thread.interrupted())
1689 throw new InterruptedException();
1690 // We don't check for nanosTimeout <= 0L here, to allow
1691 // await(0, unit) as a way to "yield the lock".
1692 final long deadline = System.nanoTime() + nanosTimeout;
1693 Node node = addConditionWaiter();
1694 long savedState = fullyRelease(node);
1695 boolean timedout = false;
1696 int interruptMode = 0;
1697 while (!isOnSyncQueue(node)) {
1698 if (nanosTimeout <= 0L) {
1699 timedout = transferAfterCancelledWait(node);
1700 break;
1701 }
1702 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1703 LockSupport.parkNanos(this, nanosTimeout);
1704 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1705 break;
1706 nanosTimeout = deadline - System.nanoTime();
1707 }
1708 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1709 interruptMode = REINTERRUPT;
1710 if (node.nextWaiter != null)
1711 unlinkCancelledWaiters();
1712 if (interruptMode != 0)
1713 reportInterruptAfterWait(interruptMode);
1714 return !timedout;
1715 }
1716
1717 // support for instrumentation
1718
1719 /**
1720 * Returns true if this condition was created by the given
1721 * synchronization object.
1722 *
1723 * @return {@code true} if owned
1724 */
1725 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1726 return sync == AbstractQueuedLongSynchronizer.this;
1727 }
1728
1729 /**
1730 * Queries whether any threads are waiting on this condition.
1731 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
1732 *
1733 * @return {@code true} if there are any waiting threads
1734 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1735 * returns {@code false}
1736 */
1737 protected final boolean hasWaiters() {
1738 if (!isHeldExclusively())
1739 throw new IllegalMonitorStateException();
1740 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1741 if (w.waitStatus == Node.CONDITION)
1742 return true;
1743 }
1744 return false;
1745 }
1746
1747 /**
1748 * Returns an estimate of the number of threads waiting on
1749 * this condition.
1750 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
1751 *
1752 * @return the estimated number of waiting threads
1753 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1754 * returns {@code false}
1755 */
1756 protected final int getWaitQueueLength() {
1757 if (!isHeldExclusively())
1758 throw new IllegalMonitorStateException();
1759 int n = 0;
1760 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1761 if (w.waitStatus == Node.CONDITION)
1762 ++n;
1763 }
1764 return n;
1765 }
1766
1767 /**
1768 * Returns a collection containing those threads that may be
1769 * waiting on this Condition.
1770 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1771 *
1772 * @return the collection of threads
1773 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1774 * returns {@code false}
1775 */
1776 protected final Collection<Thread> getWaitingThreads() {
1777 if (!isHeldExclusively())
1778 throw new IllegalMonitorStateException();
1779 ArrayList<Thread> list = new ArrayList<>();
1780 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1781 if (w.waitStatus == Node.CONDITION) {
1782 Thread t = w.thread;
1783 if (t != null)
1784 list.add(t);
1785 }
1786 }
1787 return list;
1788 }
1789 }
1790
1791 /**
1792 * Setup to support compareAndSet. We need to natively implement
1793 * this here: For the sake of permitting future enhancements, we
1794 * cannot explicitly subclass AtomicLong, which would be
1795 * efficient and useful otherwise. So, as the lesser of evils, we
1796 * natively implement using hotspot intrinsics API. And while we
1797 * are at it, we do the same for other CASable fields (which could
1798 * otherwise be done with atomic field updaters).
1799 */
1800 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1801 private static final long STATE;
1802 private static final long HEAD;
1803 private static final long TAIL;
1804
1805 static {
1806 try {
1807 STATE = U.objectFieldOffset
1808 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state"));
1809 HEAD = U.objectFieldOffset
1810 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head"));
1811 TAIL = U.objectFieldOffset
1812 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail"));
1813 } catch (ReflectiveOperationException e) {
1814 throw new Error(e);
1815 }
1816
1817 // Reduce the risk of rare disastrous classloading in first call to
1818 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1819 Class<?> ensureLoaded = LockSupport.class;
1820 }
1821
1822 /**
1823 * Initializes head and tail fields on first contention.
1824 */
1825 private final void initializeSyncQueue() {
1826 Node h;
1827 if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
1828 tail = h;
1829 }
1830
1831 /**
1832 * CASes tail field.
1833 */
1834 private final boolean compareAndSetTail(Node expect, Node update) {
1835 return U.compareAndSwapObject(this, TAIL, expect, update);
1836 }
1837 }