ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.2
Committed: Wed Jul 20 16:06:19 2011 UTC (12 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +7 -7 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Arrays;
9 import java.util.Random;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.io.IOException;
13 import java.io.Serializable;
14 import java.io.ObjectInputStream;
15 import java.io.ObjectOutputStream;
16
/**
 * A set of variables that together maintain a sum.  When updates
 * (method {@link #add}) are contended across threads, the set of
 * adders may grow to reduce contention.  Method {@link #sum} returns
 * the current combined total across these adders.  This value is
 * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
 * the sum is being calculated), and so cannot be used alone for
 * fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics.  In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * significantly more space.  On the other hand, if it is known that
 * only one thread can ever update the sum, performance may be
 * significantly slower than just updating a local variable.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of AtomicLongs (padded to reduce
     * false sharing).  The table is indexed by per-thread hash codes
     * that are initialized as random values.  The table doubles in
     * size upon contention (as indicated by failed CASes when
     * performing add()), but is capped at the nearest power of two >=
     * #cpus: At that point, contention should be infrequent if each
     * thread has a unique index; so we instead adjust hash codes to
     * new random values upon contention rather than expanding.  A
     * single spinlock is used for resizing the table as well as
     * populating slots with new Adders.  Upon lock contention, threads
     * just try other slots rather than blocking.  We guarantee that at
     * least one slot exists, so retries will eventually find a
     * candidate Adder.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Version of AtomicLong padded with 14 extra long fields to avoid
     * sharing cache lines on most processors.
     */
    static final class Adder extends AtomicLong {
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code.  The code is mutable so
     * that add() can rehash in place after a collision.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class.  Each thread starts with a
     * random, nonzero hash code.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();

        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes.  Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders.  Initially of size 2; doubles under contention
     * while its length is below NCPU, so it grows to at most the
     * smallest power of two that is >= NCPU (and never below 2).
     * Slots may be null until first used.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders (0 = free,
     * 1 = held).  There is no need for a blocking lock: When busy,
     * other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift for rehashing on collisions.
     *
     * @param r the current hash code
     * @return the next hash code in the xorshift sequence
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with initially zero sum.
     */
    public StripedAdder() {
        Adder[] as = new Adder[2];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * <p>The calling thread's hash code selects a slot.  If the slot
     * holds an adder, a single CAS is attempted on it.  On failure, or
     * if the slot is empty, the thread either rehashes (table already
     * at maximum size, or the lock is busy) or takes the lock to grow
     * the table and/or install a new adder seeded with {@code x}.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x))
                    break;
                if (n >= NCPU) {                // Collision when table at max
                    h = hc.code = xorShift(h);  // change code
                    continue;
                }
            }
            final AtomicInteger mutex = this.mutex;
            if (mutex.get() != 0)
                h = xorShift(h);                // Try elsewhere
            else if (mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Resize only after a failed CAS (a != null) and only
                    // if no other thread replaced the table in the meantime.
                    if (a != null && rs == as)
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {        // Create adder
                        rs[j] = new Adder(x);   // x is recorded by construction
                        created = true;
                    }
                } finally {
                    mutex.set(0);
                }
                if (created) {
                    hc.code = h;                // Use this adder next time
                    break;
                }
                // Slot was already occupied: loop and retry the CAS path.
            }
        }
    }

    /**
     * Returns an estimate of the current sum.  The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null)
                sum += a.get();
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero.  This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        for (Adder a : adders) {
            if (a != null)
                a.set(0L);
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns an estimate of the current sum and resets each variable
     * to zero, with the same effect as {@link #sum} followed by
     * {@link #reset} when there are no concurrent updates.
     *
     * <p>Each variable is read and zeroed in a single atomic step, so
     * an update that races with this method is either included in the
     * returned value or remains recorded for a later call; it is never
     * discarded.  The returned value is still not an atomic snapshot
     * of the whole set.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null)
                sum += a.getAndSet(0L); // atomic read-and-clear: no lost updates
        }
        return sum;
    }

    /**
     * Writes the default serializable fields followed by the current
     * {@link #sum} as a single {@code long}; the adder table itself is
     * transient and is not written.
     *
     * @param s the stream to write to
     * @throws IOException if an I/O error occurs
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reads the default serializable fields and the saved sum, then
     * rebuilds a fresh two-slot table whose first adder holds that sum.
     *
     * @param s the stream to read from
     * @throws IOException if an I/O error occurs
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        mutex.set(0); // the lock state is part of the stream; always start unlocked
    }

}
255
256