--- jsr166/src/jsr166e/StripedAdder.java 2011/07/23 16:32:53 1.4
+++ jsr166/src/jsr166e/StripedAdder.java 2011/07/26 17:16:36 1.6
@@ -44,92 +44,112 @@ public class StripedAdder implements Ser
     /*
      * A StripedAdder maintains a table of Atomic long variables. The
-     * table is indexed by per-thread hash codes that are initialized
-     * to random values.
+     * table is indexed by per-thread hash codes.
      *
-     * The table doubles in size upon contention (as indicated by
-     * failed CASes when performing add()), but is capped at the
-     * nearest power of two >= #CPUs. This reflects the idea that,
-     * when there are more threads than CPUs, then if each thread were
-     * bound to a CPU, there would exist a perfect hash function
-     * mapping threads to slots that eliminates collisions. When we
-     * reach capacity, we search for this mapping by randomly varying
-     * the hash codes of colliding threads. Because search is random,
-     * and failures only become known via CAS failures, convergence
-     * will be slow, and because threads are typically not bound to
-     * CPUs forever, may not occur at all. However, despite these
-     * limitations, observed contention is typically low in these
-     * cases.
+     * By default, the table is lazily initialized, to minimize
+     * footprint until adders are used. On first use, the table is set
+     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
+     * bounded by the number of CPUs (if larger than the default
+     * size).
+     *
+     * Per-thread hash codes are initialized to random values.
+     * Collisions are indicated by failed CASes when performing an add
+     * operation (see method retryAdd). Upon a collision, if the table
+     * size is less than the capacity, it is doubled in size unless
+     * some other thread holds the lock. If a hashed slot is empty and
+     * the lock is available, a new Adder is created. Otherwise, if the
+     * slot exists, a CAS is tried. Retries proceed by "double
+     * hashing", using a secondary hash (Marsaglia XorShift) to try to
+     * find a free slot.
+     *
+     * The table size is capped because, when there are more threads
+     * than CPUs, supposing that each thread were bound to a CPU,
+     * there would exist a perfect hash function mapping threads to
+     * slots that eliminates collisions. When we reach capacity, we
+     * search for this mapping by randomly varying the hash codes of
+     * colliding threads. Because the search is random, and failures
+     * only become known via CAS failures, convergence will be slow,
+     * and because threads are typically not bound to CPUs forever,
+     * may not occur at all. However, despite these limitations,
+     * observed contention is typically low in these cases.
      *
      * Table entries are of class Adder; a form of AtomicLong padded
      * to reduce cache contention on most processors. Padding is
-     * overkill for most Atomics because they are most often
-     * irregularly scattered in memory and thus don't interfere much
-     * with each other. But Atomic objects residing in arrays will
-     * tend to be placed adjacent to each other, and so will most
-     * often share cache lines without this precaution. Adders are
+     * overkill for most Atomics because they are usually irregularly
+     * scattered in memory and thus don't interfere much with each
+     * other. But Atomic objects residing in arrays will tend to be
+     * placed adjacent to each other, and so will most often share
+     * cache lines without this precaution. Adders are by default
      * constructed upon first use, which further improves per-thread
-     * locality and helps reduce (an already large) footprint.
+     * locality and helps reduce footprint.
      *
      * A single spinlock is used for resizing the table as well as
      * populating slots with new Adders. Upon lock contention, threads
      * try other slots rather than blocking. After initialization, at
      * least one slot exists, so retries will eventually find a
      * candidate Adder. During these retries, there is increased
      * contention and reduced locality, which is still better than
      * alternatives.
      */
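// A minimal sketch of the false-sharing point made in the comment above:
// AtomicLongs stored in an array are laid out adjacently, so without
// padding, updates to neighboring cells contend for the same cache line.
// The class below is an illustrative stand-in for Adder, not jsr166 code;
// the 15 pad fields assume a 128-byte upper bound on cache-line size.

import java.util.concurrent.atomic.AtomicLong;

class PaddedAtomicLong extends AtomicLong {
    // Inherited value plus 15 padding longs is roughly 128 bytes,
    // keeping two adjacent cells from sharing a line on most processors.
    long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
    PaddedAtomicLong(long initialValue) { super(initialValue); }
}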

     /**
-     * Number of processors, to place a cap on table growth.
-     */
-    static final int NCPU = Runtime.getRuntime().availableProcessors();
-
-    /**
      * Padded version of AtomicLong
      */
     static final class Adder extends AtomicLong {
-        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
+        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
         Adder(long x) { super(x); }
     }

+    private static final int NCPU = Runtime.getRuntime().availableProcessors();
+
     /**
-     * Holder for the thread-local hash code. The code starts off with
-     * a given random value, but may be set to a different value upon
-     * collisions in retryAdd.
+     * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
+     * first use under the default constructor, and must be a power
+     * of two. There is not much point in making it a lot smaller,
+     * given the footprint of the padded Adders it holds. CAP is the
+     * maximum allowed table size.
+     */
+    private static final int DEFAULT_INITIAL_SIZE = 8;
+    private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
+
+    /**
+     * Holder for the thread-local hash code. The code is initially
+     * random, but may be set to a different value upon collisions.
      */
     static final class HashCode {
+        static final Random rng = new Random();
         int code;
-        HashCode(int h) { code = h; }
+        HashCode() {
+            int h = rng.nextInt();
+            code = (h == 0) ? 1 : h; // ensure nonzero
+        }
     }

     /**
      * The corresponding ThreadLocal class
      */
     static final class ThreadHashCode extends ThreadLocal<HashCode> {
-        static final Random rng = new Random();
-        public HashCode initialValue() {
-            int h = rng.nextInt();
-            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
-        }
+        public HashCode initialValue() { return new HashCode(); }
     }

     /**
      * Static per-thread hash codes. Shared across all StripedAdders
-     * because adjustments due to collisions in one table are likely
-     * to be appropriate for others.
+     * to reduce ThreadLocal pollution and because adjustments due to
+     * collisions in one table are likely to be appropriate for
+     * others.
      */
     static final ThreadHashCode threadHashCode = new ThreadHashCode();

     /**
-     * Table of adders. Minimum size 2. Size grows to be at most NCPU.
+     * Table of adders. Size is a power of two and grows to be at most CAP.
      */
     private transient volatile Adder[] adders;

     /**
      * Serves as a lock when resizing and/or creating Adders. There
-     * is no need for a blocking lock: When busy, other threads try
-     * other slots.
+     * is no need for a blocking lock: except during initialization
+     * races, when busy, other threads try other slots. However,
+     * during (double-checked) initialization, we use the built-in
+     * "synchronized" monitor lock on this mutex object.
      */
     private final AtomicInteger mutex;

@@ -149,8 +169,8 @@ public class StripedAdder implements Ser
      * will concurrently update the sum.
      */
     public StripedAdder(int expectedContention) {
-        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
-        int size = 2;
+        int cap = (expectedContention < CAP) ? expectedContention : CAP;
+        int size = 1;
         while (size < cap)
             size <<= 1;
         Adder[] as = new Adder[size];
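// An illustrative sketch (not jsr166 code) of the two building blocks used
// above: the constructor's power-of-two rounding, and the AtomicInteger
// spinlock protocol that retryAdd relies on, where a failed acquire means
// "go probe another slot" rather than "block".

import java.util.concurrent.atomic.AtomicInteger;

class SpinLockSketch {
    final AtomicInteger mutex = new AtomicInteger();

    // Smallest power of two >= cap, as computed in the constructor above.
    static int tableSizeFor(int cap) {
        int size = 1;
        while (size < cap)
            size <<= 1;
        return size;
    }

    // Runs the action only if the lock is free; returns whether it ran.
    // The get() pre-check avoids issuing a CAS when the lock is visibly held.
    boolean tryLocked(Runnable action) {
        if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
            try {
                action.run();
            } finally {
                mutex.set(0); // plain write suffices: only the holder releases
            }
            return true;
        }
        return false;
    }
}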
@@ -168,58 +188,71 @@ public class StripedAdder implements Ser
     public void add(long x) {
         Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
         HashCode hc = threadHashCode.get();
+        int h = hc.code;
         if ((as = adders) == null || (n = as.length) < 1 ||
-            (a = as[hc.code & (n - 1)]) == null ||
+            (a = as[(n - 1) & h]) == null ||
             !a.compareAndSet(v = a.get(), v + x))
             retryAdd(x, hc);
     }

     /**
      * Handle cases of add involving initialization, resizing,
-     * creating new Adders, and/or contention.
+     * creating new Adders, and/or contention. See above for
+     * explanation.
      */
     private void retryAdd(long x, HashCode hc) {
         int h = hc.code;
         final AtomicInteger mutex = this.mutex;
-        AtomicInteger lock = null;            // nonnull when held
-        try {
-            for (;;) {
-                Adder[] as; Adder a; long v; int n, k; // locals for volatiles
-                boolean needLock = true;
-                if ((as = adders) == null || (n = as.length) < 1) {
-                    if (lock != null)         // default-initialize
-                        adders = new Adder[2];
+        int collisions = 1 - mutex.get();     // first guess: collides if not locked
+        for (;;) {
+            Adder[] as; Adder a; long v; int k, n;
+            while ((as = adders) == null || (n = as.length) < 1) {
+                synchronized (mutex) {        // Try to initialize
+                    if (adders == null) {
+                        Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
+                        rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
+                        adders = rs;
+                    }
                 }
-                else if ((a = as[k = h & (n - 1)]) == null) {
-                    if (lock != null) {       // attach new adder
-                        as[k] = new Adder(x);
-                        break;
+                collisions = 0;
+            }
+
+            if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
+                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
+                    try {
+                        if (adders == as && as[k] == null)
+                            a = as[k] = new Adder(x);
+                    } finally {
+                        mutex.set(0);
                     }
+                    if (a != null)
+                        break;
                 }
-                else if (a.compareAndSet(v = a.get(), v + x))
-                    break;
-                else if (n >= NCPU)           // cannot expand
-                    needLock = false;
-                else if (lock != null)        // expand table
-                    adders = Arrays.copyOf(as, n << 1);
-
-                if (lock == null) {
-                    if (needLock && mutex.get() == 0 &&
-                        mutex.compareAndSet(0, 1))
-                        lock = mutex;
-                    else {                    // try elsewhere
-                        h ^= h << 13;         // Marsaglia XorShift
-                        h ^= h >>> 17;
-                        h ^= h << 5;
+                collisions = 0;
+            }
+            else if (collisions != 0 && n < CAP &&   // Try to expand table
+                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
+                try {
+                    if (adders == as) {
+                        Adder[] rs = new Adder[n << 1];
+                        for (int i = 0; i < n; ++i)
+                            rs[i] = as[i];
+                        adders = rs;
                     }
+                } finally {
+                    mutex.set(0);
                 }
+                collisions = 0;
             }
-        } finally {
-            if (lock != null)
-                lock.set(0);
+            else if (a.compareAndSet(v = a.get(), v + x))
+                break;
+            else
+                collisions = 1;
+            h ^= h << 13;                     // Rehash (Marsaglia XorShift)
+            h ^= h >>> 17;
+            h ^= h << 5;
         }
-        if (hc.code != h)                     // avoid unneeded writes
-            hc.code = h;
+        hc.code = h;
     }

     /**
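// An illustrative sketch (not jsr166 code) of the rehashing step used in
// retryAdd above: after a collision, a Marsaglia XorShift of the thread's
// hash code typically moves its next probe to a different slot. The seed
// value and table size below are arbitrary assumptions for the demo.

class RehashSketch {
    // One XorShift step. A nonzero input always yields a nonzero result,
    // which is why HashCode's constructor forces a nonzero initial code.
    static int rehash(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    public static void main(String[] args) {
        int n = 8;          // power-of-two table size; (n - 1) is the mask
        int h = 0x9E3779B9; // arbitrary nonzero "thread hash code"
        for (int i = 0; i < 4; ++i) {
            System.out.println("probe slot " + ((n - 1) & h));
            h = rehash(h);  // simulate a CAS failure: pick another slot
        }
    }
}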