ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.5
Committed: Sun Jul 24 15:08:21 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.4: +57 -40 lines
Log Message:
Reduce collisions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Arrays;
9 import java.util.Random;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.io.IOException;
13 import java.io.Serializable;
14 import java.io.ObjectInputStream;
15 import java.io.ObjectOutputStream;
16
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * much more space. On the other hand, if it is known that only one
 * thread can ever update the sum, performance may be significantly
 * slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of AtomicLong variables. The
     * table is indexed by per-thread hash codes that are initialized
     * to random values.
     *
     * The table doubles in size upon contention (as indicated by
     * failed CASes when performing add()), but only while it is
     * smaller than #CPUs, so growth stops at the nearest power of two
     * >= #CPUs (a table that starts out larger never grows). This
     * reflects the idea that, when there are more threads than CPUs,
     * then if each thread were bound to a CPU, there would exist a
     * perfect hash function mapping threads to slots that eliminates
     * collisions. When we reach capacity, we search for this mapping
     * by randomly varying the hash codes of colliding threads.
     * Because search is random, and failures only become known via
     * CAS failures, convergence will be slow, and because threads are
     * typically not bound to CPUs forever, may not occur at all.
     * However, despite these limitations, observed contention is
     * typically low in these cases.
     *
     * Table entries are of class Adder; a form of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are most often
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines without this precaution. Adders are
     * constructed upon first use, which further improves per-thread
     * locality and helps reduce (an already large) footprint.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The table size set upon first use when default-constructed
     * (or constructed with a non-positive expected contention).
     */
    private static final int DEFAULT_ARRAY_SIZE = 8;

    /**
     * Padded version of AtomicLong. The unused long fields only
     * occupy space so that adjacent Adders are less likely to share
     * a cache line.
     */
    static final class Adder extends AtomicLong {
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code starts off with
     * a given random value, but may be set to a different value upon
     * collisions in retryAdd.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();
        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            // Must be nonzero: the XorShift step in retryAdd maps 0 to 0,
            // so a zero code could never be varied after a collision.
            return new HashCode((h == 0) ? 1 : h);
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders, or null until first use. When non-null its
     * size is a power of two. It doubles only while smaller than
     * NCPU, so it never exceeds the larger of its initial size and
     * the nearest power of two >= NCPU.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders. There
     * is no need for a blocking lock: Except during initialization
     * races, when busy, other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
        this.mutex = new AtomicInteger();
        // remaining initialization on first call to add.
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level.
     *
     * <p>The table size is the smallest power of two that is at least
     * {@code min(expectedContention, NCPU)}. Non-positive values are
     * treated as "unknown": no table is allocated up front, and the
     * default-sized table is created on the first {@link #add}, just
     * as for the no-argument constructor.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        if (expectedContention > 0) {
            int cap = Math.min(expectedContention, NCPU);
            int size = 1;
            while (size < cap)
                size <<= 1;
            Adder[] as = new Adder[size];
            for (int i = 0; i < size; ++i)
                as[i] = new Adder(0);
            this.adders = as;
        }
        // Otherwise leave adders null. An empty (length 0) table would
        // never be replaced by the lazy-initialization path in older
        // versions of retryAdd, which made every add() loop forever.
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * <p>Fast path: a single CAS on this thread's slot. Anything else
     * (no table yet, empty slot, or a failed CAS) is handled by
     * retryAdd.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        if ((as = adders) == null || (n = as.length) < 1 ||
            (a = as[hc.code & (n - 1)]) == null ||
            !a.compareAndSet(v = a.get(), v + x))
            retryAdd(x, hc);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. Loops until x has been
     * added, then stores the (possibly varied) hash code back into hc.
     *
     * @param x the value to add
     * @param hc the calling thread's hash code holder
     */
    private void retryAdd(long x, HashCode hc) {
        int h = hc.code;
        final AtomicInteger mutex = this.mutex;
        for (boolean retried = false; ; retried = true) {
            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
            if ((as = adders) == null || (n = as.length) < 1) {
                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
                    try {
                        // Re-check against the snapshot rather than just
                        // testing for null: a present-but-empty table must
                        // also be replaced, or this loop never terminates.
                        if (adders == as) // Default-initialize
                            adders = new Adder[DEFAULT_ARRAY_SIZE];
                    } finally {
                        mutex.set(0);
                    }
                }
                else
                    Thread.yield(); // initialization race
            }
            // The first pass skips the CAS: add() has just failed one.
            else if ((a = as[k = h & (n - 1)]) != null &&
                     retried && a.compareAndSet(v = a.get(), v + x))
                break;
            else if ((a == null || n < NCPU) &&
                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    if (adders == as) { // table unchanged since the read
                        if (as[k] == null) {
                            as[k] = new Adder(x);
                            created = true;
                        }
                        else { // Expand table; new upper half fills lazily
                            adders = Arrays.copyOf(as, n << 1);
                        }
                    }
                } finally {
                    mutex.set(0);
                }
                if (created)
                    break;
            }
            else { // Try elsewhere
                h ^= h << 13;
                h ^= h >>> 17; // Marsaglia XorShift
                h ^= h << 5;
            }
        }
        hc.code = h;
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            for (Adder a : as) {
                if (a != null)
                    sum += a.get();
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        if (as != null) {
            for (Adder a : as) {
                if (a != null)
                    a.set(0L);
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link #reset}.
     * Each variable is read and cleared in a single atomic step, so an
     * update that lands on a variable concurrently is either included
     * in the returned value or retained in the adder, never lost.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            for (Adder a : as) {
                if (a != null)
                    sum += a.getAndSet(0L);
            }
        }
        return sum;
    }

    /**
     * Serializes the mutex via default serialization, followed by the
     * current {@link #sum} as a long. The table itself is transient.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Restores an adder whose table is lazily recreated and seeded
     * with the serialized sum.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        mutex.set(0); // the stream may have captured a held lock
        add(s.readLong());
    }

}