--- jsr166/src/jsr166e/StampedLock.java 2013/01/14 19:00:01 1.27 +++ jsr166/src/jsr166e/StampedLock.java 2013/01/22 15:42:28 1.28 @@ -8,6 +8,7 @@ package jsr166e; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.*; /** * A capability-based lock with three modes for controlling read/write @@ -80,10 +81,18 @@ import java.util.concurrent.TimeUnit; * locking. * *

The scheduling policy of StampedLock does not consistently - * prefer readers over writers or vice versa. A zero return from any - * "try" method for acquiring or converting locks does not carry any - * information about the state of the lock; a subsequent invocation - * may succeed. + * prefer readers over writers or vice versa. All "try" methods are + * best-effort and do not necessarily conform to any scheduling or + * fairness policy. A zero return from any "try" method for acquiring + * or converting locks does not carry any information about the state + * of the lock; a subsequent invocation may succeed. + * + *

Because it supports coordinated usage across multiple lock + * modes, this class does not directly implement the {@link Lock} or + * {@link ReadWriteLock} interfaces. However, a StampedLock may be + * viewed {@link #asReadLock()}, {@link #asWriteLock()}, or {@link + * #asReadWriteLock()} in applications requiring only the associated + * set of functionality. * *

Sample Usage. The following illustrates some usage idioms * in a class that maintains simple two-dimensional points. The sample @@ -173,10 +182,8 @@ public class StampedLock implements java * http://www.lameter.com/gelato2005.pdf * and elsewhere; see * Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-68.html) - * Ordered RW locks (see Shirako et al + * and Ordered RW locks (see Shirako et al * http://dl.acm.org/citation.cfm?id=2312015) - * and Phase-Fair locks (see Brandenburg & Anderson, especially - * http://www.cs.unc.edu/~bbb/diss/). * * Conceptually, the primary state of the lock includes a sequence * number that is odd when write-locked and even otherwise. @@ -189,50 +196,41 @@ public class StampedLock implements java * reader count value (RBITS) as a spinlock protecting overflow * updates. * - * Waiting readers and writers use different queues. The writer - * queue is a modified form of CLH lock. (For discussion of CLH, - * see the internal documentation of AbstractQueuedSynchronizer.) - * The reader "queue" is a form of Treiber stack, that supports - * simpler/faster operations because order within a queue doesn't - * matter and all are signalled at once. However the sequence of - * threads within the queue vs the current stamp does matter (see - * Shirako et al) so each carries its incoming stamp value. - * Waiting writers never need to track sequence values, so they - * don't. - * - * These queue mechanics hardwire the scheduling policy. Ignoring - * trylocks, cancellation, and spinning, they implement Phase-Fair - * preferences: - * 1. Unlocked writers prefer to signal waiting readers - * 2. Fully unlocked readers prefer to signal waiting writers - * 3. When read-locked and a waiting writer exists, the writer - * is preferred to incoming readers + * Waiters use a modified form of CLH lock used in + * AbstractQueuedSynchronizer (see its internal documentation for + * a fuller account), where each node it tagged (field mode) as + * either a reader or writer. Sets of waiting readers are grouped + * (linked) under a common node (field cowait) so act as a single + * node with respect to most CLH mechanics. By virtue of its + * structure, wait nodes need not actually carry sequence numbers; + * we know each is >= its predecessor. These queue mechanics + * simplify the scheduling policy to a mainly-FIFO scheme that + * incorporates elements of Phase-Fair locks (see Brandenburg & + * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In + * particular, we use the phase-fair anti-barging rule: If an + * incoming reader arrives while read lock is held but there is a + * queued writer, this incoming reader is queued. (This rule is + * responsible for some of the complexity of method acquireRead, + * but without it, the lock becomes highly unfair.) * * These rules apply to threads actually queued. All tryLock forms * opportunistically try to acquire locks regardless of preference - * rules, and so may "barge" their way in. Additionally, initial - * phases of the await* methods (invoked from readLock() and - * writeLock()) use controlled spins that have similar effect. - * Phase-fair preferences may also be broken on cancellations due - * to timeouts and interrupts. Rule #3 (incoming readers when a - * waiting writer) is approximated with varying precision in - * different contexts -- some checks do not account for - * in-progress spins/signals, and others do not account for - * cancellations. - * - * Controlled, randomized spinning is used in the two await - * methods to reduce (increasingly expensive) context switching - * while also avoiding sustained memory thrashing among many - * threads. Both await methods use a similar spin strategy: If - * the associated queue appears to be empty, then the thread - * spin-waits up to SPINS times (where each iteration decreases - * spin count with 50% probability) before enqueuing, and then, if - * it is the first thread to be enqueued, spins again up to SPINS - * times before blocking. If, upon wakening it fails to obtain - * lock, and is still (or becomes) the first waiting thread (which - * indicates that some other thread barged and obtained lock), it - * escalates spins (up to MAX_HEAD_SPINS) to reduce the likelihood - * of continually losing to barging threads. + * rules, and so may "barge" their way in. Randomized spinning is + * used in the acquire methods to reduce (increasingly expensive) + * context switching while also avoiding sustained memory + * thrashing among many threads. We limit spins to the head of + * queue. A thread spin-waits up to SPINS times (where each + * iteration decreases spin count with 50% probability) before + * blocking. If, upon wakening it fails to obtain lock, and is + * still (or becomes) the first waiting thread (which indicates + * that some other thread barged and obtained lock), it escalates + * spins (up to MAX_HEAD_SPINS) to reduce the likelihood of + * continually losing to barging threads. + * + * Nearly all of these mechanics are carried out in methods + * acquireWrite and acquireRead, that, as typical of such code, + * sprawl out because actions and retries rely on consitent sets + * of locally cahced reads. * * As noted in Boehm's paper (above), sequence validation (mainly * method validate()) requires stricter ordering rules than apply @@ -258,10 +256,10 @@ public class StampedLock implements java private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** Maximum number of retries before blocking on acquisition */ - private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1; + private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; /** Maximum number of retries before re-blocking */ - private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 12 : 1; + private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 12 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 @@ -280,37 +278,39 @@ public class StampedLock implements java // Initial value for lock state; avoid failure value zero private static final long ORIGIN = WBIT << 1; - // Special value from cancelled await methods so caller can throw IE + // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; - // Values for writer status; order matters + // Values for node status; order matters private static final int WAITING = -1; private static final int CANCELLED = 1; - /** Wait nodes for readers */ - static final class RNode { - final long seq; // stamp value upon enqueue - volatile Thread waiter; // null if no longer waiting - volatile RNode next; - RNode(long s, Thread w) { seq = s; waiter = w; } - } + // Modes for nodes (int not boolean to allow arithmetic) + private static final int RMODE = 0; + private static final int WMODE = 1; - /** Wait nodes for writers */ + /** Wait nodes */ static final class WNode { - volatile int status; // 0, WAITING, or CANCELLED volatile WNode prev; volatile WNode next; - volatile Thread thread; - WNode(Thread t, WNode p) { thread = t; prev = p; } + volatile WNode cowait; // list of linked readers + volatile Thread thread; // non-null while possibly parked + volatile int status; // 0, WAITING, or CANCELLED + final int mode; // RMODE or WMODE + WNode(int m, WNode p) { mode = m; prev = p; } } - /** Head of writer CLH queue */ + /** Head of CLH queue */ private transient volatile WNode whead; - /** Tail (last) of writer CLH queue */ + /** Tail (last) of CLH queue */ private transient volatile WNode wtail; - /** Head of read queue */ - private transient volatile RNode rhead; - /** The state of the lock -- high bits hold sequence, low bits read count */ + + // views + transient ReadLockView readLockView; + transient WriteLockView writeLockView; + transient ReadWriteLockView readWriteLockView; + + /** Lock sequence/state */ private transient volatile long state; /** extra reader count when state read count saturated */ private transient int readerOverflow; @@ -329,11 +329,10 @@ public class StampedLock implements java * @return a stamp that can be used to unlock or convert mode */ public long writeLock() { - long s, next; - if (((s = state) & ABITS) == 0L && - U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) - return next; - return awaitWrite(false, 0L); + long s, next; // bypass acquireWrite in fully onlocked case only + return ((((s = state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? + next : acquireWrite(false, 0L)); } /** @@ -344,15 +343,16 @@ public class StampedLock implements java */ public long tryWriteLock() { long s, next; - if (((s = state) & ABITS) == 0L && - U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) - return next; - return 0L; + return ((((s = state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? + next : 0L); } /** * Exclusively acquires the lock if it is available within the * given time and the current thread has not been interrupted. + * Behavior under timeout and interruption matches that specified + * for method {@link Lock#tryLock(long,TimeUnit)}. * * @return a stamp that can be used to unlock or convert mode, * or zero if the lock is not available @@ -363,15 +363,14 @@ public class StampedLock implements java throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - long s, next, deadline; - if (((s = state) & ABITS) == 0L && - U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) + long next, deadline; + if ((next = tryWriteLock()) != 0) return next; if (nanos <= 0L) return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; - if ((next = awaitWrite(true, deadline)) != INTERRUPTED) + if ((next = acquireWrite(true, deadline)) != INTERRUPTED) return next; } throw new InterruptedException(); @@ -380,20 +379,18 @@ public class StampedLock implements java /** * Exclusively acquires the lock, blocking if necessary * until available or the current thread is interrupted. + * Behavior under interruption matches that specified + * for method {@link Lock#lockInterruptibly()}. * * @return a stamp that can be used to unlock or convert mode * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ public long writeLockInterruptibly() throws InterruptedException { - if (!Thread.interrupted()) { - long s, next; - if (((s = state) & ABITS) == 0L && - U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) - return next; - if ((next = awaitWrite(true, 0L)) != INTERRUPTED) - return next; - } + long next; + if (!Thread.interrupted() && + (next = acquireWrite(true, 0L)) != INTERRUPTED) + return next; throw new InterruptedException(); } @@ -404,20 +401,10 @@ public class StampedLock implements java * @return a stamp that can be used to unlock or convert mode */ public long readLock() { - for (;;) { - long s, m, next; - if ((m = (s = state) & ABITS) == 0L || - (m < WBIT && whead == wtail)) { - if (m < RFULL) { - if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - else - return awaitRead(s, false, 0L); - } + long s, next; // bypass acquireRead on fully onlocked case only + return ((((s = state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? + next : acquireRead(false, 0L)); } /** @@ -443,6 +430,8 @@ public class StampedLock implements java /** * Non-exclusively acquires the lock if it is available within the * given time and the current thread has not been interrupted. + * Behavior under timeout and interruption matches that specified + * for method {@link Lock#tryLock(long,TimeUnit)}. * * @return a stamp that can be used to unlock or convert mode, * or zero if the lock is not available @@ -451,27 +440,17 @@ public class StampedLock implements java */ public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { + long next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - for (;;) { - long s, m, next, deadline; - if ((m = (s = state) & ABITS) == WBIT || - (m != 0L && whead != wtail)) { - if (nanos <= 0L) - return 0L; - if ((deadline = System.nanoTime() + nanos) == 0L) - deadline = 1L; - if ((next = awaitRead(s, true, deadline)) != INTERRUPTED) - return next; - break; - } - else if (m < RFULL) { - if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } + if ((next = tryReadLock()) != 0) + return next; + if (nanos <= 0L) + return 0L; + if ((deadline = System.nanoTime() + nanos) == 0L) + deadline = 1L; + if ((next = acquireRead(true, deadline)) != INTERRUPTED) + return next; } throw new InterruptedException(); } @@ -479,29 +458,18 @@ public class StampedLock implements java /** * Non-exclusively acquires the lock, blocking if necessary * until available or the current thread is interrupted. + * Behavior under interruption matches that specified + * for method {@link Lock#lockInterruptibly()}. * * @return a stamp that can be used to unlock or convert mode * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ public long readLockInterruptibly() throws InterruptedException { - if (!Thread.interrupted()) { - for (;;) { - long s, next, m; - if ((m = (s = state) & ABITS) == WBIT || - (m != 0L && whead != wtail)) { - if ((next = awaitRead(s, true, 0L)) != INTERRUPTED) - return next; - break; - } - else if (m < RFULL) { - if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - } + long next; + if (!Thread.interrupted() && + (next = acquireRead(true, 0L)) != INTERRUPTED) + return next; throw new InterruptedException(); } @@ -539,10 +507,12 @@ public class StampedLock implements java * not match the current state of this lock */ public void unlockWrite(long stamp) { + WNode h; if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); state = (stamp += WBIT) == 0L ? ORIGIN : stamp; - readerPrefSignal(); + if ((h = whead) != null && h.status != 0) + release(h); } /** @@ -554,15 +524,15 @@ public class StampedLock implements java * not match the current state of this lock */ public void unlockRead(long stamp) { - long s, m; + long s, m; WNode h; if ((stamp & RBITS) != 0L) { while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { - if (m == RUNIT) - writerPrefSignal(); + if (m == RUNIT && (h = whead) != null && h.status != 0) + release(h); return; } } @@ -584,7 +554,7 @@ public class StampedLock implements java * not match the current state of this lock */ public void unlock(long stamp) { - long a = stamp & ABITS, m, s; + long a = stamp & ABITS, m, s; WNode h; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) break; @@ -592,15 +562,16 @@ public class StampedLock implements java if (a != m) break; state = (s += WBIT) == 0L ? ORIGIN : s; - readerPrefSignal(); + if ((h = whead) != null && h.status != 0) + release(h); return; } else if (a == 0L || a >= WBIT) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { - if (m == RUNIT) - writerPrefSignal(); + if (m == RUNIT && (h = whead) != null && h.status != 0) + release(h); return; } } @@ -611,7 +582,7 @@ public class StampedLock implements java } /** - * If the lock state matches the given stamp then performs one of + * If the lock state matches the given stamp, performs one of * the following actions. If the stamp represents holding a write * lock, returns it. Or, if a read lock, if the write lock is * available, releases the read lock and returns a write stamp. @@ -648,7 +619,7 @@ public class StampedLock implements java } /** - * If the lock state matches the given stamp then performs one of + * If the lock state matches the given stamp, performs one of * the following actions. If the stamp represents holding a write * lock, releases it and obtains a read lock. Or, if a read lock, * returns it. Or, if an optimistic read, acquires a read lock and @@ -659,7 +630,7 @@ public class StampedLock implements java * @return a valid read stamp, or zero on failure */ public long tryConvertToReadLock(long stamp) { - long a = stamp & ABITS, m, s, next; + long a = stamp & ABITS, m, s, next; WNode h; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) { if (a != 0L) @@ -675,7 +646,8 @@ public class StampedLock implements java if (a != m) break; state = next = s + (WBIT + RUNIT); - readerPrefSignal(); + if ((h = whead) != null && h.status != 0) + release(h); return next; } else if (a != 0L && a < WBIT) @@ -697,9 +669,11 @@ public class StampedLock implements java * @return a valid optimistic read stamp, or zero on failure */ public long tryConvertToOptimisticRead(long stamp) { - long a = stamp & ABITS, m, s, next; - while (((s = U.getLongVolatile(this, STATE)) & - SBITS) == (stamp & SBITS)) { + long a = stamp & ABITS, m, s, next; WNode h; + for (;;) { + s = U.getLongVolatile(this, STATE); // see above + if ((s & SBITS) != (stamp & SBITS)) + break; if ((m = s & ABITS) == 0L) { if (a != 0L) break; @@ -709,15 +683,16 @@ public class StampedLock implements java if (a != m) break; state = next = (s += WBIT) == 0L ? ORIGIN : s; - readerPrefSignal(); + if ((h = whead) != null && h.status != 0) + release(h); return next; } else if (a == 0L || a >= WBIT) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) { - if (m == RUNIT) - writerPrefSignal(); + if (m == RUNIT && (h = whead) != null && h.status != 0) + release(h); return next & SBITS; } } @@ -735,10 +710,11 @@ public class StampedLock implements java * @return true if the lock was held, else false */ public boolean tryUnlockWrite() { - long s; + long s; WNode h; if (((s = state) & WBIT) != 0L) { state = (s += WBIT) == 0L ? ORIGIN : s; - readerPrefSignal(); + if ((h = whead) != null && h.status != 0) + release(h); return true; } return false; @@ -752,12 +728,12 @@ public class StampedLock implements java * @return true if the read lock was held, else false */ public boolean tryUnlockRead() { - long s, m; + long s, m; WNode h; while ((m = (s = state) & ABITS) != 0L && m < WBIT) { if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { - if (m == RUNIT) - writerPrefSignal(); + if (m == RUNIT && (h = whead) != null && h.status != 0) + release(h); return true; } } @@ -791,6 +767,92 @@ public class StampedLock implements java state = ORIGIN; // reset to unlocked state } + /** + * Returns a plain {@link Lock} view of this StampedLock in which + * the {@link Lock#lock} method is mapped to {@link #readLock}, + * and similarly for other methods. The returned Lock does not + * support a {@link Condition}; method {@link + * Lock#newCondition()} throws {@code + * UnsupportedOperationException}. + * + * @return the lock + */ + public Lock asReadLock() { + ReadLockView v; + return ((v = readLockView) != null ? v : + (readLockView = new ReadLockView())); + } + + /** + * Returns a plain {@link Lock} view of this StampedLock in which + * the {@link Lock#lock} method is mapped to {@link #writeLock}, + * and similarly for other methods. The returned Lock does not + * support a {@link Condition}; method {@link + * Lock#newCondition()} throws {@code + * UnsupportedOperationException}. + * + * @return the lock + */ + public Lock asWriteLock() { + WriteLockView v; + return ((v = writeLockView) != null ? v : + (writeLockView = new WriteLockView())); + } + + /** + * Returns a {@link ReadWriteLock} view of this StampedLock in + * which the {@link ReadWriteLock#readLock()} method is mapped to + * {@link #asReadLock()}, and {@link ReadWriteLock#writeLock()} to + * {@link #asWriteLock()}. + * + * @return the lock + */ + public ReadWriteLock asReadWriteLock() { + ReadWriteLockView v; + return ((v = readWriteLockView) != null ? v : + (readWriteLockView = new ReadWriteLockView())); + } + + // view classes + + final class ReadLockView implements Lock { + public void lock() { readLock(); } + public void lockInterruptibly() throws InterruptedException { + readLockInterruptibly(); + } + public boolean tryLock() { return tryReadLock() != 0L; } + public boolean tryLock(long time, TimeUnit unit) + throws InterruptedException { + return tryReadLock(time, unit) != 0L; + } + // note that we give up ability to check mode so just use current state + public void unlock() { unlockRead(state); } + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + + final class WriteLockView implements Lock { + public void lock() { writeLock(); } + public void lockInterruptibly() throws InterruptedException { + writeLockInterruptibly(); + } + public boolean tryLock() { return tryWriteLock() != 0L; } + public boolean tryLock(long time, TimeUnit unit) + throws InterruptedException { + return tryWriteLock(time, unit) != 0L; + } + public void unlock() { unlockWrite(state); } + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + + final class ReadWriteLockView implements ReadWriteLock { + public Lock readLock() { return asReadLock(); } + public Lock writeLock() { return asWriteLock(); } + } + // internals /** @@ -842,354 +904,338 @@ public class StampedLock implements java } /* - * The two versions of signal implement the phase-fair policy. - * They include almost the same code, but repacked in different - * ways. Integrating the policy with the mechanics eliminates - * state rechecks that would be needed with separate reader and - * writer signal methods. Both methods assume that they are - * called when the lock is last known to be available, and - * continue until the lock is unavailable, or at least one thread - * is signalled, or there are no more waiting threads. Signalling - * a reader entails popping (CASing) from rhead and unparking - * unless the thread already cancelled (indicated by a null waiter - * field). Signalling a writer requires finding the first node, - * i.e., the successor of whead. This is normally just head.next, - * but may require traversal from wtail if next pointers are - * lagging. These methods may fail to wake up an acquiring thread - * when one or more have been cancelled, but the cancel methods - * themselves provide extra safeguards to ensure liveness. - */ - - private void readerPrefSignal() { - boolean readers = false; - RNode p; WNode h, q; long s; Thread w; - while ((p = rhead) != null) { - if (((s = state) & WBIT) != 0L) - return; - if (p.seq == (s & SBITS)) - break; - readers = true; - if (U.compareAndSwapObject(this, RHEAD, p, p.next) && - (w = p.waiter) != null && - U.compareAndSwapObject(p, WAITER, w, null)) - U.unpark(w); - } - if (!readers && (h = whead) != null && h.status != 0 && - (state & ABITS) == 0L) { - U.compareAndSwapInt(h, STATUS, WAITING, 0); + * Wakes up the successor of h (normally whead). This is normally + * just h.next, but may require traversal from wtail if next + * pointers are lagging. This may fail to wake up an acquiring + * thread when one or more have been cancelled, but the cancel + * methods themselves provide extra safeguards to ensure liveness. + */ + private void release(WNode h) { + if (h != null) { + WNode q; Thread w; + U.compareAndSwapInt(h, WSTATUS, WAITING, 0); if ((q = h.next) == null || q.status == CANCELLED) { for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } - if (q != null && (w = q.thread) != null) - U.unpark(w); - } - } - - private void writerPrefSignal() { - RNode p; WNode h, q; long s; Thread w; - if ((h = whead) != null && h.status != 0) { - U.compareAndSwapInt(h, STATUS, WAITING, 0); - if ((q = h.next) == null || q.status == CANCELLED) { - for (WNode t = wtail; t != null && t != h; t = t.prev) - if (t.status <= 0) - q = t; - } - if (q != null && (w = q.thread) != null) - U.unpark(w); - } - else { - while ((p = rhead) != null && ((s = state) & WBIT) == 0L && - p.seq != (s & SBITS)) { - if (U.compareAndSwapObject(this, RHEAD, p, p.next) && - (w = p.waiter) != null && - U.compareAndSwapObject(p, WAITER, w, null)) - U.unpark(w); + if (q != null) { + for (WNode r = q;;) { // release co-waiters too + if ((w = r.thread) != null) { + r.thread = null; + U.unpark(w); + } + if ((r = q.cowait) == null) + break; + U.compareAndSwapObject(q, WCOWAIT, r, r.cowait); + } } } } /** - * RNG for local spins. The first call from await{Read,Write} - * produces a thread-local value. Unless zero, subsequent calls - * use an xorShift to further reduce memory traffic. - */ - private static int nextRandom(int r) { - if (r == 0) - return ThreadLocalRandom.current().nextInt(); - r ^= r << 1; // xorshift - r ^= r >>> 3; - r ^= r << 10; - return r; - } - - /** - * Possibly spins trying to obtain write lock, then enqueues and - * blocks while not head of write queue or cannot acquire lock, - * possibly spinning when at head; cancelling on timeout or - * interrupt. + * See above for explanation. * * @param interruptible true if should check interrupts and if so * return INTERRUPTED * @param deadline if nonzero, the System.nanoTime value to timeout * at (and return zero) + * @return next state, or INTERRUPTED */ - private long awaitWrite(boolean interruptible, long deadline) { - WNode node = null; - for (int r = 0, spins = -1;;) { - WNode p; long s, next; + private long acquireWrite(boolean interruptible, long deadline) { + WNode node = null, p; + for (int spins = -1;;) { // spin while enqueuing + long s, ns; if (((s = state) & ABITS) == 0L) { - if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) - return next; + if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) + return ns; } - else if (spins < 0) - spins = whead == wtail ? SPINS : 0; else if (spins > 0) { - if ((r = nextRandom(r)) >= 0) + if (ThreadLocalRandom.current().nextInt() >= 0) --spins; } else if ((p = wtail) == null) { // initialize queue - if (U.compareAndSwapObject(this, WHEAD, null, - new WNode(null, null))) - wtail = whead; + WNode h = new WNode(WMODE, null); + if (U.compareAndSwapObject(this, WHEAD, null, h)) + wtail = h; } + else if (spins < 0) + spins = (p == whead) ? SPINS : 0; else if (node == null) - node = new WNode(Thread.currentThread(), p); + node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; - for (int headSpins = SPINS;;) { - WNode np, pp; int ps; - while ((np = node.prev) != p && np != null) - (p = np).next = node; // stale - if (p == whead) { - for (int k = headSpins;;) { - if (((s = state) & ABITS) == 0L) { - if (U.compareAndSwapLong(this, STATE, - s, next = s + WBIT)) { - whead = node; - node.thread = null; - node.prev = null; - return next; - } - break; + break; + } + } + + for (int spins = SPINS;;) { + WNode np, pp; int ps; long s, ns; Thread w; + while ((np = node.prev) != p && np != null) + (p = np).next = node; // stale + if (whead == p) { + for (int k = spins;;) { // spin at head + if (((s = state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { + whead = node; + node.prev = null; + return ns; + } + else if (ThreadLocalRandom.current().nextInt() >= 0 && + --k <= 0) + break; + } + if (spins < MAX_HEAD_SPINS) + spins <<= 1; + } + if ((ps = p.status) == 0) + U.compareAndSwapInt(p, WSTATUS, 0, WAITING); + else if (ps == CANCELLED) { + if ((pp = p.prev) != null) { + node.prev = pp; + pp.next = node; + } + } + else { + long time; // 0 argument to park means no timeout + if (deadline == 0L) + time = 0L; + else if ((time = deadline - System.nanoTime()) <= 0L) + return cancelWaiter(node, null, false); + node.thread = Thread.currentThread(); + if (node.prev == p && p.status == WAITING && // recheck + (p != whead || (state & ABITS) != 0L)) { + U.park(false, time); + if (interruptible && Thread.interrupted()) + return cancelWaiter(node, null, true); + } + node.thread = null; + } + } + } + + /** + * See above for explanation. + * + * @param interruptible true if should check interrupts and if so + * return INTERRUPTED + * @param deadline if nonzero, the System.nanoTime value to timeout + * at (and return zero) + * @return next state, or INTERRUPTED + */ + private long acquireRead(boolean interruptible, long deadline) { + WNode node = null, group = null, p; + for (int spins = -1;;) { + for (;;) { + long s, m, ns; WNode h, q; Thread w; // anti-barging guard + if (group == null && (h = whead) != null && + (q = h.next) != null && q.mode != RMODE) + break; + if ((m = (s = state) & ABITS) == WBIT) + break; + if (m < RFULL ? + U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : + (ns = tryIncReaderOverflow(s)) != 0L) { + if (group != null) { // help release others + for (WNode r = group;;) { + if ((w = r.thread) != null) { + r.thread = null; + U.unpark(w); } - if ((r = nextRandom(r)) >= 0 && --k <= 0) + if ((r = group.cowait) == null) break; + U.compareAndSwapObject(group, WCOWAIT, r, r.cowait); } - if (headSpins < MAX_HEAD_SPINS) - headSpins <<= 1; } - if ((ps = p.status) == 0) - U.compareAndSwapInt(p, STATUS, 0, WAITING); - else if (ps == CANCELLED) { - if ((pp = p.prev) != null) { - node.prev = pp; - pp.next = node; - } - } - else { - long time; // 0 argument to park means no timeout + return ns; + } + } + if (spins > 0) { + if (ThreadLocalRandom.current().nextInt() >= 0) + --spins; + } + else if ((p = wtail) == null) { + WNode h = new WNode(WMODE, null); + if (U.compareAndSwapObject(this, WHEAD, null, h)) + wtail = h; + } + else if (spins < 0) + spins = (p == whead) ? SPINS : 0; + else if (node == null) + node = new WNode(WMODE, p); + else if (node.prev != p) + node.prev = p; + else if (p.mode == RMODE && p != whead) { + WNode pp = p.prev; // become co-waiter with group p + if (pp != null && p == wtail && + U.compareAndSwapObject(p, WCOWAIT, + node.cowait = p.cowait, node)) { + node.thread = Thread.currentThread(); + for (long time;;) { if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelWriter(node, false); - if (node.prev == p && p.status == WAITING && - (p != whead || (state & ABITS) != 0L)) // recheck - U.park(false, time); + return cancelWaiter(node, p, false); + if (node.thread == null) + break; + if (p.prev != pp || p.status == CANCELLED || + p == whead || p.prev != pp) { + node.thread = null; + break; + } + if (node.thread == null) // must recheck + break; + U.park(false, time); if (interruptible && Thread.interrupted()) - return cancelWriter(node, true); + return cancelWaiter(node, p, true); } + group = p; } + node = null; // throw away + } + else if (U.compareAndSwapObject(this, WTAIL, p, node)) { + p.next = node; + break; } } - } - /** - * If node non-null, forces cancel status and unsplices from queue - * if possible. This is a variant of cancellation methods in - * AbstractQueuedSynchronizer (see its detailed explanation in AQS - * internal documentation) that more conservatively wakes up other - * threads that may have had their links changed, so as to preserve - * liveness in the main signalling methods. - */ - private long cancelWriter(WNode node, boolean interrupted) { - if (node != null) { - node.thread = null; - node.status = CANCELLED; - for (WNode pred = node.prev; pred != null; ) { - WNode succ, pp; Thread w; - while ((succ = node.next) == null || succ.status == CANCELLED) { - WNode q = null; - for (WNode t = wtail; t != null && t != node; t = t.prev) - if (t.status != CANCELLED) - q = t; - if (succ == q || - U.compareAndSwapObject(node, WNEXT, succ, succ = q)) { - if (succ == null && node == wtail) - U.compareAndSwapObject(this, WTAIL, node, pred); - break; + for (int spins = SPINS;;) { + WNode np, pp, r; int ps; long m, s, ns; Thread w; + while ((np = node.prev) != p && np != null) + (p = np).next = node; + if (whead == p) { + for (int k = spins;;) { + if ((m = (s = state) & ABITS) != WBIT) { + if (m < RFULL ? + U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT): + (ns = tryIncReaderOverflow(s)) != 0L) { + whead = node; + node.prev = null; + while ((r = node.cowait) != null) { + if (U.compareAndSwapObject(node, WCOWAIT, + r, r.cowait) && + (w = r.thread) != null) { + r.thread = null; + U.unpark(w); // release co-waiter + } + } + return ns; + } } + else if (ThreadLocalRandom.current().nextInt() >= 0 && + --k <= 0) + break; } - if (pred.next == node) - U.compareAndSwapObject(pred, WNEXT, node, succ); - if (succ != null && (w = succ.thread) != null) - U.unpark(w); - if (pred.status != CANCELLED || (pp = pred.prev) == null) - break; - node.prev = pp; // repeat for new pred - U.compareAndSwapObject(pp, WNEXT, pred, succ); - pred = pp; + if (spins < MAX_HEAD_SPINS) + spins <<= 1; } - } - writerPrefSignal(); - return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; - } - - /** - * Waits for read lock or timeout or interrupt. The form of - * awaitRead differs from awaitWrite mainly because it must - * restart (with a new wait node) if the thread was unqueued and - * unparked but could not the obtain lock. We also need to help - * with preference rules by not trying to acquire the lock before - * enqueuing if there is a known waiting writer, but also helping - * to release those threads that are still queued from the last - * release. - */ - private long awaitRead(long stamp, boolean interruptible, long deadline) { - long seq = stamp & SBITS; - RNode node = null; - boolean queued = false; - for (int r = 0, headSpins = SPINS, spins = -1;;) { - long s, m, next; RNode p; WNode wh; Thread w; - if ((m = (s = state) & ABITS) != WBIT && - ((s & SBITS) != seq || (wh = whead) == null || - wh.status == 0)) { - if (m < RFULL ? - U.compareAndSwapLong(this, STATE, s, next = s + RUNIT) : - (next = tryIncReaderOverflow(s)) != 0L) { - if (node != null && (w = node.waiter) != null) - U.compareAndSwapObject(node, WAITER, w, null); - if ((p = rhead) != null && (s & SBITS) != p.seq && - U.compareAndSwapObject(this, RHEAD, p, p.next) && - (w = p.waiter) != null && - U.compareAndSwapObject(p, WAITER, w, null)) - U.unpark(w); // help signal other waiters - return next; + if ((ps = p.status) == 0) + U.compareAndSwapInt(p, WSTATUS, 0, WAITING); + else if (ps == CANCELLED) { + if ((pp = p.prev) != null) { + node.prev = pp; + pp.next = node; } } - else if (m != WBIT && (p = rhead) != null && - (s & SBITS) != p.seq) { // help release old readers - if (U.compareAndSwapObject(this, RHEAD, p, p.next) && - (w = p.waiter) != null && - U.compareAndSwapObject(p, WAITER, w, null)) - U.unpark(w); - } - else if (queued && node != null && node.waiter == null) { - node = null; // restart - queued = false; - spins = -1; - } - else if (spins < 0) { - if (rhead != node) - spins = 0; - else if ((spins = headSpins) < MAX_HEAD_SPINS && node != null) - headSpins <<= 1; - } - else if (spins > 0) { - if ((r = nextRandom(r)) >= 0) - --spins; - } - else if (node == null) - node = new RNode(seq, Thread.currentThread()); - else if (!queued) { - if (queued = U.compareAndSwapObject(this, RHEAD, - node.next = rhead, node)) - spins = -1; - } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelReader(node, false); - if ((state & WBIT) != 0L && node.waiter != null) // recheck + return cancelWaiter(node, null, false); + node.thread = Thread.currentThread(); + if (node.prev == p && p.status == WAITING && + (p != whead || (state & ABITS) != WBIT)) { U.park(false, time); - if (interruptible && Thread.interrupted()) - return cancelReader(node, true); + if (interruptible && Thread.interrupted()) + return cancelWaiter(node, null, true); + } + node.thread = null; } } } /** * If node non-null, forces cancel status and unsplices from queue - * if possible, by traversing entire queue looking for cancelled - * nodes. + * if possible. This is a variant of cancellation methods in + * AbstractQueuedSynchronizer (see its detailed explanation in AQS + * internal documentation) that more conservatively wakes up other + * threads that may have had their links changed, so as to preserve + * liveness in the main signalling methods. */ - private long cancelReader(RNode node, boolean interrupted) { - Thread w; - if (node != null && (w = node.waiter) != null && - U.compareAndSwapObject(node, WAITER, w, null)) { - for (RNode pred = null, p = rhead; p != null;) { - RNode q = p.next; - if (p.waiter == null) { - if (pred == null) { - U.compareAndSwapObject(this, RHEAD, p, q); - p = rhead; - } - else { - U.compareAndSwapObject(pred, RNEXT, p, q); - p = pred.next; + private long cancelWaiter(WNode node, WNode group, boolean interrupted) { + if (node != null) { + node.thread = null; + node.status = CANCELLED; + if (group != null) { + for (WNode p = group, q; p != null; p = q) { + if ((q = p.cowait) != null && q.status == CANCELLED) { + U.compareAndSwapObject(p, WCOWAIT, q, q.cowait); + break; } } - else { - pred = p; - p = q; + } + else { + for (WNode pred = node.prev; pred != null; ) { + WNode succ, pp; Thread w; + while ((succ = node.next) == null || + succ.status == CANCELLED) { + WNode q = null; + for (WNode t = wtail; t != null && t != node; t = t.prev) + if (t.status != CANCELLED) + q = t; + if (succ == q || + U.compareAndSwapObject(node, WNEXT, + succ, succ = q)) { + if (succ == null && node == wtail) + U.compareAndSwapObject(this, WTAIL, node, pred); + break; + } + } + if (pred.next == node) + U.compareAndSwapObject(pred, WNEXT, node, succ); + if (succ != null && (w = succ.thread) != null) + U.unpark(w); + if (pred.status != CANCELLED || (pp = pred.prev) == null) + break; + node.prev = pp; // repeat for new pred + U.compareAndSwapObject(pp, WNEXT, pred, succ); + pred = pp; } } } - readerPrefSignal(); + release(whead); return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; } // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long STATE; - private static final long RHEAD; private static final long WHEAD; private static final long WTAIL; - private static final long RNEXT; private static final long WNEXT; - private static final long WPREV; - private static final long WAITER; - private static final long STATUS; + private static final long WSTATUS; + private static final long WCOWAIT; static { try { U = getUnsafe(); Class k = StampedLock.class; - Class rk = RNode.class; Class wk = WNode.class; STATE = U.objectFieldOffset (k.getDeclaredField("state")); - RHEAD = U.objectFieldOffset - (k.getDeclaredField("rhead")); WHEAD = U.objectFieldOffset (k.getDeclaredField("whead")); WTAIL = U.objectFieldOffset (k.getDeclaredField("wtail")); - RNEXT = U.objectFieldOffset - (rk.getDeclaredField("next")); - WAITER = U.objectFieldOffset - (rk.getDeclaredField("waiter")); - STATUS = U.objectFieldOffset + WSTATUS = U.objectFieldOffset (wk.getDeclaredField("status")); WNEXT = U.objectFieldOffset (wk.getDeclaredField("next")); - WPREV = U.objectFieldOffset - (wk.getDeclaredField("prev")); + WCOWAIT = U.objectFieldOffset + (wk.getDeclaredField("cowait")); } catch (Exception e) { throw new Error(e);