ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.3 by dl, Fri Jul 22 13:25:12 2011 UTC vs.
Revision 1.12 by dl, Sat Jul 30 16:26:34 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 15 | Line 14 | import java.io.ObjectInputStream;
14   import java.io.ObjectOutputStream;
15  
16   /**
17 < * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, this set of
19 < * adder variables may grow dynamically to reduce contention. Method
21 < * {@link #sum} returns the current combined total across these
22 < * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 < * updates may occur while the sum is being calculated), and so cannot
24 < * be used alone for fine-grained synchronization control.
17 > * One or more variables that together maintain an initially zero sum.
18 > * When updates (method {@link #add}) are contended across threads,
19 > * the set of variables may grow dynamically to reduce contention.
20   *
21 < * <p> This class may be applicable when many threads frequently
22 < * update a common sum that is used for purposes such as collecting
23 < * statistics. In this case, performance may be significantly faster
24 < * than using a shared {@link AtomicLong}, at the expense of using
25 < * significantly more space.  On the other hand, if it is known that
26 < * only one thread can ever update the sum, performance may be
27 < * significantly slower than just updating a local variable.
21 > * <p> This class is usually preferable to {@link AtomicLong} when
22 > * multiple threads update a common sum that is used for purposes such
23 > * as collecting statistics, not for fine-grained synchronization
24 > * control.  Under high update contention, throughput of this class is
25 > * expected to be significantly higher, at the expense of higher space
26 > * consumption.  Under low contention, this class imposes very little
27 > * time and space overhead compared to AtomicLong.  On the other hand,
28 > * in contexts where it is statically known that only one thread can
29 > * ever update a sum, time and space overhead is noticeably greater
30 > * than just updating a local variable.
31   *
32 < * <p>A StripedAdder may optionally be constructed with a given
33 < * expected contention level; i.e., the number of threads that are
34 < * expected to concurrently update the sum. Supplying an accurate
35 < * value may improve performance by reducing the need for dynamic
36 < * adjustment.
32 > * <p> Method {@link #sum} returns the current combined total across
33 > * the variables maintaining the sum.  This value is <em>NOT</em> an
34 > * atomic snapshot: Concurrent updates may occur while the sum is
35 > * being calculated.  However, updates cannot be "lost", so invocation
36 > * of <code>sum</code> in the absence of concurrent updates always
37 > * returns an accurate result.  The sum may also be <code>reset</code>
38 > * to zero, as an alternative to creating a new adder.  However,
39 > * method {@link #reset} is intrinsically racy, so should only be used
40 > * when it is known that no threads are concurrently updating the sum.
41 > *
42 > * <p><em>jsr166e note: This class is targeted to be placed in
43 > * java.util.concurrent.atomic<em>
44   *
45   * @author Doug Lea
46   */
# Line 43 | Line 48 | public class StripedAdder implements Ser
48      private static final long serialVersionUID = 7249069246863182397L;
49  
50      /*
51 <     * Overview: We maintain a table of Atomic long variables. The
52 <     * table is indexed by per-thread hash codes that are initialized
53 <     * to random values.
54 <     *
50 <     * The table doubles in size upon contention (as indicated by
51 <     * failed CASes when performing add()), but is capped at the
52 <     * nearest power of two >= #CPUS. This reflects the idea that,
53 <     * when there are more threads than CPUs, then if each thread were
54 <     * bound to a CPU, there would exist a perfect hash function
55 <     * mapping threads to slots that eliminates collisions. When we
56 <     * reach capacity, we search for this mapping by randomly varying
57 <     * the hash codes of colliding threads.  Because search is random,
58 <     * and failures only become known via CAS failures, convergence
59 <     * will be slow, and because threads are typically not bound to
60 <     * CPUS forever, may not occur at all. However, despite these
61 <     * limitations, observed contention is typically very low in these
62 <     * cases.
51 >     * A StripedAdder maintains a lazily-initialized table of
52 >     * atomically updated variables, plus an extra "base" field. The
53 >     * table size is a power of two. Indexing uses masked per-thread
54 >     * hash codes
55       *
56 <     * Table entries are of class Adder; a form of AtomicLong padded
56 >     * Table entries are of class Cell; a variant of AtomicLong padded
57       * to reduce cache contention on most processors. Padding is
58 <     * overkill for most Atomics because they are most often
59 <     * irregularly scattered in memory and thus don't interfere much
60 <     * with each other. But Atomic objects residing in arrays will
61 <     * tend to be placed adjacent to each other, and so will most
62 <     * often share cache lines without this precaution.  Except for
63 <     * slot adders[0], Adders are constructed upon first use, which
64 <     * further improves per-thread locality and helps reduce (an
65 <     * already large) footprint.
58 >     * overkill for most Atomics because they are usually irregularly
59 >     * scattered in memory and thus don't interfere much with each
60 >     * other. But Atomic objects residing in arrays will tend to be
61 >     * placed adjacent to each other, and so will most often share
62 >     * cache lines (with a huge negative performance impact) without
63 >     * this precaution.
64 >     *
65 >     * In part because Cells are relatively large, we avoid creating
66 >     * them until they are needed.  When there is no contention, all
67 >     * updates are made to the base field.  Upon first contention (a
68 >     * failed CAS on base update), the table is initialized to size 2.
69 >     * The table size is doubled upon further contention until
70 >     * reaching the nearest power of two greater than or equal to the
71 >     * number of CPUS.
72 >     *
73 >     * Per-thread hash codes are initialized to random values.
74 >     * Contention and/or table collisions are indicated by failed
75 >     * CASes when performing an add operation (see method
76 >     * retryAdd). Upon a collision, if the table size is less than the
77 >     * capacity, it is doubled in size unless some other thread holds
78 >     * the lock. If a hashed slot is empty, and lock is available, a
79 >     * new Cell is created. Otherwise, if the slot exists, a CAS is
80 >     * tried.  Retries proceed by "double hashing", using a secondary
81 >     * hash (Marsaglia XorShift) to try to find a free slot.
82 >     *
83 >     * The table size is capped because, when there are more threads
84 >     * than CPUs, supposing that each thread were bound to a CPU,
85 >     * there would exist a perfect hash function mapping threads to
86 >     * slots that eliminates collisions. When we reach capacity, we
87 >     * search for this mapping by randomly varying the hash codes of
88 >     * colliding threads.  Because search is random, and collisions
89 >     * only become known via CAS failures, convergence can be slow,
90 >     * and because threads are typically not bound to CPUS forever,
91 >     * may not occur at all. However, despite these limitations,
92 >     * observed contention rates are typically low in these cases.
93 >     *
94 >     * A single spinlock is used for initializing and resizing the
95 >     * table, as well as populating slots with new Cells.  There is no
96 >     * need for a blocking lock: Upon lock contention, threads try
97 >     * other slots (or the base) rather than blocking.  During these
98 >     * retries, there is increased contention and reduced locality,
99 >     * which is still better than alternatives.
100 >     *
101 >     * It is possible for a Cell to become unused when threads that
102 >     * once hashed to it terminate, as well as in the case where
103 >     * doubling the table causes no thread to hash to it under
104 >     * expanded mask.  We do not try to detect or remove such cells,
105 >     * under the assumption that for long-running adders, observed
106 >     * contention levels will recur, so the cells will eventually be
107 >     * needed again; and for short-lived ones, it does not matter.
108       *
75     * A single spinlock is used for resizing the table as well as
76     * populating slots with new Adders. Upon lock contention, threads
77     * try other slots rather than blocking. We guarantee that at
78     * least one slot (0) exists, so retries will eventually find a
79     * candidate Adder. During these retries, there is increased
80     * contention and reduced locality, which is still better than
81     * alternatives.
109       */
110  
111 <    /**
85 <     * Number of processors, to place a cap on table growth.
86 <     */
87 <    static final int NCPU = Runtime.getRuntime().availableProcessors();
111 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
112  
113      /**
114 <     * Padded version of AtomicLong
114 >     * Padded variant of AtomicLong.  The value field is placed
115 >     * between pads, hoping that the JVM doesn't reorder them.
116 >     * Updates are via inlined CAS in methods add and retryAdd.
117       */
118 <    static final class Adder extends AtomicLong {
119 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
120 <        Adder(long x) { super(x); }
118 >    static final class Cell {
119 >        volatile long p0, p1, p2, p3, p4, p5, p6;
120 >        volatile long value;
121 >        volatile long q0, q1, q2, q3, q4, q5, q6;
122 >        Cell(long x) { value = x; }
123      }
124  
125      /**
126 <     * Holder for the thread-local hash code. The code starts off with
127 <     * a given random value, but may be set to a different
100 <     * pseudo-random value (using a cheaper but adequate xorshift
101 <     * generator) upon collisions.
126 >     * Holder for the thread-local hash code. The code is initially
127 >     * random, but may be set to a different value upon collisions.
128       */
129      static final class HashCode {
130 +        static final Random rng = new Random();
131          int code;
132 <        HashCode(int h) { code = h; }
132 >        HashCode() {
133 >            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
134 >            code = (h == 0) ? 1 : h;
135 >        }
136      }
137  
138      /**
139       * The corresponding ThreadLocal class
140       */
141      static final class ThreadHashCode extends ThreadLocal<HashCode> {
142 <        static final Random rng = new Random();
113 <        public HashCode initialValue() {
114 <            int h = rng.nextInt();
115 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
116 <        }
142 >        public HashCode initialValue() { return new HashCode(); }
143      }
144  
145      /**
146       * Static per-thread hash codes. Shared across all StripedAdders
147 <     * because adjustments due to collisions in one table are likely
148 <     * to be appropriate for others.
147 >     * to reduce ThreadLocal pollution and because adjustments due to
148 >     * collisions in one table are likely to be appropriate for
149 >     * others.
150       */
151      static final ThreadHashCode threadHashCode = new ThreadHashCode();
152  
153      /**
154 <     * Table of adders. Minimum size 2. Size grows to be at most NCPU.
154 >     * Table of cells. When non-null, size is a power of 2.
155       */
156 <    private transient volatile Adder[] adders;
156 >    private transient volatile Cell[] cells;
157  
158      /**
159 <     * Serves as a lock when resizing and/or creating Adders.  There
160 <     * is no need for a blocking lock: When busy, other threads try
134 <     * other slots.
159 >     * Base sum, used mainly when there is no contention, but also as
160 >     * a fallback during table initializion races. Updated via CAS.
161       */
162 <    private final AtomicInteger mutex;
162 >    private transient volatile long base;
163  
164      /**
165 <     * Marsaglia XorShift random generator for rehashing on collisions
165 >     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
166       */
167 <    private static int xorShift(int r) {
142 <        r ^= r << 13;
143 <        r ^= r >>> 17;
144 <        return r ^ (r << 5);
145 <    }
167 >    private transient volatile int busy;
168  
169      /**
170 <     * Creates a new adder with zero sum.
170 >     * Creates a new adder with initial sum of zero.
171       */
172      public StripedAdder() {
151        this(2);
173      }
174  
175      /**
176 <     * Creates a new adder with zero sum, and with stripes presized
156 <     * for the given expected contention level.
176 >     * Adds the given value.
177       *
178 <     * @param expectedContention the expected number of threads that
159 <     * will concurrently update the sum.
178 >     * @param x the value to add
179       */
180 <    public StripedAdder(int expectedContention) {
181 <        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
182 <        int size = 2;
183 <        while (size < cap)
184 <            size <<= 1;
185 <        Adder[] as = new Adder[size];
186 <        as[0] = new Adder(0); // ensure at least one available adder
187 <        this.adders = as;
188 <        this.mutex = new AtomicInteger();
180 >    public void add(long x) {
181 >        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
182 >        if ((as = cells) != null ||
183 >            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
184 >            int h = (hc = threadHashCode.get()).code;
185 >            if (as != null && (n = as.length) > 0 &&
186 >                (a = as[(n - 1) & h]) != null) {
187 >                if (UNSAFE.compareAndSwapLong(a, valueOffset,
188 >                                              v = a.value, v + x))
189 >                    return;
190 >                contended = true;
191 >            }
192 >            else
193 >                contended = false;
194 >            retryAdd(x, hc, contended);
195 >        }
196      }
197  
198      /**
199 <     * Adds the given value.
199 >     * Handle cases of add involving initialization, resizing,
200 >     * creating new Cells, and/or contention. See above for
201 >     * explanation. This method suffers the usual non-modularity
202 >     * problems of optimistic retry code, relying on rechecked sets of
203 >     * reads.
204       *
205       * @param x the value to add
206 +     * @param hc the hash code holder
207 +     * @param precontended true if CAS failed before call
208       */
209 <    public void add(long x) {
210 <        HashCode hc = threadHashCode.get();
211 <        for (int h = hc.code;;) {
212 <            Adder[] as = adders;
213 <            int n = as.length;
214 <            Adder a = as[h & (n - 1)];
215 <            if (a != null) {
216 <                long v = a.get();
217 <                if (a.compareAndSet(v, v + x))
218 <                    break;
219 <                if (n >= NCPU) {                 // Collision when table at max
220 <                    h = hc.code = xorShift(h);   // change code
221 <                    continue;
209 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
210 >        int h = hc.code;
211 >        boolean collide = false; // true if last slot nonempty
212 >        for (;;) {
213 >            Cell[] as; Cell a; int n;
214 >            if ((as = cells) != null && (n = as.length) > 0) {
215 >                if ((a = as[(n - 1) & h]) == null) {
216 >                    if (busy == 0) {            // Try to attach new Cell
217 >                        Cell r = new Cell(x);   // Optimistically create
218 >                        if (busy == 0 &&
219 >                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
220 >                            boolean created = false;
221 >                            try {               // Recheck under lock
222 >                                Cell[] rs; int m, j;
223 >                                if ((rs = cells) != null &&
224 >                                    (m = rs.length) > 0 &&
225 >                                    rs[j = (m - 1) & h] == null) {
226 >                                    rs[j] = r;
227 >                                    created = true;
228 >                                }
229 >                            } finally {
230 >                                busy = 0;
231 >                            }
232 >                            if (created)
233 >                                break;
234 >                            continue;           // Slot is now non-empty
235 >                        }
236 >                    }
237 >                    collide = false;
238 >                }
239 >                else if (precontended)          // CAS already known to fail
240 >                    precontended = false;       // Continue after rehash
241 >                else {
242 >                    long v = a.value;
243 >                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
244 >                        break;
245 >                    if (!collide)
246 >                        collide = true;
247 >                    else if (n >= NCPU || cells != as)
248 >                        collide = false;        // Can't expand
249 >                    else if (busy == 0 &&
250 >                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
251 >                        collide = false;
252 >                        try {
253 >                            if (cells == as) { // Expand table
254 >                                Cell[] rs = new Cell[n << 1];
255 >                                for (int i = 0; i < n; ++i)
256 >                                    rs[i] = as[i];
257 >                                cells = rs;
258 >                            }
259 >                        } finally {
260 >                            busy = 0;
261 >                        }
262 >                        continue;
263 >                    }
264                  }
265 +                h ^= h << 13;                   // Rehash
266 +                h ^= h >>> 17;
267 +                h ^= h << 5;
268              }
269 <            final AtomicInteger mutex = this.mutex;
270 <            if (mutex.get() != 0)
271 <                h = xorShift(h);                 // Try elsewhere
272 <            else if (mutex.compareAndSet(0, 1)) {
273 <                boolean created = false;
274 <                try {
275 <                    Adder[] rs = adders;
276 <                    if (a != null && rs == as)   // Resize table
277 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
278 <                    int j = h & (rs.length - 1);
202 <                    if (rs[j] == null) {         // Create adder
203 <                        rs[j] = new Adder(x);
204 <                        created = true;
269 >            else if (busy == 0 && cells == as &&
270 >                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 >                boolean init = false;
272 >                try {                           // Initialize
273 >                    if (cells == as) {
274 >                        Cell r = new Cell(x);
275 >                        Cell[] rs = new Cell[2];
276 >                        rs[h & 1] = r;
277 >                        cells = rs;
278 >                        init = true;
279                      }
280                  } finally {
281 <                    mutex.set(0);
281 >                    busy = 0;
282                  }
283 <                if (created) {
284 <                    hc.code = h;                 // Use this adder next time
283 >                if (init)
284 >                    break;
285 >            }
286 >            else {                              // Lost initialization race
287 >                long b = base;                  // Fall back on using base
288 >                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
289                      break;
212                }
290              }
291          }
292 +        hc.code = h;                            // Record index for next time
293      }
294  
295      /**
296 <     * Returns an estimate of the current sum.  The result is
219 <     * calculated by summing multiple variables, so may not be
220 <     * accurate if updates occur concurrently with this method.
221 <     *
222 <     * @return the estimated sum
296 >     * Equivalent to {@code add(1)}.
297       */
298 <    public long sum() {
299 <        long sum = 0;
226 <        Adder[] as = adders;
227 <        int n = as.length;
228 <        for (int i = 0; i < n; ++i) {
229 <            Adder a = as[i];
230 <            if (a != null)
231 <                sum += a.get();
232 <        }
233 <        return sum;
298 >    public void increment() {
299 >        add(1L);
300      }
301  
302      /**
303 <     * Resets each of the variables to zero. This is effective in
238 <     * fully resetting the sum only if there are no concurrent
239 <     * updates.
303 >     * Equivalent to {@code add(-1)}.
304       */
305 <    public void reset() {
306 <        Adder[] as = adders;
243 <        int n = as.length;
244 <        for (int i = 0; i < n; ++i) {
245 <            Adder a = as[i];
246 <            if (a != null)
247 <                a.set(0L);
248 <        }
305 >    public void decrement() {
306 >        add(-1L);
307      }
308  
309      /**
310 <     * Equivalent to {@code add(1)}.
310 >     * Returns the current sum.  The result is only guaranteed to be
311 >     * accurate in the absence of concurrent updates. Otherwise, it
312 >     * may fail to reflect one or more updates occuring while
313 >     * calculating the result.
314 >     *
315 >     * @return the sum
316       */
317 <    public void increment() {
318 <        add(1L);
317 >    public long sum() {
318 >        Cell[] as = cells;
319 >        long sum = base;
320 >        if (as != null) {
321 >            int n = as.length;
322 >            for (int i = 0; i < n; ++i) {
323 >                Cell a = as[i];
324 >                if (a != null)
325 >                    sum += a.value;
326 >            }
327 >        }
328 >        return sum;
329      }
330  
331      /**
332 <     * Equivalent to {@code add(-1)}.
332 >     * Resets variables maintaining the sum to zero.  This is
333 >     * effective in setting the sum to zero only if there are no
334 >     * concurrent updates.
335       */
336 <    public void decrement() {
337 <        add(-1L);
336 >    public void reset() {
337 >        Cell[] as = cells;
338 >        base = 0L;
339 >        if (as != null) {
340 >            int n = as.length;
341 >            for (int i = 0; i < n; ++i) {
342 >                Cell a = as[i];
343 >                if (a != null)
344 >                    a.value = 0L;
345 >            }
346 >        }
347      }
348  
349      /**
350 <     * Equivalent to {@link #sum} followed by {@link #reset}.
350 >     * Equivalent in effect to {@link #sum} followed by {@link
351 >     * #reset}. This method may apply for example during quiescent
352 >     * points between multithreaded computations.  If there are
353 >     * updates concurrent with this method, the returned value is
354 >     * <em>not</em> guaranteed to be the final sum occurring before
355 >     * the reset.
356       *
357 <     * @return the estimated sum
357 >     * @return the sum
358       */
359 <    public long sumAndReset() {
360 <        long sum = 0;
361 <        Adder[] as = adders;
362 <        int n = as.length;
363 <        for (int i = 0; i < n; ++i) {
364 <            Adder a = as[i];
365 <            if (a != null) {
366 <                sum += a.get();
367 <                a.set(0L);
359 >    public long sumThenReset() {
360 >        Cell[] as = cells;
361 >        long sum = base;
362 >        base = 0L;
363 >        if (as != null) {
364 >            int n = as.length;
365 >            for (int i = 0; i < n; ++i) {
366 >                Cell a = as[i];
367 >                if (a != null) {
368 >                    sum += a.value;
369 >                    a.value = 0L;
370 >                }
371              }
372          }
373          return sum;
# Line 290 | Line 382 | public class StripedAdder implements Ser
382      private void readObject(ObjectInputStream s)
383          throws IOException, ClassNotFoundException {
384          s.defaultReadObject();
385 <        long c = s.readLong();
386 <        Adder[] as = new Adder[2];
387 <        as[0] = new Adder(c);
388 <        this.adders = as;
389 <        mutex.set(0);
385 >        busy = 0;
386 >        cells = null;
387 >        base = s.readLong();
388 >    }
389 >
390 >    // Unsafe mechanics
391 >    private static final sun.misc.Unsafe UNSAFE;
392 >    private static final long baseOffset;
393 >    private static final long busyOffset;
394 >    private static final long valueOffset;
395 >    static {
396 >        try {
397 >            UNSAFE = getUnsafe();
398 >            Class<?> sk = StripedAdder.class;
399 >            baseOffset = UNSAFE.objectFieldOffset
400 >                (sk.getDeclaredField("base"));
401 >            busyOffset = UNSAFE.objectFieldOffset
402 >                (sk.getDeclaredField("busy"));
403 >            Class<?> ak = Cell.class;
404 >            valueOffset = UNSAFE.objectFieldOffset
405 >                (ak.getDeclaredField("value"));
406 >        } catch (Exception e) {
407 >            throw new Error(e);
408 >        }
409      }
410  
411 < }
412 <
411 >    /**
412 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
413 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
414 >     * into a jdk.
415 >     *
416 >     * @return a sun.misc.Unsafe
417 >     */
418 >    private static sun.misc.Unsafe getUnsafe() {
419 >        try {
420 >            return sun.misc.Unsafe.getUnsafe();
421 >        } catch (SecurityException se) {
422 >            try {
423 >                return java.security.AccessController.doPrivileged
424 >                    (new java.security
425 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
426 >                        public sun.misc.Unsafe run() throws Exception {
427 >                            java.lang.reflect.Field f = sun.misc
428 >                                .Unsafe.class.getDeclaredField("theUnsafe");
429 >                            f.setAccessible(true);
430 >                            return (sun.misc.Unsafe) f.get(null);
431 >                        }});
432 >            } catch (java.security.PrivilegedActionException e) {
433 >                throw new RuntimeException("Could not initialize intrinsics",
434 >                                           e.getCause());
435 >            }
436 >        }
437 >    }
438  
439 + }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines