--- jsr166/src/jsr166e/StripedAdder.java 2011/07/26 17:16:36 1.6 +++ jsr166/src/jsr166e/StripedAdder.java 2011/07/28 15:05:55 1.8 @@ -5,7 +5,6 @@ */ package jsr166e; -import java.util.Arrays; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -46,11 +45,18 @@ public class StripedAdder implements Ser * A StripedAdder maintains a table of Atomic long variables. The * table is indexed by per-thread hash codes. * - * By default, the table is lazily initialized, to minimize - * footprint until adders are used. On first use, the table is set - * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is - * bounded by the number of CPUS (if larger than the default - * size). + * Table entries are of class Adder; a variant of AtomicLong + * padded to reduce cache contention on most processors. Padding + * is overkill for most Atomics because they are usually + * irregularly scattered in memory and thus don't interfere much + * with each other. But Atomic objects residing in arrays will + * tend to be placed adjacent to each other, and so will most + * often share cache lines (with a huge negative performance + * impact) without this precaution. + * + * Because Adders are relatively large, we avoid creating them + * until they are needed. On the other hand, we try to create them + * on any sign of contention. * * Per-thread hash codes are initialized to random values. * Collisions are indicated by failed CASes when performing an add @@ -62,29 +68,24 @@ public class StripedAdder implements Ser * hashing", using a secondary hash (Marsaglia XorShift) to try to * find a free slot. * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of + * By default, the table is lazily initialized. Upon first use, + * the table is set to size 2 (the minimum non-empty size), but + * containing only a single Adder. The maximum table size is + * bounded by nearest power of two >= the number of CPUS. The + * table size is capped because, when there are more threads than + * CPUs, supposing that each thread were bound to a CPU, there + * would exist a perfect hash function mapping threads to slots + * that eliminates collisions. When we reach capacity, we search + * for this mapping by randomly varying the hash codes of * colliding threads. Because search is random, and failures only * become known via CAS failures, convergence will be slow, and * because threads are typically not bound to CPUS forever, may * not occur at all. However, despite these limitations, observed * contention is typically low in these cases. * - * Table entries are of class Adder; a form of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines without this precaution. Adders are by default - * constructed upon first use, which further improves per-thread - * locality and helps reduce footprint. - * * A single spinlock is used for resizing the table as well as - * populating slots with new Adders. Upon lock contention, threads + * populating slots with new Adders. After initialization, there + * is no need for a blocking lock: Upon lock contention, threads * try other slots rather than blocking. After initialization, at * least one slot exists, so retries will eventually find a * candidate Adder. During these retries, there is increased @@ -92,24 +93,19 @@ public class StripedAdder implements Ser * alternatives. */ - /** - * Padded version of AtomicLong - */ - static final class Adder extends AtomicLong { - long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - Adder(long x) { super(x); } - } - private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** - * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon - * first use under default constructor, and must be a power of - * two. There is not much point in making size a lot smaller than - * that of Adders though. CAP is the maximum allowed table size. - */ - private static final int DEFAULT_INITIAL_SIZE = 8; - private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE); + * Padded variant of AtomicLong. The value field is placed + * between pads, hoping that the JVM doesn't reorder them. + * Updates are via inlined CAS in methods add and retryAdd. + */ + static final class Adder { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Adder(long x) { value = x; } + } /** * Holder for the thread-local hash code. The code is initially @@ -119,8 +115,8 @@ public class StripedAdder implements Ser static final Random rng = new Random(); int code; HashCode() { - int h = rng.nextInt(); - code = (h == 0) ? 1 : h; // ensure nonzero + int h = rng.nextInt(); // Avoid zero, because of xorShift rehash + code = (h == 0) ? 1 : h; } } @@ -140,25 +136,19 @@ public class StripedAdder implements Ser static final ThreadHashCode threadHashCode = new ThreadHashCode(); /** - * Table of adders. Size is power of two, grows to be at most CAP. + * Table of adders. When non-null, size is a power of 2, at least 2. */ private transient volatile Adder[] adders; /** - * Serves as a lock when resizing and/or creating Adders. There - * is no need for a blocking lock: Except during initialization - * races, when busy, other threads try other slots. However, - * during (double-checked) initializations, we use the - * "synchronized" lock on this object. + * Spinlock (locked via CAS) used when resizing and/or creating Adders. */ - private final AtomicInteger mutex; + private volatile int busy; /** * Creates a new adder with zero sum. */ public StripedAdder() { - this.mutex = new AtomicInteger(); - // remaining initialization on first call to add. } /** @@ -169,15 +159,14 @@ public class StripedAdder implements Ser * will concurrently update the sum. */ public StripedAdder(int expectedContention) { - int cap = (expectedContention < CAP) ? expectedContention : CAP; - int size = 1; + int cap = (expectedContention < NCPU) ? expectedContention : NCPU; + int size = 2; while (size < cap) size <<= 1; Adder[] as = new Adder[size]; for (int i = 0; i < size; ++i) as[i] = new Adder(0); this.adders = as; - this.mutex = new AtomicInteger(); } /** @@ -186,73 +175,106 @@ public class StripedAdder implements Ser * @param x the value to add */ public void add(long x) { - Adder[] as; Adder a; int n; long v; // locals to hold volatile reads + Adder[] as; Adder a; int n; // locals to hold volatile reads HashCode hc = threadHashCode.get(); int h = hc.code; - if ((as = adders) == null || (n = as.length) < 1 || - (a = as[(n - 1) & h]) == null || - !a.compareAndSet(v = a.get(), v + x)) - retryAdd(x, hc); + boolean collide; + if ((as = adders) != null && (n = as.length) > 0 && + (a = as[(n - 1) & h]) != null) { + long v = a.value; + if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x)) + return; + collide = true; + } + else + collide = false; + retryAdd(x, hc, collide); } /** * Handle cases of add involving initialization, resizing, * creating new Adders, and/or contention. See above for - * explanation. + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. */ - private void retryAdd(long x, HashCode hc) { + private void retryAdd(long x, HashCode hc, boolean collide) { int h = hc.code; - final AtomicInteger mutex = this.mutex; - int collisions = 1 - mutex.get(); // first guess: collides if not locked for (;;) { - Adder[] as; Adder a; long v; int k, n; - while ((as = adders) == null || (n = as.length) < 1) { - synchronized(mutex) { // Try to initialize - if (adders == null) { - Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE]; - rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0); - adders = rs; + Adder[] as; Adder a; int n; + if ((as = adders) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) != null) { + boolean shared = true; // Slot exists + if (collide && n < NCPU && busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + try { + if (adders == as) { // Expand table + Adder[] rs = new Adder[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + adders = rs; + shared = false; + } + } finally { + busy = 0; + } + if (shared || (h & n) != 0) { + collide = false; + continue; // Array or index changed + } + } + long v = a.value; + if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x)) + break; + collide = shared; + } + else { // Try to attach new Adder + if (busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + boolean created = false; + try { // Recheck under lock + Adder[] rs; int m, j; + if ((rs = adders) != null && (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = new Adder(x); + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty } + collide = false; } - collisions = 0; + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; } - - if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot - if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) { + else if (busy == 0) { // Default-initialize + Adder r = new Adder(x); + Adder[] rs = new Adder[2]; + rs[h & 1] = r; + if (adders == as && busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + boolean init = false; try { - if (adders == as && as[k] == null) - a = as[k] = new Adder(x); + if (adders == as) { + adders = rs; + init = true; + } } finally { - mutex.set(0); + busy = 0; } - if (a != null) + if (init) break; } - collisions = 0; } - else if (collisions != 0 && n < CAP && // Try to expand table - mutex.get() == 0 && mutex.compareAndSet(0, 1)) { - try { - if (adders == as) { - Adder[] rs = new Adder[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - adders = rs; - } - } finally { - mutex.set(0); - } - collisions = 0; - } - else if (a.compareAndSet(v = a.get(), v + x)) - break; - else - collisions = 1; - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; + else if (adders == as) // Lost initialization race + Thread.yield(); } - hc.code = h; + hc.code = h; // Record index for next time } /** @@ -270,7 +292,7 @@ public class StripedAdder implements Ser for (int i = 0; i < n; ++i) { Adder a = as[i]; if (a != null) - sum += a.get(); + sum += a.value; } } return sum; @@ -288,7 +310,7 @@ public class StripedAdder implements Ser for (int i = 0; i < n; ++i) { Adder a = as[i]; if (a != null) - a.set(0L); + a.value = 0L; } } } @@ -320,8 +342,8 @@ public class StripedAdder implements Ser for (int i = 0; i < n; ++i) { Adder a = as[i]; if (a != null) { - sum += a.get(); - a.set(0L); + sum += a.value; + a.value = 0L; } } } @@ -337,8 +359,54 @@ public class StripedAdder implements Ser private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); - mutex.set(0); + busy = 0; add(s.readLong()); } + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long busyOffset; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class sk = StripedAdder.class; + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + Class ak = Adder.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } + } + }