ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.1
Committed: Wed Jul 20 15:00:56 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Log Message:
Initial checkin

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Arrays;
9     import java.util.Random;
10     import java.util.concurrent.atomic.AtomicInteger;
11     import java.util.concurrent.atomic.AtomicLong;
12     import java.io.IOException;
13     import java.io.Serializable;
14     import java.io.ObjectInputStream;
15     import java.io.ObjectOutputStream;
16    
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, the set of
 * adders may grow to reduce contention. Method {@link #sum} returns
 * the current combined total across these adders. This value is
 * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
 * the sum is being calculated), and so cannot be used alone for
 * fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * significantly more space. On the other hand, if it is known that
 * only one thread can ever update the sum, performance may be
 * significantly slower than just updating a local variable.
 *
 * <p>Serialization writes the value of {@link #sum} at the time of
 * writing; a deserialized instance starts with a single adder holding
 * that value.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of AtomicLongs (padded to reduce
     * false sharing). The table is indexed by per-thread hash codes
     * that are initialized as random values. The table doubles in
     * size upon contention (as indicated by failed CASes when
     * performing add()), but is capped at the nearest power of two >=
     * #cpus: At that point, contention should be infrequent if each
     * thread has a unique index; so we instead adjust hash codes to
     * new random values upon contention rather than expanding. A
     * single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * just try other slots rather than blocking. We guarantee that at
     * least one slot exists, so retries will eventually find a
     * candidate Adder.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Version of AtomicLong padded to avoid sharing cache
     * lines on most processors.
     */
    static final class Adder extends AtomicLong {
        // Padding only; these fields are never read or written.
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;

        Adder(long x) {
            super(x);
        }
    }

    /**
     * Holder for the thread-local hash code. The code is mutable so
     * that {@link #add} can rehash in place without another
     * ThreadLocal lookup.
     */
    static final class HashCode {
        int code;

        HashCode(int h) {
            code = h;
        }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();

        /**
         * Supplies a random, nonzero initial hash code. Zero is
         * excluded because it is a fixed point of {@code xorShift},
         * so a zero code could never be rehashed to another slot.
         */
        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. Initially of size 2; doubles on contention
     * while its length is below {@link #NCPU}, so its final length is
     * the smallest power of two that is at least {@code max(NCPU, 2)}.
     * Slots may be null until first used; slot 0 is always populated.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders (0 = free,
     * 1 = held). There is no need for a blocking lock: When busy,
     * other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift for rehashing on collisions.
     *
     * @param r the current (nonzero) hash code
     * @return the next hash code in the xorshift sequence
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with initially zero sum.
     */
    public StripedAdder() {
        Adder[] as = new Adder[2];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * <p>The calling thread first tries a single CAS on the adder its
     * hash code selects. On failure (or if that slot is still empty)
     * it either rehashes, grows the table, or installs a new adder
     * pre-loaded with {@code x}, retrying until one of these succeeds.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x)) {
                    break;
                }
                if (n >= NCPU) {               // Collision when table at max
                    h = hc.code = xorShift(h); // change code
                    continue;
                }
            }
            // Reached when the slot is empty, or when the CAS failed and
            // the table may still grow.
            final AtomicInteger lock = this.mutex;
            if (lock.get() != 0) {
                h = xorShift(h);               // Try elsewhere
            } else if (lock.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Resize only on a real collision, and only if no
                    // other thread replaced the table since we read it.
                    if (a != null && rs == as) {
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    }
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {       // Create adder
                        rs[j] = new Adder(x);  // x is applied by construction
                        created = true;
                    }
                } finally {
                    lock.set(0);
                }
                if (created) {
                    hc.code = h;               // Use this adder next time
                    break;
                }
            }
        }
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null) {
                sum += a.get();
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        for (Adder a : adders) {
            if (a != null) {
                a.set(0L);
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link #reset},
     * except that each variable is read and cleared in one atomic
     * step. An update applied to an existing variable while this
     * method runs is therefore either included in the returned value
     * or left in place for a later call; it is not overwritten by the
     * clear. The returned value is still not an atomic snapshot across
     * all variables.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null) {
                // getAndSet rather than get()+set(0): a CAS landing between
                // a separate read and write would be silently discarded.
                sum += a.getAndSet(0L);
            }
        }
        return sum;
    }

    /**
     * Writes the default serializable fields followed by the current
     * value of {@link #sum} as a {@code long}.
     *
     * @param s the stream to write to
     * @throws IOException if the underlying stream fails
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reads the default serializable fields and the saved sum, then
     * rebuilds a fresh two-slot table whose first adder holds that
     * sum, and marks the lock as free.
     *
     * @param s the stream to read from
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        mutex.set(0); // the lock may have been held when the object was written
    }

}
255    
256