ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.3
Committed: Fri Jul 22 13:25:12 2011 UTC (12 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.2: +72 -26 lines
Log Message:
Add non-default constructor; improve documentation

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Arrays;
9     import java.util.Random;
10     import java.util.concurrent.atomic.AtomicInteger;
11     import java.util.concurrent.atomic.AtomicLong;
12     import java.io.IOException;
13     import java.io.Serializable;
14     import java.io.ObjectInputStream;
15     import java.io.ObjectOutputStream;
16    
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * significantly more space. On the other hand, if it is known that
 * only one thread can ever update the sum, performance may be
 * significantly slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of AtomicLong-based variables. The
     * table is indexed by per-thread hash codes that are initialized
     * to random values.
     *
     * The table doubles in size upon contention (as indicated by
     * failed CASes when performing add()), but is capped at the
     * nearest power of two >= #CPUs. This reflects the idea that,
     * when there are more threads than CPUs, then if each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUs forever, may not occur at all. However, despite these
     * limitations, observed contention is typically very low in these
     * cases.
     *
     * Table entries are of class Adder; a form of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are most often
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines without this precaution. Except for
     * slot adders[0], Adders are constructed upon first use, which
     * further improves per-thread locality and helps reduce (an
     * already large) footprint.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * try other slots rather than blocking. We guarantee that at
     * least one slot (0) exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded version of AtomicLong. The unused {@code p*} fields only
     * enlarge the object so that Adders stored in adjacent table
     * slots are less likely to share a cache line.
     */
    static final class Adder extends AtomicLong {
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code starts off with
     * a given random value, but may be set to a different
     * pseudo-random value (using a cheaper but adequate xorshift
     * generator) upon collisions.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class. Each thread's initial code
     * is drawn from a shared Random and forced to be nonzero, because
     * xorShift maps 0 to 0 and could then never move the thread to
     * another slot.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();
        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. Its length is always a power of two, at least
     * 2. It grows by doubling on contention, but never beyond the
     * smallest power of two that is >= NCPU (or 2, if that is larger).
     * Slot 0 is always populated; other slots are filled lazily.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders. There
     * is no need for a blocking lock: When busy, other threads try
     * other slots. 0 means unlocked, 1 means held.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift random generator for rehashing on collisions.
     * Maps nonzero inputs to nonzero outputs.
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
        this(2);
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum. Values below 2 (including
     * negative values) yield the minimum table size of 2; values above
     * NCPU are treated as NCPU.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)        // smallest power of two >= max(2, cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];          // n is a power of two
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x))
                    break;                      // uncontended fast path
                if (n >= NCPU) {                // Collision when table at max
                    h = hc.code = xorShift(h);  // change code
                    continue;
                }
            }
            // Reached when the slot is empty, or when the CAS failed and
            // the table may still grow: both need the spinlock.
            final AtomicInteger mutex = this.mutex;
            if (mutex.get() != 0)
                h = xorShift(h);                // Lock busy: try elsewhere
            else if (mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Grow only on a genuine CAS collision, and only if no
                    // other thread has replaced the table since we read it.
                    if (a != null && rs == as)
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {        // Create adder holding x
                        rs[j] = new Adder(x);
                        created = true;
                    }
                } finally {
                    mutex.set(0);
                }
                if (created) {
                    hc.code = h;                // Use this adder next time
                    break;
                }
                // Otherwise the target slot was already populated; retry.
            }
        }
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                sum += a.get();
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                a.set(0L);
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link #reset},
     * except that each variable is read and cleared in a single atomic
     * step. An update racing with this call is therefore either
     * counted in the returned value or left in place for a later sum,
     * rather than being discarded between the read and the clear.
     * The result is still not an atomic snapshot of the whole sum.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                sum += a.getAndSet(0L);   // atomic read-and-clear
        }
        return sum;
    }

    /**
     * Saves the state to a stream. The (transient) table is not
     * written; only its current {@link #sum} is, as a long following
     * the default fields.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes the instance from a stream, placing the saved sum
     * in slot 0 of a fresh minimum-size table. The mutex is reset in
     * case it was serialized while held.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        mutex.set(0);
    }

}
301    
302