ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.4
Committed: Sat Jul 23 16:32:53 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.3: +87 -78 lines
Log Message:
Lazier initialization

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Arrays;
9     import java.util.Random;
10     import java.util.concurrent.atomic.AtomicInteger;
11     import java.util.concurrent.atomic.AtomicLong;
12     import java.io.IOException;
13     import java.io.Serializable;
14     import java.io.ObjectInputStream;
15     import java.io.ObjectOutputStream;
16    
17     /**
18     * A set of variables that together maintain a sum. When updates
19 dl 1.3 * (method {@link #add}) are contended across threads, this set of
20     * adder variables may grow dynamically to reduce contention. Method
21     * {@link #sum} returns the current combined total across these
22     * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23     * updates may occur while the sum is being calculated), and so cannot
24     * be used alone for fine-grained synchronization control.
25 jsr166 1.2 *
26 dl 1.1 * <p> This class may be applicable when many threads frequently
27     * update a common sum that is used for purposes such as collecting
28     * statistics. In this case, performance may be significantly faster
29     * than using a shared {@link AtomicLong}, at the expense of using
30 dl 1.4 * much more space. On the other hand, if it is known that only one
31     * thread can ever update the sum, performance may be significantly
32     * slower than just updating a local variable.
33 dl 1.1 *
34 dl 1.3 * <p>A StripedAdder may optionally be constructed with a given
35     * expected contention level; i.e., the number of threads that are
36     * expected to concurrently update the sum. Supplying an accurate
37     * value may improve performance by reducing the need for dynamic
38     * adjustment.
39     *
40 jsr166 1.2 * @author Doug Lea
41 dl 1.1 */
42     public class StripedAdder implements Serializable {
43     private static final long serialVersionUID = 7249069246863182397L;
44    
45     /*
46 dl 1.4 * A StripedAdder maintains a table of Atomic long variables. The
47 dl 1.3 * table is indexed by per-thread hash codes that are initialized
48     * to random values.
49     *
50     * The table doubles in size upon contention (as indicated by
51     * failed CASes when performing add()), but is capped at the
52     * nearest power of two >= #CPUS. This reflects the idea that,
53     * when there are more threads than CPUs, then if each thread were
54     * bound to a CPU, there would exist a perfect hash function
55     * mapping threads to slots that eliminates collisions. When we
56     * reach capacity, we search for this mapping by randomly varying
57     * the hash codes of colliding threads. Because search is random,
58     * and failures only become known via CAS failures, convergence
59     * will be slow, and because threads are typically not bound to
60     * CPUS forever, may not occur at all. However, despite these
61 dl 1.4 * limitations, observed contention is typically low in these
62 dl 1.3 * cases.
63     *
64     * Table entries are of class Adder; a form of AtomicLong padded
65     * to reduce cache contention on most processors. Padding is
66     * overkill for most Atomics because they are most often
67     * irregularly scattered in memory and thus don't interfere much
68     * with each other. But Atomic objects residing in arrays will
69     * tend to be placed adjacent to each other, and so will most
70 dl 1.4 * often share cache lines without this precaution. Adders are
71     * constructed upon first use, which further improves per-thread
72     * locality and helps reduce (an already large) footprint.
73 dl 1.3 *
74     * A single spinlock is used for resizing the table as well as
75 dl 1.1 * populating slots with new Adders. Upon lock contention, threads
76 dl 1.4 * try other slots rather than blocking. After initialization, at
77     * least one slot exists, so retries will eventually find a
78 dl 1.3 * candidate Adder. During these retries, there is increased
79     * contention and reduced locality, which is still better than
80     * alternatives.
81 dl 1.1 */
82    
83 jsr166 1.2 /**
84 dl 1.1 * Number of processors, to place a cap on table growth.
85     */
86     static final int NCPU = Runtime.getRuntime().availableProcessors();
87    
88     /**
89 dl 1.3 * Padded version of AtomicLong
90 dl 1.1 */
91     static final class Adder extends AtomicLong {
92     long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
93     Adder(long x) { super(x); }
94     }
95    
96 jsr166 1.2 /**
97 dl 1.3 * Holder for the thread-local hash code. The code starts off with
98 dl 1.4 * a given random value, but may be set to a different value upon
99     * collisions in retryAdd.
100 dl 1.1 */
101     static final class HashCode {
102     int code;
103     HashCode(int h) { code = h; }
104     }
105    
106     /**
107     * The corresponding ThreadLocal class
108     */
109     static final class ThreadHashCode extends ThreadLocal<HashCode> {
110     static final Random rng = new Random();
111 jsr166 1.2 public HashCode initialValue() {
112 dl 1.1 int h = rng.nextInt();
113     return new HashCode((h == 0) ? 1 : h); // ensure nonzero
114     }
115     }
116    
117     /**
118     * Static per-thread hash codes. Shared across all StripedAdders
119     * because adjustments due to collisions in one table are likely
120     * to be appropriate for others.
121     */
122     static final ThreadHashCode threadHashCode = new ThreadHashCode();
123    
124     /**
125 dl 1.3 * Table of adders. Minimum size 2. Size grows to be at most NCPU.
126 dl 1.1 */
127     private transient volatile Adder[] adders;
128    
129     /**
130     * Serves as a lock when resizing and/or creating Adders. There
131     * is no need for a blocking lock: When busy, other threads try
132     * other slots.
133     */
134     private final AtomicInteger mutex;
135    
136     /**
137 dl 1.3 * Creates a new adder with zero sum.
138 dl 1.1 */
139     public StripedAdder() {
140 dl 1.4 this.mutex = new AtomicInteger();
141     // remaining initialization on first call to add.
142 dl 1.3 }
143    
144     /**
145     * Creates a new adder with zero sum, and with stripes presized
146     * for the given expected contention level.
147     *
148     * @param expectedContention the expected number of threads that
149     * will concurrently update the sum.
150     */
151     public StripedAdder(int expectedContention) {
152     int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
153     int size = 2;
154     while (size < cap)
155     size <<= 1;
156     Adder[] as = new Adder[size];
157 dl 1.4 for (int i = 0; i < size; ++i)
158     as[i] = new Adder(0);
159 dl 1.1 this.adders = as;
160     this.mutex = new AtomicInteger();
161     }
162    
163     /**
164     * Adds the given value.
165     *
166     * @param x the value to add
167     */
168     public void add(long x) {
169 dl 1.4 Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
170 dl 1.1 HashCode hc = threadHashCode.get();
171 dl 1.4 if ((as = adders) == null || (n = as.length) < 1 ||
172     (a = as[hc.code & (n - 1)]) == null ||
173     !a.compareAndSet(v = a.get(), v + x))
174     retryAdd(x, hc);
175     }
176    
177     /**
178     * Handle cases of add involving initialization, resizing,
179     * creating new Adders, and/or contention.
180     */
181     private void retryAdd(long x, HashCode hc) {
182     int h = hc.code;
183     final AtomicInteger mutex = this.mutex;
184     AtomicInteger lock = null; // nonnull when held
185     try {
186     for (;;) {
187     Adder[] as; Adder a; long v; int n, k; // locals for volatiles
188     boolean needLock = true;
189     if ((as = adders) == null || (n = as.length) < 1) {
190     if (lock != null) // default-initialize
191     adders = new Adder[2];
192 dl 1.1 }
193 dl 1.4 else if ((a = as[k = h & (n - 1)]) == null) {
194     if (lock != null) { // attach new adder
195     as[k] = new Adder(x);
196     break;
197 dl 1.1 }
198     }
199 dl 1.4 else if (a.compareAndSet(v = a.get(), v + x))
200 dl 1.1 break;
201 dl 1.4 else if (n >= NCPU) // cannot expand
202     needLock = false;
203     else if (lock != null) // expand table
204     adders = Arrays.copyOf(as, n << 1);
205    
206     if (lock == null) {
207     if (needLock && mutex.get() == 0 &&
208     mutex.compareAndSet(0, 1))
209     lock = mutex;
210     else { // try elsewhere
211     h ^= h << 13; // Marsaglia XorShift
212     h ^= h >>> 17;
213     h ^= h << 5;
214     }
215 dl 1.1 }
216     }
217 dl 1.4 } finally {
218     if (lock != null)
219     lock.set(0);
220 dl 1.1 }
221 dl 1.4 if (hc.code != h) // avoid unneeded writes
222     hc.code = h;
223 dl 1.1 }
224    
225     /**
226     * Returns an estimate of the current sum. The result is
227     * calculated by summing multiple variables, so may not be
228     * accurate if updates occur concurrently with this method.
229 jsr166 1.2 *
230 dl 1.1 * @return the estimated sum
231     */
232     public long sum() {
233 dl 1.4 long sum = 0L;
234 dl 1.1 Adder[] as = adders;
235 dl 1.4 if (as != null) {
236     int n = as.length;
237     for (int i = 0; i < n; ++i) {
238     Adder a = as[i];
239     if (a != null)
240     sum += a.get();
241     }
242 dl 1.1 }
243     return sum;
244     }
245    
246     /**
247     * Resets each of the variables to zero. This is effective in
248     * fully resetting the sum only if there are no concurrent
249     * updates.
250     */
251     public void reset() {
252     Adder[] as = adders;
253 dl 1.4 if (as != null) {
254     int n = as.length;
255     for (int i = 0; i < n; ++i) {
256     Adder a = as[i];
257     if (a != null)
258     a.set(0L);
259     }
260 dl 1.1 }
261     }
262    
263     /**
264     * Equivalent to {@code add(1)}.
265     */
266     public void increment() {
267     add(1L);
268     }
269    
270     /**
271     * Equivalent to {@code add(-1)}.
272     */
273     public void decrement() {
274     add(-1L);
275     }
276    
277     /**
278     * Equivalent to {@link #sum} followed by {@link #reset}.
279     *
280     * @return the estimated sum
281     */
282     public long sumAndReset() {
283 dl 1.4 long sum = 0L;
284 dl 1.1 Adder[] as = adders;
285 dl 1.4 if (as != null) {
286     int n = as.length;
287     for (int i = 0; i < n; ++i) {
288     Adder a = as[i];
289     if (a != null) {
290     sum += a.get();
291     a.set(0L);
292     }
293 dl 1.1 }
294     }
295     return sum;
296     }
297    
298     private void writeObject(java.io.ObjectOutputStream s)
299     throws java.io.IOException {
300     s.defaultWriteObject();
301     s.writeLong(sum());
302     }
303    
304     private void readObject(ObjectInputStream s)
305     throws IOException, ClassNotFoundException {
306     s.defaultReadObject();
307     mutex.set(0);
308 dl 1.4 add(s.readLong());
309 dl 1.1 }
310    
311     }