/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6 |
|
|
|
7 |
|
|
package jsr166e; |
8 |
|
|
import java.util.Arrays; |
9 |
|
|
import java.util.Random; |
10 |
|
|
import java.util.concurrent.atomic.AtomicInteger; |
11 |
|
|
import java.util.concurrent.atomic.AtomicLong; |
12 |
|
|
import java.io.IOException; |
13 |
|
|
import java.io.Serializable; |
14 |
|
|
import java.io.ObjectInputStream; |
15 |
|
|
import java.io.ObjectOutputStream; |
16 |
|
|
|
17 |
|
|
/**
 * A set of variables that together maintain a sum.  When updates
 * (method {@link #add}) are contended across threads, the set of
 * adders may grow to reduce contention. Method {@link #sum} returns
 * the current combined total across these adders. This value is
 * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
 * the sum is being calculated), and so cannot be used alone for
 * fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * significantly more space.  On the other hand, if it is known that
 * only one thread can ever update the sum, performance may be
 * significantly slower than just updating a local variable.
 *
 * @author Doug Lea
 */
36 |
|
|
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of AtomicLongs (padded to reduce
     * false sharing). The table is indexed by per-thread hash codes
     * that are initialized as random values.  The table doubles in
     * size upon contention (as indicated by failed CASes when
     * performing add()), but is capped at the nearest power of two >=
     * #cpus: At that point, contention should be infrequent if each
     * thread has a unique index; so we instead adjust hash codes to
     * new random values upon contention rather than expanding. A
     * single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * just try other slots rather than blocking. We guarantee that at
     * least one slot exists, so retries will eventually find a
     * candidate Adder.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Version of AtomicLong padded to avoid sharing cache
     * lines on most processors.
     */
    static final class Adder extends AtomicLong {
        // Padding only; these fields are never read or written.
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code is mutable so
     * that add() can rehash a thread in place after a collision.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class. Each thread starts with a
     * random, nonzero hash code.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();

        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            // Zero is a fixed point of xorShift, so never hand it out.
            return new HashCode((h == 0) ? 1 : h);
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. Initially of size 2; doubles under contention
     * until its length is at least NCPU. The length is always a power
     * of two so that {@code hash & (length - 1)} is a valid index.
     * Slots may be null until first used.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders (0 means
     * free, 1 means held). There is no need for a blocking lock: When
     * busy, other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift for rehashing on collisions.
     *
     * @param r the current hash code
     * @return the next hash code in the xorshift sequence
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with initially zero sum.
     */
    public StripedAdder() {
        Adder[] as = new Adder[2];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x))
                    break;                       // Fast path: uncontended CAS
                if (n >= NCPU) {                 // Collision when table at max
                    h = hc.code = xorShift(h);   // change code
                    continue;
                }
            }
            // Either the slot is empty, or the CAS failed and the table
            // may still grow: both cases need the lock.
            final AtomicInteger lock = this.mutex;
            if (lock.get() != 0)
                h = xorShift(h);                 // Lock busy: try elsewhere
            else if (lock.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Resize only after a failed CAS, and only if no
                    // other thread replaced the table in the meantime.
                    if (a != null && rs == as)
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {         // Create adder seeded with x
                        rs[j] = new Adder(x);
                        created = true;
                    }
                } finally {
                    lock.set(0);
                }
                if (created) {
                    hc.code = h;                 // Use this adder next time
                    break;
                }
                // Slot was already occupied: loop and retry the CAS.
            }
        }
    }

    /**
     * Returns an estimate of the current sum.  The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null)
                sum += a.get();
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        for (Adder a : adders) {
            if (a != null)
                a.set(0L);
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset}. Each variable is read and zeroed with a single atomic
     * {@code getAndSet}, so an update applied to a variable is never
     * silently discarded between the read and the reset. The result
     * is still not an atomic snapshot across all variables.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null)
                sum += a.getAndSet(0L);
        }
        return sum;
    }

    /**
     * Writes the default serializable fields followed by the current
     * sum as a single {@code long}; the adder table itself is
     * transient and is not written.
     *
     * @param s the stream to write to
     * @throws IOException if the underlying stream fails
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reads the default serializable fields and the saved sum, then
     * rebuilds a fresh two-slot table whose first adder holds that sum.
     *
     * @param s the stream to read from
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        mutex.set(0); // the lock may have been held when serialized
    }

}
255 |
|
|
|
256 |
|
|
|