ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.9
Committed: Thu Jul 28 19:27:07 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.8: +19 -34 lines
Log Message:
Simplify API

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Random;
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicLong;
11     import java.io.IOException;
12     import java.io.Serializable;
13     import java.io.ObjectInputStream;
14     import java.io.ObjectOutputStream;
15    
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * much more space. On the other hand, if it is known that only one
 * thread can ever update the sum, performance may be significantly
 * slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of atomically updated long
     * variables. The table is indexed by per-thread hash codes.
     *
     * Table entries are of class Adder, a variant of AtomicLong
     * padded to reduce cache contention on most processors. Padding
     * is overkill for most Atomics because they are usually
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines (with a huge negative performance
     * impact) without this precaution.
     *
     * Because Adders are relatively large, we avoid creating them
     * until they are needed. On the other hand, we try to create them
     * on any sign of contention.
     *
     * Per-thread hash codes are initialized to random values.
     * Collisions are indicated by failed CASes when performing an add
     * operation (see method retryAdd). Upon a collision, if the table
     * size is less than the capacity, it is doubled in size unless
     * some other thread holds the lock. If a hashed slot is empty,
     * and the lock is available, a new Adder is created. Otherwise,
     * if the slot exists, a CAS is tried. Retries proceed by "double
     * hashing", using a secondary hash (Marsaglia XorShift) to try to
     * find a free slot.
     *
     * By default, the table is lazily initialized. Upon first use,
     * the table is set to size 2 (the minimum non-empty size), but
     * containing only a single Adder. The maximum table size is
     * bounded by the nearest power of two >= the number of CPUs. The
     * table size is capped because, when there are more threads than
     * CPUs, supposing that each thread were bound to a CPU, there
     * would exist a perfect hash function mapping threads to slots
     * that eliminates collisions. When we reach capacity, we search
     * for this mapping by randomly varying the hash codes of
     * colliding threads. Because search is random, and failures only
     * become known via CAS failures, convergence will be slow, and
     * because threads are typically not bound to CPUs forever, may
     * not occur at all. However, despite these limitations, observed
     * contention is typically low in these cases.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. After initialization, there
     * is no need for a blocking lock: Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Adder {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Adder(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero: xorShift maps 0 to 0 forever
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. When non-null, size is a power of 2, at least 2.
     */
    private transient volatile Adder[] adders;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
     * 0 means unlocked, 1 means locked.
     */
    private volatile int busy;

    /**
     * Creates a new adder with zero sum. The table of adders is
     * allocated lazily on first update.
     */
    public StripedAdder() {
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. The table size is the
     * smallest power of two that is at least 2 and at least
     * {@code min(expectedContention, number of available processors)};
     * every slot is pre-populated. Non-positive values therefore yield
     * the minimum table size of 2.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        int h = hc.code;
        boolean collide;
        // Fast path: a single CAS on this thread's slot, if it exists.
        if ((as = adders) != null && (n = as.length) > 0 &&
            (a = as[(n - 1) & h]) != null) {
            long v = a.value;
            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                return;
            collide = true;
        }
        else
            collide = false;
        retryAdd(x, hc, collide);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. See the overview above
     * for an explanation. This method suffers the usual
     * non-modularity problems of optimistic retry code, relying on
     * rechecked sets of reads.
     *
     * @param x the value to add
     * @param hc the calling thread's hash code holder; updated with
     * the (possibly rehashed) code once the add succeeds
     * @param collide true if the caller's CAS on an existing slot failed
     */
    private void retryAdd(long x, HashCode hc, boolean collide) {
        int h = hc.code;
        for (;;) {
            Adder[] as; Adder a; int n;
            if ((as = adders) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) != null) {
                    boolean shared = true;          // Slot exists
                    if (collide && n < NCPU && busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        try {
                            if (adders == as) {     // Expand table
                                Adder[] rs = new Adder[n << 1];
                                // Existing adders keep their indices 0..n-1
                                System.arraycopy(as, 0, rs, 0, n);
                                adders = rs;
                                shared = false;
                            }
                        } finally {
                            busy = 0;
                        }
                        // (h & n) != 0 means this thread now maps to a new,
                        // empty slot in the upper half of the expanded table.
                        if (shared || (h & n) != 0) {
                            collide = false;
                            continue;               // Array or index changed
                        }
                    }
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    collide = shared;
                }
                else {                              // Try to attach new Adder
                    if (busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        boolean created = false;
                        try {                       // Recheck under lock
                            Adder[] rs; int m, j;
                            if ((rs = adders) != null && (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = new Adder(x);
                                created = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        if (created)
                            break;
                        continue;                   // Slot is now non-empty
                    }
                    collide = false;
                }
                h ^= h << 13;                       // Rehash (XorShift)
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0) {                   // Default-initialize
                Adder r = new Adder(x);
                Adder[] rs = new Adder[2];
                rs[h & 1] = r;
                if (adders == as && busy == 0 &&
                    UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                    boolean init = false;
                    try {
                        if (adders == as) {
                            adders = rs;
                            init = true;
                        }
                    } finally {
                        busy = 0;
                    }
                    if (init)
                        break;
                }
            }
            else if (adders == as)                  // Lost initialization race
                Thread.yield();
        }
        hc.code = h;                                // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero, returning the estimated
     * previous sum. This is effective in fully resetting the sum only
     * if there are no concurrent updates: an update landing between
     * the read and the zeroing of a variable is lost.
     *
     * @return the estimated previous sum
     */
    public long reset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    /**
     * Serializes this adder by writing the default fields followed
     * by the current estimated sum. The adder table itself is
     * transient and is not written.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes this adder. The transient adder table is
     * {@code null} after default deserialization. The spinlock is
     * cleared in case it was serialized while held, and the saved sum
     * is re-added, which lazily rebuilds the table.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        add(s.readLong());
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Adder.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            // Not loaded by the bootstrap loader: fall back to reflection.
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        @Override
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}