--- jsr166/src/jsr166e/StripedAdder.java 2011/07/26 18:30:35 1.7 +++ jsr166/src/jsr166e/StripedAdder.java 2011/07/30 16:26:34 1.12 @@ -5,7 +5,6 @@ */ package jsr166e; -import java.util.Arrays; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -15,27 +14,33 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** - * A set of variables that together maintain a sum. When updates - * (method {@link #add}) are contended across threads, this set of - * adder variables may grow dynamically to reduce contention. Method - * {@link #sum} returns the current combined total across these - * adders. This value is NOT an atomic snapshot (concurrent - * updates may occur while the sum is being calculated), and so cannot - * be used alone for fine-grained synchronization control. + * One or more variables that together maintain an initially zero sum. + * When updates (method {@link #add}) are contended across threads, + * the set of variables may grow dynamically to reduce contention. * - *

This class may be applicable when many threads frequently - * update a common sum that is used for purposes such as collecting - * statistics. In this case, performance may be significantly faster - * than using a shared {@link AtomicLong}, at the expense of using - * much more space. On the other hand, if it is known that only one - * thread can ever update the sum, performance may be significantly - * slower than just updating a local variable. + *

This class is usually preferable to {@link AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under high update contention, throughput of this class is + * expected to be significantly higher, at the expense of higher space + * consumption. Under low contention, this class imposes very little + * time and space overhead compared to AtomicLong. On the other hand, + * in contexts where it is statically known that only one thread can + * ever update a sum, time and space overhead is noticeably greater + * than just updating a local variable. * - *

A StripedAdder may optionally be constructed with a given - * expected contention level; i.e., the number of threads that are - * expected to concurrently update the sum. Supplying an accurate - * value may improve performance by reducing the need for dynamic - * adjustment. + *

Method {@link #sum} returns the current combined total across + * the variables maintaining the sum. This value is NOT an + * atomic snapshot: Concurrent updates may occur while the sum is + * being calculated. However, updates cannot be "lost", so invocation + * of sum in the absence of concurrent updates always + * returns an accurate result. The sum may also be reset + * to zero, as an alternative to creating a new adder. However, + * method {@link #reset} is intrinsically racy, so should only be used + * when it is known that no threads are concurrently updating the sum. + * + *

jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic * * @author Doug Lea */ @@ -43,73 +48,79 @@ public class StripedAdder implements Ser private static final long serialVersionUID = 7249069246863182397L; /* - * A StripedAdder maintains a table of Atomic long variables. The - * table is indexed by per-thread hash codes. + * A StripedAdder maintains a lazily-initialized table of + * atomically updated variables, plus an extra "base" field. The + * table size is a power of two. Indexing uses masked per-thread + * hash codes * - * By default, the table is lazily initialized, to minimize - * footprint until adders are used. On first use, the table is set - * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is - * bounded by the number of CPUS (if larger than the default - * size). + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. * * Per-thread hash codes are initialized to random values. - * Collisions are indicated by failed CASes when performing an add - * operation (see method retryAdd). Upon a collision, if the table - * size is less than the capacity, it is doubled in size unless - * some other thread holds lock. If a hashed slot is empty, and - * lock is available, a new Adder is created. Otherwise, if the - * slot exists, a CAS is tried. Retries proceed by "double - * hashing", using a secondary hash (Marsaglia XorShift) to try to - * find a free slot. + * Contention and/or table collisions are indicated by failed + * CASes when performing an add operation (see method + * retryAdd). Upon a collision, if the table size is less than the + * capacity, it is doubled in size unless some other thread holds + * the lock. If a hashed slot is empty, and lock is available, a + * new Cell is created. Otherwise, if the slot exists, a CAS is + * tried. Retries proceed by "double hashing", using a secondary + * hash (Marsaglia XorShift) to try to find a free slot. * * The table size is capped because, when there are more threads * than CPUs, supposing that each thread were bound to a CPU, * there would exist a perfect hash function mapping threads to * slots that eliminates collisions. When we reach capacity, we * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and failures only - * become known via CAS failures, convergence will be slow, and - * because threads are typically not bound to CPUS forever, may - * not occur at all. However, despite these limitations, observed - * contention is typically low in these cases. + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. * - * Table entries are of class Adder; a form of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines without this precaution. Adders are by default - * constructed upon first use, which further improves per-thread - * locality and helps reduce footprint. + * A single spinlock is used for initializing and resizing the + * table, as well as populating slots with new Cells. There is no + * need for a blocking lock: Upon lock contention, threads try + * other slots (or the base) rather than blocking. During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. We do not try to detect or remove such cells, + * under the assumption that for long-running adders, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. * - * A single spinlock is used for resizing the table as well as - * populating slots with new Adders. Upon lock contention, threads - * try other slots rather than blocking. After initialization, at - * least one slot exists, so retries will eventually find a - * candidate Adder. During these retries, there is increased - * contention and reduced locality, which is still better than - * alternatives. - */ - - /** - * Padded version of AtomicLong */ - static final class Adder extends AtomicLong { - long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - Adder(long x) { super(x); } - } private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** - * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon - * first use under default constructor, and must be a power of - * two. There is not much point in making size a lot smaller than - * that of Adders though. CAP is the maximum allowed table size. - */ - private static final int DEFAULT_INITIAL_SIZE = 8; - private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE); + * Padded variant of AtomicLong. The value field is placed + * between pads, hoping that the JVM doesn't reorder them. + * Updates are via inlined CAS in methods add and retryAdd. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Cell(long x) { value = x; } + } /** * Holder for the thread-local hash code. The code is initially @@ -119,8 +130,8 @@ public class StripedAdder implements Ser static final Random rng = new Random(); int code; HashCode() { - int h = rng.nextInt(); - code = (h == 0) ? 1 : h; // ensure nonzero + int h = rng.nextInt(); // Avoid zero to allow xorShift rehash + code = (h == 0) ? 1 : h; } } @@ -140,55 +151,25 @@ public class StripedAdder implements Ser static final ThreadHashCode threadHashCode = new ThreadHashCode(); /** - * Common placeholder for empty arrays. + * Table of cells. When non-null, size is a power of 2. */ - static final Adder[] EMPTY_ARRAY = new Adder[0]; + private transient volatile Cell[] cells; /** - * Table of adders. Size is either zero or a power of two, grows - * to be at most CAP. + * Base sum, used mainly when there is no contention, but also as + * a fallback during table initializion races. Updated via CAS. */ - private transient volatile Adder[] adders; + private transient volatile long base; /** - * Serves as a lock when resizing and/or creating Adders. There - * is no need for a blocking lock: Except during initialization - * races, when busy, other threads try other slots. However, - * during (double-checked) initializations, we use the - * "synchronized" lock on this object. + * Spinlock (locked via CAS) used when resizing and/or creating Cells. */ - private final AtomicInteger mutex; + private transient volatile int busy; /** - * Creates a new adder with zero sum. + * Creates a new adder with initial sum of zero. */ public StripedAdder() { - this.adders = EMPTY_ARRAY; - this.mutex = new AtomicInteger(); - // remaining initialization on first call to add. - } - - /** - * Creates a new adder with zero sum, and with stripes presized - * for the given expected contention level. - * - * @param expectedContention the expected number of threads that - * will concurrently update the sum. - */ - public StripedAdder(int expectedContention) { - if (expectedContention > 0) { - int cap = (expectedContention < CAP) ? expectedContention : CAP; - int size = 1; - while (size < cap) - size <<= 1; - Adder[] as = new Adder[size]; - for (int i = 0; i < size; ++i) - as[i] = new Adder(0); - this.adders = as; - } - else - this.adders = EMPTY_ARRAY; - this.mutex = new AtomicInteger(); } /** @@ -197,142 +178,195 @@ public class StripedAdder implements Ser * @param x the value to add */ public void add(long x) { - Adder[] as; Adder a; int n; long v; // locals to hold volatile reads - HashCode hc = threadHashCode.get(); - int h = hc.code; - if ((as = adders) == null || (n = as.length) < 1 || - (a = as[(n - 1) & h]) == null || - !a.compareAndSet(v = a.get(), v + x)) - retryAdd(x, hc); + Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended; + if ((as = cells) != null || + !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) { + int h = (hc = threadHashCode.get()).code; + if (as != null && (n = as.length) > 0 && + (a = as[(n - 1) & h]) != null) { + if (UNSAFE.compareAndSwapLong(a, valueOffset, + v = a.value, v + x)) + return; + contended = true; + } + else + contended = false; + retryAdd(x, hc, contended); + } } /** * Handle cases of add involving initialization, resizing, - * creating new Adders, and/or contention. See above for - * explanation. + * creating new Cells, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value to add + * @param hc the hash code holder + * @param precontended true if CAS failed before call */ - private void retryAdd(long x, HashCode hc) { + private void retryAdd(long x, HashCode hc, boolean precontended) { int h = hc.code; - final AtomicInteger mutex = this.mutex; - int collisions = 1 - mutex.get(); // first guess: collides if not locked + boolean collide = false; // true if last slot nonempty for (;;) { - Adder[] as; Adder a; long v; int k, n; - while ((as = adders) == null || (n = as.length) < 1) { - synchronized(mutex) { // Try to initialize - if (adders == as) { - Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE]; - rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0); - adders = rs; + Cell[] as; Cell a; int n; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } } + collide = false; } - collisions = 0; - } - - if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot - if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) { - try { - if (adders == as && as[k] == null) - a = as[k] = new Adder(x); - } finally { - mutex.set(0); - } - if (a != null) + else if (precontended) // CAS already known to fail + precontended = false; // Continue after rehash + else { + long v = a.value; + if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x)) break; + if (!collide) + collide = true; + else if (n >= NCPU || cells != as) + collide = false; // Can't expand + else if (busy == 0 && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + collide = false; + try { + if (cells == as) { // Expand table + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + continue; + } } - collisions = 0; + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; } - else if (collisions != 0 && n < CAP && // Try to expand table - mutex.get() == 0 && mutex.compareAndSet(0, 1)) { - try { - if (adders == as) { - Adder[] rs = new Adder[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - adders = rs; + else if (busy == 0 && cells == as && + UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) { + boolean init = false; + try { // Initialize + if (cells == as) { + Cell r = new Cell(x); + Cell[] rs = new Cell[2]; + rs[h & 1] = r; + cells = rs; + init = true; } } finally { - mutex.set(0); + busy = 0; } - collisions = 0; + if (init) + break; + } + else { // Lost initialization race + long b = base; // Fall back on using base + if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x)) + break; } - else if (a.compareAndSet(v = a.get(), v + x)) - break; - else - collisions = 1; - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; } - hc.code = h; + hc.code = h; // Record index for next time } /** - * Returns an estimate of the current sum. The result is - * calculated by summing multiple variables, so may not be - * accurate if updates occur concurrently with this method. + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The result is only guaranteed to be + * accurate in the absence of concurrent updates. Otherwise, it + * may fail to reflect one or more updates occuring while + * calculating the result. * - * @return the estimated sum + * @return the sum */ public long sum() { - long sum = 0L; - Adder[] as = adders; + Cell[] as = cells; + long sum = base; if (as != null) { int n = as.length; for (int i = 0; i < n; ++i) { - Adder a = as[i]; + Cell a = as[i]; if (a != null) - sum += a.get(); + sum += a.value; } } return sum; } /** - * Resets each of the variables to zero. This is effective in - * fully resetting the sum only if there are no concurrent - * updates. + * Resets variables maintaining the sum to zero. This is + * effective in setting the sum to zero only if there are no + * concurrent updates. */ public void reset() { - Adder[] as = adders; + Cell[] as = cells; + base = 0L; if (as != null) { int n = as.length; for (int i = 0; i < n; ++i) { - Adder a = as[i]; + Cell a = as[i]; if (a != null) - a.set(0L); + a.value = 0L; } } } /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Equivalent to {@link #sum} followed by {@link #reset}. + * Equivalent in effect to {@link #sum} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * not guaranteed to be the final sum occurring before + * the reset. * - * @return the estimated sum + * @return the sum */ - public long sumAndReset() { - long sum = 0L; - Adder[] as = adders; + public long sumThenReset() { + Cell[] as = cells; + long sum = base; + base = 0L; if (as != null) { int n = as.length; for (int i = 0; i < n; ++i) { - Adder a = as[i]; + Cell a = as[i]; if (a != null) { - sum += a.get(); - a.set(0L); + sum += a.value; + a.value = 0L; } } } @@ -348,8 +382,58 @@ public class StripedAdder implements Ser private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); - mutex.set(0); - add(s.readLong()); + busy = 0; + cells = null; + base = s.readLong(); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class sk = StripedAdder.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } } }