--- jsr166/src/jsr166e/StripedAdder.java 2011/07/23 16:32:53 1.4 +++ jsr166/src/jsr166e/StripedAdder.java 2011/07/29 14:23:35 1.11 @@ -5,7 +5,6 @@ */ package jsr166e; -import java.util.Arrays; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -27,9 +26,9 @@ import java.io.ObjectOutputStream; * update a common sum that is used for purposes such as collecting * statistics. In this case, performance may be significantly faster * than using a shared {@link AtomicLong}, at the expense of using - * much more space. On the other hand, if it is known that only one - * thread can ever update the sum, performance may be significantly - * slower than just updating a local variable. + * more space. On the other hand, if it is known that only one thread + * can ever update the sum, performance may be significantly slower + * than just updating a local variable. * *

A StripedAdder may optionally be constructed with a given * expected contention level; i.e., the number of threads that are @@ -44,13 +43,36 @@ public class StripedAdder implements Ser /* * A StripedAdder maintains a table of Atomic long variables. The - * table is indexed by per-thread hash codes that are initialized - * to random values. + * table is indexed by per-thread hash codes. * - * The table doubles in size upon contention (as indicated by - * failed CASes when performing add()), but is capped at the - * nearest power of two >= #CPUS. This reflects the idea that, - * when there are more threads than CPUs, then if each thread were + * Table entries are of class Adder; a variant of AtomicLong + * padded to reduce cache contention on most processors. Padding + * is overkill for most Atomics because they are usually + * irregularly scattered in memory and thus don't interfere much + * with each other. But Atomic objects residing in arrays will + * tend to be placed adjacent to each other, and so will most + * often share cache lines (with a huge negative performance + * impact) without this precaution. + * + * Because Adders are relatively large, we avoid creating them + * until they are needed. On the other hand, we try to create them + * on any sign of contention. + * + * Per-thread hash codes are initialized to random values. + * Collisions are indicated by failed CASes when performing an add + * operation (see method retryAdd). Upon a collision, if the table + * size is less than the capacity, it is doubled in size unless + * some other thread holds lock. If a hashed slot is empty, and + * lock is available, a new Adder is created. Otherwise, if the + * slot exists, a CAS is tried. Retries proceed by "double + * hashing", using a secondary hash (Marsaglia XorShift) to try to + * find a free slot. + * + * By default, the table is lazily initialized. Upon first use, + * the table is set to size 1, and contains a single Adder. The + * maximum table size is bounded by nearest power of two >= the + * number of CPUS. The table size is capped because, when there + * are more threads than CPUs, supposing that each thread were * bound to a CPU, there would exist a perfect hash function * mapping threads to slots that eliminates collisions. When we * reach capacity, we search for this mapping by randomly varying @@ -61,84 +83,72 @@ public class StripedAdder implements Ser * limitations, observed contention is typically low in these * cases. * - * Table entries are of class Adder; a form of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are most often - * irregularly scattered in memory and thus don't interfere much - * with each other. But Atomic objects residing in arrays will - * tend to be placed adjacent to each other, and so will most - * often share cache lines without this precaution. Adders are - * constructed upon first use, which further improves per-thread - * locality and helps reduce (an already large) footprint. - * * A single spinlock is used for resizing the table as well as - * populating slots with new Adders. Upon lock contention, threads + * populating slots with new Adders. After initialization, there + * is no need for a blocking lock: Upon lock contention, threads * try other slots rather than blocking. After initialization, at * least one slot exists, so retries will eventually find a - * candidate Adder. During these retries, there is increased + * candidate Adder. During these retries, there is increased * contention and reduced locality, which is still better than * alternatives. */ - /** - * Number of processors, to place a cap on table growth. - */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); + private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** - * Padded version of AtomicLong + * Padded variant of AtomicLong. The value field is placed + * between pads, hoping that the JVM doesn't reorder them. + * Updates are via inlined CAS in methods add and retryAdd. */ - static final class Adder extends AtomicLong { - long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd; - Adder(long x) { super(x); } + static final class Adder { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Adder(long x) { value = x; } } /** - * Holder for the thread-local hash code. The code starts off with - * a given random value, but may be set to a different value upon - * collisions in retryAdd. + * Holder for the thread-local hash code. The code is initially + * random, but may be set to a different value upon collisions. */ static final class HashCode { + static final Random rng = new Random(); int code; - HashCode(int h) { code = h; } + HashCode() { + int h = rng.nextInt(); // Avoid zero, because of xorShift rehash + code = (h == 0) ? 1 : h; + } } /** * The corresponding ThreadLocal class */ static final class ThreadHashCode extends ThreadLocal { - static final Random rng = new Random(); - public HashCode initialValue() { - int h = rng.nextInt(); - return new HashCode((h == 0) ? 1 : h); // ensure nonzero - } + public HashCode initialValue() { return new HashCode(); } } /** * Static per-thread hash codes. Shared across all StripedAdders - * because adjustments due to collisions in one table are likely - * to be appropriate for others. + * to reduce ThreadLocal pollution and because adjustments due to + * collisions in one table are likely to be appropriate for + * others. */ static final ThreadHashCode threadHashCode = new ThreadHashCode(); /** - * Table of adders. Minimum size 2. Size grows to be at most NCPU. + * Table of adders. When non-null, size is a power of 2. */ private transient volatile Adder[] adders; /** - * Serves as a lock when resizing and/or creating Adders. There - * is no need for a blocking lock: When busy, other threads try - * other slots. + * Spinlock (locked via CAS) used when resizing and/or creating Adders. */ - private final AtomicInteger mutex; + private volatile int busy; /** * Creates a new adder with zero sum. */ public StripedAdder() { - this.mutex = new AtomicInteger(); - // remaining initialization on first call to add. } /** @@ -150,14 +160,13 @@ public class StripedAdder implements Ser */ public StripedAdder(int expectedContention) { int cap = (expectedContention < NCPU) ? expectedContention : NCPU; - int size = 2; + int size = 1; while (size < cap) size <<= 1; Adder[] as = new Adder[size]; for (int i = 0; i < size; ++i) as[i] = new Adder(0); this.adders = as; - this.mutex = new AtomicInteger(); } /** @@ -166,60 +175,132 @@ public class StripedAdder implements Ser * @param x the value to add */ public void add(long x) { - Adder[] as; Adder a; int n; long v; // locals to hold volatile reads + Adder[] as; Adder a; int n; // locals to hold volatile reads HashCode hc = threadHashCode.get(); - if ((as = adders) == null || (n = as.length) < 1 || - (a = as[hc.code & (n - 1)]) == null || - !a.compareAndSet(v = a.get(), v + x)) - retryAdd(x, hc); + int h = hc.code; + boolean contended; + if ((as = adders) != null && (n = as.length) > 0 && + (a = as[(n - 1) & h]) != null) { + long v = a.value; + if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x)) + return; + contended = true; + } + else + contended = false; + retryAdd(x, hc, contended); } /** * Handle cases of add involving initialization, resizing, - * creating new Adders, and/or contention. + * creating new Adders, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value to add + * @param hc the hash code holder + * @param precontended true if CAS failed before call */ - private void retryAdd(long x, HashCode hc) { + private void retryAdd(long x, HashCode hc, boolean precontended) { int h = hc.code; - final AtomicInteger mutex = this.mutex; - AtomicInteger lock = null; // nonnull when held - try { - for (;;) { - Adder[] as; Adder a; long v; int n, k; // locals for volatiles - boolean needLock = true; - if ((as = adders) == null || (n = as.length) < 1) { - if (lock != null) // default-initialize - adders = new Adder[2]; + boolean collide = false; // true if last slot nonempty + for (;;) { + Adder[] as; Adder a; int n; + if ((as = adders) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Adder + Adder r = new Adder(x); // Optimistically create + if (busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + boolean created = false; + try { // Recheck under lock + Adder[] rs; int m, j; + if ((rs = adders) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; } - else if ((a = as[k = h & (n - 1)]) == null) { - if (lock != null) { // attach new adder - as[k] = new Adder(x); + else if (precontended) // CAS already known to fail + precontended = false; // Continue after rehash + else { + long v = a.value; + if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x)) break; + if (!collide) + collide = true; + else if (n >= NCPU || adders != as) + collide = false; // Can't expand + else if (busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + collide = false; + try { + if (adders == as) { // Expand table + Adder[] rs = new Adder[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + adders = rs; + } + } finally { + busy = 0; + } + continue; } } - else if (a.compareAndSet(v = a.get(), v + x)) - break; - else if (n >= NCPU) // cannot expand - needLock = false; - else if (lock != null) // expand table - adders = Arrays.copyOf(as, n << 1); - - if (lock == null) { - if (needLock && mutex.get() == 0 && - mutex.compareAndSet(0, 1)) - lock = mutex; - else { // try elsewhere - h ^= h << 13; // Marsaglia XorShift - h ^= h >>> 17; - h ^= h << 5; + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + } + else if (adders == as) { // Try to default-initialize + Adder[] rs = new Adder[1]; + rs[0] = new Adder(x); + boolean init = false; + while (adders == as) { + if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + try { + if (adders == as) { + adders = rs; + init = true; + } + } finally { + busy = 0; + } + break; } + if (adders != as) + break; + Thread.yield(); // Back off } + if (init) + break; } - } finally { - if (lock != null) - lock.set(0); } - if (hc.code != h) // avoid unneeded writes - hc.code = h; + hc.code = h; // Record index for next time + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); } /** @@ -237,49 +318,20 @@ public class StripedAdder implements Ser for (int i = 0; i < n; ++i) { Adder a = as[i]; if (a != null) - sum += a.get(); + sum += a.value; } } return sum; } /** - * Resets each of the variables to zero. This is effective in - * fully resetting the sum only if there are no concurrent - * updates. - */ - public void reset() { - Adder[] as = adders; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Adder a = as[i]; - if (a != null) - a.set(0L); - } - } - } - - /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Equivalent to {@link #sum} followed by {@link #reset}. + * Resets each of the variables to zero, returning the estimated + * previous sum. This is effective in fully resetting the sum only + * if there are no concurrent updates. * - * @return the estimated sum + * @return the estimated previous sum */ - public long sumAndReset() { + public long reset() { long sum = 0L; Adder[] as = adders; if (as != null) { @@ -287,8 +339,8 @@ public class StripedAdder implements Ser for (int i = 0; i < n; ++i) { Adder a = as[i]; if (a != null) { - sum += a.get(); - a.set(0L); + sum += a.value; + a.value = 0L; } } } @@ -304,8 +356,54 @@ public class StripedAdder implements Ser private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); - mutex.set(0); + busy = 0; add(s.readLong()); } + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long busyOffset; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class sk = StripedAdder.class; + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + Class ak = Adder.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } + } + }