ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/LongAdder.java
Revision: 1.4
Committed: Mon Aug 1 18:54:15 2011 UTC (12 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +2 -2 lines
Log Message:
<code> => @code

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Random;
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicLong;
11     import java.io.IOException;
12     import java.io.Serializable;
13     import java.io.ObjectInputStream;
14     import java.io.ObjectOutputStream;
15    
/**
 * One or more variables that together maintain an initially zero sum.
 * When updates (method {@link #add}) are contended across threads,
 * the set of variables may grow dynamically to reduce contention.
 *
 * <p> This class is usually preferable to {@link AtomicLong} when
 * multiple threads update a common sum that is used for purposes such
 * as collecting statistics, not for fine-grained synchronization
 * control. Under low update contention, the two classes have similar
 * characteristics. But under high contention, expected throughput of
 * this class is significantly higher, at the expense of higher space
 * consumption.
 *
 * <p> Method {@link #sum} returns the current combined total across
 * the variables maintaining the sum. This value is <em>NOT</em> an
 * atomic snapshot: Invocation of {@code sum} in the absence of
 * concurrent updates returns an accurate result, but concurrent
 * updates that occur while the sum is being calculated might not be
 * incorporated. The sum may also be {@code reset} to zero, as
 * an alternative to creating a new adder. However, method {@link
 * #reset} is intrinsically racy, so should only be used when it is
 * known that no threads are concurrently updating the sum.
 *
 * <p><em>jsr166e note: This class is targeted to be placed in
 * java.util.concurrent.atomic</em>
 *
 * @author Doug Lea
 */
public class LongAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A LongAdder maintains a lazily-initialized table of atomically
     * updated variables, plus an extra "base" field. The table size
     * is a power of two. Indexing uses masked per-thread hash codes.
     *
     * Table entries are of class Cell; a variant of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are usually irregularly
     * scattered in memory and thus don't interfere much with each
     * other. But Atomic objects residing in arrays will tend to be
     * placed adjacent to each other, and so will most often share
     * cache lines (with a huge negative performance impact) without
     * this precaution.
     *
     * In part because Cells are relatively large, we avoid creating
     * them until they are needed. When there is no contention, all
     * updates are made to the base field. Upon first contention (a
     * failed CAS on base update), the table is initialized to size 2.
     * The table size is doubled upon further contention until
     * reaching the nearest power of two greater than or equal to the
     * number of CPUs.
     *
     * Per-thread hash codes are initialized to random values.
     * Contention and/or table collisions are indicated by failed
     * CASes when performing an add operation (see method
     * retryAdd). Upon a collision, if the table size is less than the
     * capacity, it is doubled in size unless some other thread holds
     * the lock. If a hashed slot is empty, and lock is available, a
     * new Cell is created. Otherwise, if the slot exists, a CAS is
     * tried. Retries proceed by "double hashing", using a secondary
     * hash (Marsaglia XorShift) to try to find a free slot.
     *
     * The table size is capped because, when there are more threads
     * than CPUs, supposing that each thread were bound to a CPU,
     * there would exist a perfect hash function mapping threads to
     * slots that eliminates collisions. When we reach capacity, we
     * search for this mapping by randomly varying the hash codes of
     * colliding threads. Because search is random, and collisions
     * only become known via CAS failures, convergence can be slow,
     * and because threads are typically not bound to CPUs forever,
     * may not occur at all. However, despite these limitations,
     * observed contention rates are typically low in these cases.
     *
     * A single spinlock is used for initializing and resizing the
     * table, as well as populating slots with new Cells. There is no
     * need for a blocking lock: Upon lock contention, threads try
     * other slots (or the base) rather than blocking. During these
     * retries, there is increased contention and reduced locality,
     * which is still better than alternatives.
     *
     * It is possible for a Cell to become unused when threads that
     * once hashed to it terminate, as well as in the case where
     * doubling the table causes no thread to hash to it under
     * expanded mask. We do not try to detect or remove such cells,
     * under the assumption that for long-running adders, observed
     * contention levels will recur, so the cells will eventually be
     * needed again; and for short-lived ones, it does not matter.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt();
            // Avoid zero: the xorshift rehash in retryAdd maps 0 to 0,
            // so a zero code could never move to another slot.
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class; each thread lazily gets its
     * own freshly seeded {@link HashCode}.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all LongAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /** Number of CPUs, to place bound on table size */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    private transient volatile Cell[] cells;

    /**
     * Base sum, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    private transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    private transient volatile int busy;

    /**
     * Creates a new adder with initial sum of zero.
     */
    public LongAdder() {
    }

    /**
     * Adds the given value.
     *
     * <p>Fast path: a single CAS on {@code base} while no cell table
     * exists, otherwise a single CAS on this thread's hashed cell. Any
     * failure or missing table/cell is delegated to {@code retryAdd}.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long v; HashCode hc; Cell a; int n;
        if ((as = cells) != null ||
            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
            boolean uncontended = true;
            int h = (hc = threadHashCode.get()).code;
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = UNSAFE.compareAndSwapLong(a, valueOffset,
                                                          v = a.value, v + x)))
                retryAdd(x, hc, uncontended);
        }
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Cells, and/or contention. See the implementation
     * overview at the top of the class for an explanation. This method
     * suffers the usual non-modularity problems of optimistic retry
     * code, relying on rechecked sets of reads.
     *
     * @param x the value to add
     * @param hc the hash code holder
     * @param wasUncontended false if CAS failed before call
     */
    private void retryAdd(long x, HashCode hc, boolean wasUncontended) {
        int h = hc.code;
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0) {            // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (busy == 0 &&
                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                busy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (UNSAFE.compareAndSwapLong(a, valueOffset,
                                                   v = a.value, v + x))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (busy == 0 &&           // Try to expand table
                         UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                    try {
                        if (cells == as) {      // Skip if another thread resized
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        busy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h ^= h << 13;                   // Rehash (Marsaglia xorshift)
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0 && cells == as &&
                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    busy = 0;
                }
                if (init)
                    break;
            }
            else if (UNSAFE.compareAndSwapLong(this, baseOffset,
                                               v = base, v + x))
                break;                          // Fall back on using base
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns the current sum. The result is only guaranteed to be
     * accurate in the absence of concurrent updates. Otherwise, it
     * may fail to reflect one or more updates occurring while
     * calculating the result.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells;
        long sum = base;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets variables maintaining the sum to zero. This is
     * effective in setting the sum to zero only if there are no
     * concurrent updates.
     */
    public void reset() {
        Cell[] as = cells;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset}. This method may apply for example during quiescent
     * points between multithreaded computations. If there are
     * updates concurrent with this method, the returned value is
     * <em>not</em> guaranteed to be the final sum occurring before
     * the reset.
     *
     * @return the sum
     */
    public long sumThenReset() {
        Cell[] as = cells;
        long sum = base;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    /**
     * Saves the state to a stream (that is, serializes it). Only the
     * combined total is written; the cell table is not. If updates are
     * concurrent with serialization, the written value carries the same
     * caveats as {@link #sum}.
     *
     * @param s the stream
     * @throws IOException if an I/O error occurs
     * @serialData the current value of {@link #sum}, written as a
     * {@code long}
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes
     * it). The restored adder has no cell table, an unlocked spinlock,
     * and a base equal to the serialized sum.
     *
     * @param s the stream
     * @throws IOException if an I/O error occurs
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        cells = null;
        base = s.readLong();
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long baseOffset;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = LongAdder.class;
            baseOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * <p>If the direct call is refused with a SecurityException, this
     * falls back to reading the private static {@code theUnsafe} field
     * reflectively inside a privileged action.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        @Override
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}