ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.3
Committed: Fri Jul 22 13:25:12 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.2: +72 -26 lines
Log Message:
Add non-default constructor; improve documentation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Arrays;
9 import java.util.Random;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.io.IOException;
13 import java.io.Serializable;
14 import java.io.ObjectInputStream;
15 import java.io.ObjectOutputStream;
16
17 /**
18 * A set of variables that together maintain a sum. When updates
19 * (method {@link #add}) are contended across threads, this set of
20 * adder variables may grow dynamically to reduce contention. Method
21 * {@link #sum} returns the current combined total across these
22 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 * updates may occur while the sum is being calculated), and so cannot
24 * be used alone for fine-grained synchronization control.
25 *
26 * <p> This class may be applicable when many threads frequently
27 * update a common sum that is used for purposes such as collecting
28 * statistics. In this case, performance may be significantly faster
29 * than using a shared {@link AtomicLong}, at the expense of using
30 * significantly more space. On the other hand, if it is known that
31 * only one thread can ever update the sum, performance may be
32 * significantly slower than just updating a local variable.
33 *
34 * <p>A StripedAdder may optionally be constructed with a given
35 * expected contention level; i.e., the number of threads that are
36 * expected to concurrently update the sum. Supplying an accurate
37 * value may improve performance by reducing the need for dynamic
38 * adjustment.
39 *
40 * @author Doug Lea
41 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of atomic long variables. The
     * table is indexed by per-thread hash codes that are initialized
     * to random values.
     *
     * The table doubles in size upon contention (as indicated by
     * failed CASes when performing add()), but is capped at the
     * nearest power of two >= #CPUS. This reflects the idea that,
     * when there are more threads than CPUs, then if each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUs forever, may not occur at all. However, despite these
     * limitations, observed contention is typically very low in these
     * cases.
     *
     * Table entries are of class Adder; a form of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are most often
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines without this precaution. Except for
     * slot adders[0], Adders are constructed upon first use, which
     * further improves per-thread locality and helps reduce (an
     * already large) footprint.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * try other slots rather than blocking. We guarantee that at
     * least one slot (0) exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded version of AtomicLong. The extra {@code long} fields are
     * never read or written; they exist only to enlarge the object so
     * that neighbouring Adders are less likely to share a cache line.
     */
    static final class Adder extends AtomicLong {
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code starts off with
     * a given random value, but may be set to a different
     * pseudo-random value (using a cheaper but adequate xorshift
     * generator) upon collisions.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();

        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            // Zero is a fixed point of xorShift, so a zero code could
            // never be changed by rehashing; ensure nonzero.
            return new HashCode((h == 0) ? 1 : h);
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. Always a power-of-two length, minimum 2. The
     * table is doubled only while its length is below NCPU, so it
     * never exceeds the smallest power of two that is >= NCPU (and
     * >= 2). Slot 0 is always populated; other slots may be null
     * until first used.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders (0 =
     * free, 1 = held). There is no need for a blocking lock: When
     * busy, other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift random generator for rehashing on collisions.
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
        this(2);
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. The initial table size
     * is the smallest power of two that is at least
     * {@code min(expectedContention, NCPU)}, and never less than 2;
     * consequently values below 2 (including zero and negative
     * values) simply select the minimum size.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];  // length is a power of two
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x))
                    break;                      // fast path: uncontended
                if (n >= NCPU) {                // Collision when table at max
                    h = hc.code = xorShift(h);  // change code
                    continue;
                }
            }
            // Either the slot is empty, or the CAS failed and the table
            // may still grow: both cases need the spinlock.
            final AtomicInteger mutex = this.mutex;
            if (mutex.get() != 0)
                h = xorShift(h);                // Try elsewhere
            else if (mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Resize only on a collision, and only if nobody
                    // else replaced the table since we read it.
                    if (a != null && rs == as)  // Resize table
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {        // Create adder
                        rs[j] = new Adder(x);   // x is recorded as the initial value
                        created = true;
                    }
                } finally {
                    mutex.set(0);
                }
                if (created) {
                    hc.code = h;                // Use this adder next time
                    break;
                }
                // Slot was already occupied: loop and retry the CAS.
            }
        }
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                sum += a.get();
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                a.set(0L);
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset} when there are no concurrent updates. Each variable is
     * read and cleared in a single atomic step, so an update that
     * races with this method is never discarded: it is either
     * included in the returned value or remains in the adder for a
     * later call. As with {@link #sum}, the returned value is not an
     * atomic snapshot across all variables.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        Adder[] as = adders;
        int n = as.length;
        for (int i = 0; i < n; ++i) {
            Adder a = as[i];
            if (a != null)
                // A separate get() and set(0) would overwrite any add()
                // that landed between the two calls.
                sum += a.getAndSet(0L);
        }
        return sum;
    }

    /**
     * Serializes the non-transient state followed by the current
     * {@link #sum} as a single {@code long}; the (transient) table
     * itself is not written.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Restores the adder with a fresh minimum-size table whose slot 0
     * holds the serialized sum, and releases the spinlock in case it
     * was captured in the held state.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        mutex.set(0);
    }

}
301
302