ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.7
Committed: Tue Jul 26 18:30:35 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.6: +21 -10 lines
Log Message:
Use common empty placeholder

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Arrays;
9     import java.util.Random;
10     import java.util.concurrent.atomic.AtomicInteger;
11     import java.util.concurrent.atomic.AtomicLong;
12     import java.io.IOException;
13     import java.io.Serializable;
14     import java.io.ObjectInputStream;
15     import java.io.ObjectOutputStream;
16    
17     /**
18     * A set of variables that together maintain a sum. When updates
19 dl 1.3 * (method {@link #add}) are contended across threads, this set of
20     * adder variables may grow dynamically to reduce contention. Method
21     * {@link #sum} returns the current combined total across these
22     * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23     * updates may occur while the sum is being calculated), and so cannot
24     * be used alone for fine-grained synchronization control.
25 jsr166 1.2 *
26 dl 1.1 * <p> This class may be applicable when many threads frequently
27     * update a common sum that is used for purposes such as collecting
28     * statistics. In this case, performance may be significantly faster
29     * than using a shared {@link AtomicLong}, at the expense of using
30 dl 1.4 * much more space. On the other hand, if it is known that only one
31     * thread can ever update the sum, performance may be significantly
32     * slower than just updating a local variable.
33 dl 1.1 *
34 dl 1.3 * <p>A StripedAdder may optionally be constructed with a given
35     * expected contention level; i.e., the number of threads that are
36     * expected to concurrently update the sum. Supplying an accurate
37     * value may improve performance by reducing the need for dynamic
38     * adjustment.
39     *
40 jsr166 1.2 * @author Doug Lea
41 dl 1.1 */
42     public class StripedAdder implements Serializable {
43     private static final long serialVersionUID = 7249069246863182397L;
44    
45     /*
46 dl 1.4 * A StripedAdder maintains a table of Atomic long variables. The
47 dl 1.6 * table is indexed by per-thread hash codes.
48 dl 1.3 *
49 dl 1.6 * By default, the table is lazily initialized, to minimize
50     * footprint until adders are used. On first use, the table is set
51     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
52     * bounded by the number of CPUS (if larger than the default
53     * size).
54     *
55     * Per-thread hash codes are initialized to random values.
56     * Collisions are indicated by failed CASes when performing an add
57     * operation (see method retryAdd). Upon a collision, if the table
58     * size is less than the capacity, it is doubled in size unless
59     * some other thread holds lock. If a hashed slot is empty, and
60     * lock is available, a new Adder is created. Otherwise, if the
61     * slot exists, a CAS is tried. Retries proceed by "double
62     * hashing", using a secondary hash (Marsaglia XorShift) to try to
63     * find a free slot.
64     *
65     * The table size is capped because, when there are more threads
66     * than CPUs, supposing that each thread were bound to a CPU,
67     * there would exist a perfect hash function mapping threads to
68     * slots that eliminates collisions. When we reach capacity, we
69     * search for this mapping by randomly varying the hash codes of
70     * colliding threads. Because search is random, and failures only
71     * become known via CAS failures, convergence will be slow, and
72     * because threads are typically not bound to CPUS forever, may
73     * not occur at all. However, despite these limitations, observed
74     * contention is typically low in these cases.
75 dl 1.3 *
76     * Table entries are of class Adder; a form of AtomicLong padded
77     * to reduce cache contention on most processors. Padding is
78 dl 1.6 * overkill for most Atomics because they are usually irregularly
79     * scattered in memory and thus don't interfere much with each
80     * other. But Atomic objects residing in arrays will tend to be
81     * placed adjacent to each other, and so will most often share
82     * cache lines without this precaution. Adders are by default
83 dl 1.4 * constructed upon first use, which further improves per-thread
84 dl 1.6 * locality and helps reduce footprint.
85 dl 1.3 *
86     * A single spinlock is used for resizing the table as well as
87 dl 1.1 * populating slots with new Adders. Upon lock contention, threads
88 dl 1.4 * try other slots rather than blocking. After initialization, at
89     * least one slot exists, so retries will eventually find a
90 dl 1.6 * candidate Adder. During these retries, there is increased
91 dl 1.3 * contention and reduced locality, which is still better than
92     * alternatives.
93 dl 1.1 */
94    
95 jsr166 1.2 /**
96 dl 1.6 * Padded version of AtomicLong
97 dl 1.1 */
98 dl 1.6 static final class Adder extends AtomicLong {
99     long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100     Adder(long x) { super(x); }
101     }
102 dl 1.1
103 dl 1.6 private static final int NCPU = Runtime.getRuntime().availableProcessors();
104 dl 1.5
105     /**
106 dl 1.6 * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
107     * first use under default constructor, and must be a power of
108     * two. There is not much point in making size a lot smaller than
109     * that of Adders though. CAP is the maximum allowed table size.
110 dl 1.1 */
111 dl 1.6 private static final int DEFAULT_INITIAL_SIZE = 8;
112     private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
113 dl 1.1
114 jsr166 1.2 /**
115 dl 1.6 * Holder for the thread-local hash code. The code is initially
116     * random, but may be set to a different value upon collisions.
117 dl 1.1 */
118     static final class HashCode {
119 dl 1.6 static final Random rng = new Random();
120 dl 1.1 int code;
121 dl 1.6 HashCode() {
122     int h = rng.nextInt();
123     code = (h == 0) ? 1 : h; // ensure nonzero
124     }
125 dl 1.1 }
126    
127     /**
128     * The corresponding ThreadLocal class
129     */
130     static final class ThreadHashCode extends ThreadLocal<HashCode> {
131 dl 1.6 public HashCode initialValue() { return new HashCode(); }
132 dl 1.1 }
133    
134     /**
135     * Static per-thread hash codes. Shared across all StripedAdders
136 dl 1.6 * to reduce ThreadLocal pollution and because adjustments due to
137     * collisions in one table are likely to be appropriate for
138     * others.
139 dl 1.1 */
140     static final ThreadHashCode threadHashCode = new ThreadHashCode();
141    
142     /**
143 dl 1.7 * Common placeholder for empty arrays.
144     */
145     static final Adder[] EMPTY_ARRAY = new Adder[0];
146    
147     /**
148     * Table of adders. Size is either zero or a power of two, grows
149     * to be at most CAP.
150 dl 1.1 */
151     private transient volatile Adder[] adders;
152    
153     /**
154     * Serves as a lock when resizing and/or creating Adders. There
155 dl 1.5 * is no need for a blocking lock: Except during initialization
156 dl 1.6 * races, when busy, other threads try other slots. However,
157     * during (double-checked) initializations, we use the
158     * "synchronized" lock on this object.
159 dl 1.1 */
160     private final AtomicInteger mutex;
161    
162     /**
163 dl 1.3 * Creates a new adder with zero sum.
164 dl 1.1 */
165     public StripedAdder() {
166 dl 1.7 this.adders = EMPTY_ARRAY;
167 dl 1.4 this.mutex = new AtomicInteger();
168     // remaining initialization on first call to add.
169 dl 1.3 }
170    
171     /**
172     * Creates a new adder with zero sum, and with stripes presized
173     * for the given expected contention level.
174     *
175     * @param expectedContention the expected number of threads that
176     * will concurrently update the sum.
177     */
178     public StripedAdder(int expectedContention) {
179 dl 1.7 if (expectedContention > 0) {
180     int cap = (expectedContention < CAP) ? expectedContention : CAP;
181     int size = 1;
182     while (size < cap)
183     size <<= 1;
184     Adder[] as = new Adder[size];
185     for (int i = 0; i < size; ++i)
186     as[i] = new Adder(0);
187     this.adders = as;
188     }
189     else
190     this.adders = EMPTY_ARRAY;
191 dl 1.1 this.mutex = new AtomicInteger();
192     }
193    
194     /**
195     * Adds the given value.
196     *
197     * @param x the value to add
198     */
199     public void add(long x) {
200 dl 1.4 Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
201 dl 1.1 HashCode hc = threadHashCode.get();
202 dl 1.6 int h = hc.code;
203 dl 1.4 if ((as = adders) == null || (n = as.length) < 1 ||
204 dl 1.6 (a = as[(n - 1) & h]) == null ||
205 dl 1.4 !a.compareAndSet(v = a.get(), v + x))
206     retryAdd(x, hc);
207     }
208    
209     /**
210     * Handle cases of add involving initialization, resizing,
211 dl 1.6 * creating new Adders, and/or contention. See above for
212     * explanation.
213 dl 1.4 */
214     private void retryAdd(long x, HashCode hc) {
215     int h = hc.code;
216     final AtomicInteger mutex = this.mutex;
217 dl 1.6 int collisions = 1 - mutex.get(); // first guess: collides if not locked
218     for (;;) {
219     Adder[] as; Adder a; long v; int k, n;
220     while ((as = adders) == null || (n = as.length) < 1) {
221     synchronized(mutex) { // Try to initialize
222 dl 1.7 if (adders == as) {
223 dl 1.6 Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
224     rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
225     adders = rs;
226     }
227     }
228     collisions = 0;
229     }
230    
231     if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
232 dl 1.5 if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
233     try {
234 dl 1.6 if (adders == as && as[k] == null)
235     a = as[k] = new Adder(x);
236 dl 1.5 } finally {
237     mutex.set(0);
238     }
239 dl 1.6 if (a != null)
240     break;
241 dl 1.1 }
242 dl 1.6 collisions = 0;
243 dl 1.5 }
244 dl 1.6 else if (collisions != 0 && n < CAP && // Try to expand table
245 dl 1.5 mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
246     try {
247     if (adders == as) {
248 dl 1.6 Adder[] rs = new Adder[n << 1];
249     for (int i = 0; i < n; ++i)
250     rs[i] = as[i];
251     adders = rs;
252 dl 1.1 }
253 dl 1.5 } finally {
254     mutex.set(0);
255 dl 1.1 }
256 dl 1.6 collisions = 0;
257 dl 1.5 }
258 dl 1.6 else if (a.compareAndSet(v = a.get(), v + x))
259     break;
260     else
261     collisions = 1;
262     h ^= h << 13; // Rehash
263     h ^= h >>> 17;
264     h ^= h << 5;
265 dl 1.1 }
266 dl 1.5 hc.code = h;
267 dl 1.1 }
268    
269     /**
270     * Returns an estimate of the current sum. The result is
271     * calculated by summing multiple variables, so may not be
272     * accurate if updates occur concurrently with this method.
273 jsr166 1.2 *
274 dl 1.1 * @return the estimated sum
275     */
276     public long sum() {
277 dl 1.4 long sum = 0L;
278 dl 1.1 Adder[] as = adders;
279 dl 1.4 if (as != null) {
280     int n = as.length;
281     for (int i = 0; i < n; ++i) {
282     Adder a = as[i];
283     if (a != null)
284     sum += a.get();
285     }
286 dl 1.1 }
287     return sum;
288     }
289    
290     /**
291     * Resets each of the variables to zero. This is effective in
292     * fully resetting the sum only if there are no concurrent
293     * updates.
294     */
295     public void reset() {
296     Adder[] as = adders;
297 dl 1.4 if (as != null) {
298     int n = as.length;
299     for (int i = 0; i < n; ++i) {
300     Adder a = as[i];
301     if (a != null)
302     a.set(0L);
303     }
304 dl 1.1 }
305     }
306    
307     /**
308     * Equivalent to {@code add(1)}.
309     */
310     public void increment() {
311     add(1L);
312     }
313    
314     /**
315     * Equivalent to {@code add(-1)}.
316     */
317     public void decrement() {
318     add(-1L);
319     }
320    
321     /**
322     * Equivalent to {@link #sum} followed by {@link #reset}.
323     *
324     * @return the estimated sum
325     */
326     public long sumAndReset() {
327 dl 1.4 long sum = 0L;
328 dl 1.1 Adder[] as = adders;
329 dl 1.4 if (as != null) {
330     int n = as.length;
331     for (int i = 0; i < n; ++i) {
332     Adder a = as[i];
333     if (a != null) {
334     sum += a.get();
335     a.set(0L);
336     }
337 dl 1.1 }
338     }
339     return sum;
340     }
341    
342     private void writeObject(java.io.ObjectOutputStream s)
343     throws java.io.IOException {
344     s.defaultWriteObject();
345     s.writeLong(sum());
346     }
347    
348     private void readObject(ObjectInputStream s)
349     throws IOException, ClassNotFoundException {
350     s.defaultReadObject();
351     mutex.set(0);
352 dl 1.4 add(s.readLong());
353 dl 1.1 }
354    
355     }