root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.12
Committed: Sat Jul 30 16:26:34 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.11: +179 -149 lines
Log Message:
API, spec and implementation improvements

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package jsr166e;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * One or more variables that together maintain an initially zero sum.
 * When updates (method {@link #add}) are contended across threads,
 * the set of variables may grow dynamically to reduce contention.
 *
 * <p> This class is usually preferable to {@link AtomicLong} when
 * multiple threads update a common sum that is used for purposes such
 * as collecting statistics, not for fine-grained synchronization
 * control. Under high update contention, throughput of this class is
 * expected to be significantly higher, at the expense of higher space
 * consumption. Under low contention, this class imposes very little
 * time and space overhead compared to AtomicLong. On the other hand,
 * in contexts where it is statically known that only one thread can
 * ever update a sum, time and space overhead is noticeably greater
 * than just updating a local variable.
 *
 * <p> Method {@link #sum} returns the current combined total across
 * the variables maintaining the sum. This value is <em>NOT</em> an
 * atomic snapshot: Concurrent updates may occur while the sum is
 * being calculated. However, updates cannot be "lost", so invocation
 * of <code>sum</code> in the absence of concurrent updates always
 * returns an accurate result. The sum may also be <code>reset</code>
 * to zero, as an alternative to creating a new adder. However,
 * method {@link #reset} is intrinsically racy, so it should only be
 * used when it is known that no threads are concurrently updating the sum.
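 *
 * <p>Sample usage, sketched with a hypothetical statistics class
 * (the names {@code RequestStats}, {@code onRequest} and
 * {@code requestCount} are illustrative only, not part of this API).
 * Request-handling threads call {@link #increment}, which stays cheap
 * under contention, while a monitoring thread reads an approximate
 * total:
 * <pre> {@code
 * class RequestStats {
 *   final StripedAdder requests = new StripedAdder();
 *   void onRequest() { requests.increment(); }
 *   long requestCount() { return requests.sum(); } // not an atomic snapshot
 * }}</pre>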
 *
 * <p><em>jsr166e note: This class is targeted to be placed in
 * java.util.concurrent.atomic.</em>
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a lazily-initialized table of
     * atomically updated variables, plus an extra "base" field. The
     * table size is a power of two. Indexing uses masked per-thread
     * hash codes.
     *
     * Table entries are of class Cell, a variant of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are usually irregularly
     * scattered in memory and thus don't interfere much with each
     * other. But Atomic objects residing in arrays will tend to be
     * placed adjacent to each other, and so will most often share
     * cache lines (with a huge negative performance impact) without
     * this precaution.
     *
     * In part because Cells are relatively large, we avoid creating
     * them until they are needed. When there is no contention, all
     * updates are made to the base field. Upon first contention (a
     * failed CAS on base update), the table is initialized to size 2.
     * The table size is doubled upon further contention until
     * reaching the nearest power of two greater than or equal to the
     * number of CPUs.
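     *
     * For example, on a machine with 6 CPUs the table can grow
     * 2 -> 4 -> 8 and then stops growing, because retryAdd refuses
     * to expand once the current size n satisfies n >= NCPU (8 >= 6).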
     *
     * Per-thread hash codes are initialized to random values.
     * Contention and/or table collisions are indicated by failed
     * CASes when performing an add operation (see method
     * retryAdd). Upon a collision, if the table size is less than the
     * capacity, it is doubled in size unless some other thread holds
     * the lock. If a hashed slot is empty and the lock is available, a
     * new Cell is created. Otherwise, if the slot exists, a CAS is
     * tried. Retries proceed by "double hashing", using a secondary
     * hash (Marsaglia XorShift) to try to find a free slot.
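     *
     * For example, the three-step rehash in retryAdd maps h = 1 as
     * follows: h ^= h << 13 gives 8193, h ^= h >>> 17 leaves 8193,
     * and h ^= h << 5 gives 270369. Each step is invertible, so a
     * nonzero code can never become zero, while a zero code would stay
     * zero forever; this is why HashCode never starts at zero.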
     *
     * The table size is capped because, when there are more threads
     * than CPUs, supposing that each thread were bound to a CPU,
     * there would exist a perfect hash function mapping threads to
     * slots that eliminates collisions. When we reach capacity, we
     * search for this mapping by randomly varying the hash codes of
     * colliding threads. Because the search is random, and collisions
     * only become known via CAS failures, convergence can be slow,
     * and, because threads are typically not bound to CPUs forever,
     * it may not occur at all. However, despite these limitations,
     * observed contention rates are typically low in these cases.
     *
     * A single spinlock is used for initializing and resizing the
     * table, as well as populating slots with new Cells. There is no
     * need for a blocking lock: Upon lock contention, threads try
     * other slots (or the base) rather than blocking. During these
     * retries, there is increased contention and reduced locality,
     * which is still better than alternatives.
     *
     * It is possible for a Cell to become unused when threads that
     * once hashed to it terminate, as well as in the case where
     * doubling the table causes no thread to hash to it under the
     * expanded mask. We do not try to detect or remove such cells,
     * under the assumption that for long-running adders, observed
     * contention levels will recur, so the cells will eventually be
     * needed again; and for short-lived ones, it does not matter.
     *
     */

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
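     * Assuming 64-byte cache lines and no field reordering (neither
     * is guaranteed), the seven 8-byte pads on each side (56 bytes)
     * keep the value fields of two Cells allocated back to back at
     * least 112 bytes apart, so they cannot share a cache line.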
     */
    static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    private transient volatile Cell[] cells;

    /**
     * Base sum, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    private transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    private transient volatile int busy;

    /**
     * Creates a new adder with initial sum of zero.
     */
    public StripedAdder() {
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
        if ((as = cells) != null ||
            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
            int h = (hc = threadHashCode.get()).code;
            if (as != null && (n = as.length) > 0 &&
                (a = as[(n - 1) & h]) != null) {
                if (UNSAFE.compareAndSwapLong(a, valueOffset,
                                              v = a.value, v + x))
                    return;
                contended = true;
            }
            else
                contended = false;
            retryAdd(x, hc, contended);
        }
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value to add
     * @param hc the hash code holder
     * @param precontended true if CAS failed before call
     */
    private void retryAdd(long x, HashCode hc, boolean precontended) {
        int h = hc.code;
        boolean collide = false;                // true if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0) {            // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (busy == 0 &&
                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                busy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (precontended)          // CAS already known to fail
                    precontended = false;       // Continue after rehash
                else {
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    if (!collide)
                        collide = true;
                    else if (n >= NCPU || cells != as)
                        collide = false;        // Can't expand
                    else if (busy == 0 &&
                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        collide = false;
                        try {
                            if (cells == as) {  // Expand table
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            busy = 0;
                        }
                        continue;
                    }
                }
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0 && cells == as &&
                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                boolean init = false;
                try {                           // Initialize
                    if (cells == as) {
                        Cell r = new Cell(x);
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = r;
                        cells = rs;
                        init = true;
                    }
                } finally {
                    busy = 0;
                }
                if (init)
                    break;
            }
            else {                              // Lost initialization race
                long b = base;                  // Fall back on using base
                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
                    break;
            }
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns the current sum. The result is only guaranteed to be
     * accurate in the absence of concurrent updates. Otherwise, it
     * may fail to reflect one or more updates occurring while
     * calculating the result.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells;
        long sum = base;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets variables maintaining the sum to zero. This is
     * effective in setting the sum to zero only if there are no
     * concurrent updates.
     */
    public void reset() {
        Cell[] as = cells;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset}. This method may apply, for example, during quiescent
     * points between multithreaded computations. If there are
     * updates concurrent with this method, the returned value is
     * <em>not</em> guaranteed to be the final sum occurring before
     * the reset.
     *
     * @return the sum
     */
    public long sumThenReset() {
        Cell[] as = cells;
        long sum = base;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        cells = null;
        base = s.readLong();
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long baseOffset;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            baseOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a JDK.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}
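
The striping idea described in the implementation comment of StripedAdder can be illustrated with the following simplified, self-contained sketch. It covers per-thread hash codes masked into a power-of-two table, a Marsaglia XorShift rehash after a failed CAS, and a non-atomic summing pass. It is not StripedAdder itself. It assumes a fixed-size, unpadded `java.util.concurrent.atomic.AtomicLongArray` in place of lazily created padded Cells, and it has no base field, resizing, or spinlock. The class name `SimpleStripedCounter` is hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified striped counter illustrating the technique only.
// Unlike StripedAdder, the table is fixed-size and unpadded, so adjacent
// slots may share cache lines, and there is no uncontended "base" fast path.
public class SimpleStripedCounter {
    private final AtomicLongArray slots;
    private final int mask;

    // Per-thread hash code, kept nonzero so the XorShift rehash never sticks at 0.
    private static final ThreadLocal<int[]> HASH = ThreadLocal.withInitial(() -> {
        int h = ThreadLocalRandom.current().nextInt();
        return new int[] { (h == 0) ? 1 : h };
    });

    public SimpleStripedCounter(int minSlots) {
        int n = 1;
        while (n < minSlots)
            n <<= 1;                          // round up to a power of two
        slots = new AtomicLongArray(n);
        mask = n - 1;
    }

    public void add(long x) {
        int[] holder = HASH.get();
        int h = holder[0];
        for (;;) {
            int i = h & mask;                 // masked per-thread hash
            long v = slots.get(i);
            if (slots.compareAndSet(i, v, v + x))
                break;
            h ^= h << 13;                     // CAS failed: XorShift rehash
            h ^= h >>> 17;                    // and try another slot
            h ^= h << 5;
        }
        holder[0] = h;                        // remember the slot that worked
    }

    // Like StripedAdder.sum: not an atomic snapshot under concurrent updates.
    public long sum() {
        long s = 0;
        for (int i = 0; i < slots.length(); i++)
            s += slots.get(i);
        return s;
    }

    public static void main(String[] args) {
        SimpleStripedCounter c =
            new SimpleStripedCounter(Runtime.getRuntime().availableProcessors());
        java.util.stream.IntStream.range(0, 100_000).parallel().forEach(i -> c.add(1));
        System.out.println(c.sum()); // 100000: all updates finished before sum()
    }
}
```

As in StripedAdder, a failed CAS is treated as evidence of contention, so the thread moves to a different slot rather than retrying the same one. Because `main` calls `sum()` only after the parallel stream has completed, the result there is exact.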