Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.1 by dl, Wed Jul 20 15:00:56 2011 UTC vs.
Revision 1.11 by dl, Fri Jul 29 14:23:35 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 - import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 16 | Line 15 | import java.io.ObjectOutputStream;
15  
16   /**
17   * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, the set of
19 < * adders may grow to reduce contention. Method {@link #sum} returns
20 < * the current combined total across these adders. This value is
21 < * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
22 < * the sum is being calculated), and so cannot be used alone for
23 < * fine-grained synchronization control.
24 < *
18 > * (method {@link #add}) are contended across threads, this set of
19 > * adder variables may grow dynamically to reduce contention. Method
20 > * {@link #sum} returns the current combined total across these
21 > * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
22 > * updates may occur while the sum is being calculated), and so cannot
23 > * be used alone for fine-grained synchronization control.
24 > *
25   * <p> This class may be applicable when many threads frequently
26   * update a common sum that is used for purposes such as collecting
27   * statistics. In this case, performance may be significantly faster
28   * than using a shared {@link AtomicLong}, at the expense of using
29 < * significantly more space.  On the other hand, if it is known that
30 < * only one thread can ever update the sum, performance may be
31 < * significantly slower than just updating a local variable.
29 > * more space.  On the other hand, if it is known that only one thread
30 > * can ever update the sum, performance may be significantly slower
31 > * than just updating a local variable.
32 > *
33 > * <p>A StripedAdder may optionally be constructed with a given
34 > * expected contention level; i.e., the number of threads that are
35 > * expected to concurrently update the sum. Supplying an accurate
36 > * value may improve performance by reducing the need for dynamic
37 > * adjustment.
38   *
39 < * @author Doug Lea
39 > * @author Doug Lea
40   */
41   public class StripedAdder implements Serializable {
42      private static final long serialVersionUID = 7249069246863182397L;
43  
44      /*
45 <     * Overview: We maintain a table of AtomicLongs (padded to reduce
46 <     * false sharing). The table is indexed by per-thread hash codes
47 <     * that are initialized as random values.  The table doubles in
48 <     * size upon contention (as indicated by failed CASes when
49 <     * performing add()), but is capped at the nearest power of two >=
50 <     * #cpus: At that point, contention should be infrequent if each
51 <     * thread has a unique index; so we instead adjust hash codes to
52 <     * new random values upon contention rather than expanding. A
53 <     * single spinlock is used for resizing the table as well as
54 <     * populating slots with new Adders. Upon lock contention, threads
55 <     * just try other slots rather than blocking. We guarantee that at
45 >     * A StripedAdder maintains a table of Atomic long variables. The
46 >     * table is indexed by per-thread hash codes.
47 >     *
48 >     * Table entries are of class Adder; a variant of AtomicLong
49 >     * padded to reduce cache contention on most processors. Padding
50 >     * is overkill for most Atomics because they are usually
51 >     * irregularly scattered in memory and thus don't interfere much
52 >     * with each other. But Atomic objects residing in arrays will
53 >     * tend to be placed adjacent to each other, and so will most
54 >     * often share cache lines (with a huge negative performance
55 >     * impact) without this precaution.
56 >     *
57 >     * Because Adders are relatively large, we avoid creating them
58 >     * until they are needed. On the other hand, we try to create them
59 >     * on any sign of contention.
60 >     *
61 >     * Per-thread hash codes are initialized to random values.
62 >     * Collisions are indicated by failed CASes when performing an add
63 >     * operation (see method retryAdd). Upon a collision, if the table
64 >     * size is less than the capacity, it is doubled in size unless
65 >     * some other thread holds the lock. If a hashed slot is empty, and
66 >     * the lock is available, a new Adder is created. Otherwise, if the
67 >     * slot exists, a CAS is tried.  Retries proceed by "double
68 >     * hashing", using a secondary hash (Marsaglia XorShift) to try to
69 >     * find a free slot.
70 >     *
71 >     * By default, the table is lazily initialized.  Upon first use,
72 >     * the table is set to size 1, and contains a single Adder. The
73 >     * maximum table size is bounded by the nearest power of two >= the
74 >     * number of CPUs.  The table size is capped because, when there
75 >     * are more threads than CPUs, supposing that each thread were
76 >     * bound to a CPU, there would exist a perfect hash function
77 >     * mapping threads to slots that eliminates collisions. When we
78 >     * reach capacity, we search for this mapping by randomly varying
79 >     * the hash codes of colliding threads.  Because search is random,
80 >     * and failures only become known via CAS failures, convergence
81 >     * will be slow, and because threads are typically not bound to
82 >     * CPUs forever, may not occur at all. However, despite these
83 >     * limitations, observed contention is typically low in these
84 >     * cases.
85 >     *
86 >     * A single spinlock is used for resizing the table as well as
87 >     * populating slots with new Adders. After initialization, there
88 >     * is no need for a blocking lock: Upon lock contention, threads
89 >     * try other slots rather than blocking. After initialization, at
90       * least one slot exists, so retries will eventually find a
91 <     * candidate Adder.
91 >     * candidate Adder.  During these retries, there is increased
92 >     * contention and reduced locality, which is still better than
93 >     * alternatives.
94       */
95  
96 <    /**
56 <     * Number of processors, to place a cap on table growth.
57 <     */
58 <    static final int NCPU = Runtime.getRuntime().availableProcessors();
96 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
97  
98      /**
99 <     * Version of AtomicLong padded to avoid sharing cache
100 <     * lines on most processors
99 >     * Padded variant of AtomicLong.  The value field is placed
100 >     * between pads, hoping that the JVM doesn't reorder them.
101 >     * Updates are via inlined CAS in methods add and retryAdd.
102       */
103 <    static final class Adder extends AtomicLong {
104 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
105 <        Adder(long x) { super(x); }
103 >    static final class Adder {
104 >        volatile long p0, p1, p2, p3, p4, p5, p6;
105 >        volatile long value;
106 >        volatile long q0, q1, q2, q3, q4, q5, q6;
107 >        Adder(long x) { value = x; }
108      }
109  
110 <    /**
111 <     * Holder for the thread-local hash code.
110 >    /**
111 >     * Holder for the thread-local hash code. The code is initially
112 >     * random, but may be set to a different value upon collisions.
113       */
114      static final class HashCode {
115 +        static final Random rng = new Random();
116          int code;
117 <        HashCode(int h) { code = h; }
117 >        HashCode() {
118 >            int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
119 >            code = (h == 0) ? 1 : h;
120 >        }
121      }
122  
123      /**
124       * The corresponding ThreadLocal class
125       */
126      static final class ThreadHashCode extends ThreadLocal<HashCode> {
127 <        static final Random rng = new Random();
82 <        public HashCode initialValue() {
83 <            int h = rng.nextInt();
84 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
85 <        }
127 >        public HashCode initialValue() { return new HashCode(); }
128      }
129  
130      /**
131       * Static per-thread hash codes. Shared across all StripedAdders
132 <     * because adjustments due to collisions in one table are likely
133 <     * to be appropriate for others.
132 >     * to reduce ThreadLocal pollution and because adjustments due to
133 >     * collisions in one table are likely to be appropriate for
134 >     * others.
135       */
136      static final ThreadHashCode threadHashCode = new ThreadHashCode();
137  
138      /**
139 <     * Table of adders. Initially of size 2; grows to be at most NCPU.
139 >     * Table of adders. When non-null, size is a power of 2.
140       */
141      private transient volatile Adder[] adders;
142  
143      /**
144 <     * Serves as a lock when resizing and/or creating Adders.  There
102 <     * is no need for a blocking lock: When busy, other threads try
103 <     * other slots.
144 >     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
145       */
146 <    private final AtomicInteger mutex;
146 >    private volatile int busy;
147  
148      /**
149 <     * Marsaglia XorShift for rehashing on collisions
149 >     * Creates a new adder with zero sum.
150       */
151 <    private static int xorShift(int r) {
111 <        r ^= r << 13;
112 <        r ^= r >>> 17;
113 <        return r ^ (r << 5);
151 >    public StripedAdder() {
152      }
153  
154      /**
155 <     * Creates a new adder with initially zero sum.
155 >     * Creates a new adder with zero sum, and with stripes presized
156 >     * for the given expected contention level.
157 >     *
158 >     * @param expectedContention the expected number of threads that
159 >     * will concurrently update the sum.
160       */
161 <    public StripedAdder() {
162 <        Adder[] as = new Adder[2];
163 <        as[0] = new Adder(0); // ensure at least one available adder
161 >    public StripedAdder(int expectedContention) {
162 >        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
163 >        int size = 1;
164 >        while (size < cap)
165 >            size <<= 1;
166 >        Adder[] as = new Adder[size];
167 >        for (int i = 0; i < size; ++i)
168 >            as[i] = new Adder(0);
169          this.adders = as;
123 -        this.mutex = new AtomicInteger();
170      }
171  
172      /**
# Line 129 | Line 175 | public class StripedAdder implements Ser
175       * @param x the value to add
176       */
177      public void add(long x) {
178 +        Adder[] as; Adder a; int n;  // locals to hold volatile reads
179          HashCode hc = threadHashCode.get();
180 <        for (int h = hc.code;;) {
181 <            Adder[] as = adders;
182 <            int n = as.length;
183 <            Adder a = as[h & (n - 1)];
184 <            if (a != null) {
185 <                long v = a.get();
186 <                if (a.compareAndSet(v, v + x))
187 <                    break;
188 <                if (n >= NCPU) {                 // Collision when table at max
189 <                    h = hc.code = xorShift(h);   // change code
190 <                    continue;
180 >        int h = hc.code;
181 >        boolean contended;
182 >        if ((as = adders) != null && (n = as.length) > 0 &&
183 >            (a = as[(n - 1) & h]) != null) {
184 >            long v = a.value;
185 >            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
186 >                return;
187 >            contended = true;
188 >        }
189 >        else
190 >            contended = false;
191 >        retryAdd(x, hc, contended);
192 >    }
193 >
194 >    /**
195 >     * Handle cases of add involving initialization, resizing,
196 >     * creating new Adders, and/or contention. See above for
197 >     * explanation. This method suffers the usual non-modularity
198 >     * problems of optimistic retry code, relying on rechecked sets of
199 >     * reads.
200 >     *
201 >     * @param x the value to add
202 >     * @param hc the hash code holder
203 >     * @param precontended true if CAS failed before call
204 >     */
205 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
206 >        int h = hc.code;
207 >        boolean collide = false; // true if last slot nonempty
208 >        for (;;) {
209 >            Adder[] as; Adder a; int n;
210 >            if ((as = adders) != null && (n = as.length) > 0) {
211 >                if ((a = as[(n - 1) & h]) == null) {
212 >                    if (busy == 0) {            // Try to attach new Adder
213 >                        Adder r = new Adder(x); // Optimistically create
214 >                        if (busy == 0 &&
215 >                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
216 >                            boolean created = false;
217 >                            try {               // Recheck under lock
218 >                                Adder[] rs; int m, j;
219 >                                if ((rs = adders) != null &&
220 >                                    (m = rs.length) > 0 &&
221 >                                    rs[j = (m - 1) & h] == null) {
222 >                                    rs[j] = r;
223 >                                    created = true;
224 >                                }
225 >                            } finally {
226 >                                busy = 0;
227 >                            }
228 >                            if (created)
229 >                                break;
230 >                            continue;           // Slot is now non-empty
231 >                        }
232 >                    }
233 >                    collide = false;
234 >                }
235 >                else if (precontended)          // CAS already known to fail
236 >                    precontended = false;       // Continue after rehash
237 >                else {
238 >                    long v = a.value;
239 >                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
240 >                        break;
241 >                    if (!collide)
242 >                        collide = true;
243 >                    else if (n >= NCPU || adders != as)
244 >                        collide = false;        // Can't expand
245 >                    else if (busy == 0 &&
246 >                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
247 >                        collide = false;
248 >                        try {
249 >                            if (adders == as) { // Expand table
250 >                                Adder[] rs = new Adder[n << 1];
251 >                                for (int i = 0; i < n; ++i)
252 >                                    rs[i] = as[i];
253 >                                adders = rs;
254 >                            }
255 >                        } finally {
256 >                            busy = 0;
257 >                        }
258 >                        continue;
259 >                    }
260                  }
261 +                h ^= h << 13;                   // Rehash
262 +                h ^= h >>> 17;
263 +                h ^= h << 5;
264              }
265 <            final AtomicInteger mutex = this.mutex;
266 <            if (mutex.get() != 0)
267 <                h = xorShift(h);                 // Try elsewhere
268 <            else if (mutex.compareAndSet(0, 1)) {
269 <                boolean created = false;
270 <                try {
271 <                    Adder[] rs = adders;
272 <                    if (a != null && rs == as)   // Resize table
273 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
274 <                    int j = h & (rs.length - 1);
275 <                    if (rs[j] == null) {         // Create adder
276 <                        rs[j] = new Adder(x);
277 <                        created = true;
265 >            else if (adders == as) {            // Try to default-initialize
266 >                Adder[] rs = new Adder[1];
267 >                rs[0] = new Adder(x);
268 >                boolean init = false;
269 >                while (adders == as) {
270 >                    if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 >                        try {
272 >                            if (adders == as) {
273 >                                adders = rs;
274 >                                init = true;
275 >                            }
276 >                        } finally {
277 >                            busy = 0;
278 >                        }
279 >                        break;
280                      }
281 <                } finally {
282 <                    mutex.set(0);
281 >                    if (adders != as)
282 >                        break;
283 >                    Thread.yield();              // Back off
284                  }
285 <                if (created) {
164 <                    hc.code = h;                // Use this adder next time
285 >                if (init)
286                      break;
166 -                }
287              }
288          }
289 <    }
170 <
171 <    /**
172 <     * Returns an estimate of the current sum.  The result is
173 <     * calculated by summing multiple variables, so may not be
174 <     * accurate if updates occur concurrently with this method.
175 <     *
176 <     * @return the estimated sum
177 <     */
178 <    public long sum() {
179 <        long sum = 0;
180 <        Adder[] as = adders;
181 <        int n = as.length;
182 <        for (int i = 0; i < n; ++i) {
183 <            Adder a = as[i];
184 <            if (a != null)
185 <                sum += a.get();
186 <        }
187 <        return sum;
188 <    }
189 <
190 <    /**
191 <     * Resets each of the variables to zero. This is effective in
192 <     * fully resetting the sum only if there are no concurrent
193 <     * updates.
194 <     */
195 <    public void reset() {
196 <        Adder[] as = adders;
197 <        int n = as.length;
198 <        for (int i = 0; i < n; ++i) {
199 <            Adder a = as[i];
200 <            if (a != null)
201 <                a.set(0L);
202 <        }
289 >        hc.code = h;                            // Record index for next time
290      }
291  
292      /**
# Line 217 | Line 304 | public class StripedAdder implements Ser
304      }
305  
306      /**
307 <     * Equivalent to {@link #sum} followed by {@link #reset}.
307 >     * Returns an estimate of the current sum.  The result is
308 >     * calculated by summing multiple variables, so may not be
309 >     * accurate if updates occur concurrently with this method.
310       *
311       * @return the estimated sum
312       */
313 <    public long sumAndReset() {
314 <        long sum = 0;
313 >    public long sum() {
314 >        long sum = 0L;
315          Adder[] as = adders;
316 <        int n = as.length;
317 <        for (int i = 0; i < n; ++i) {
318 <            Adder a = as[i];
319 <            if (a != null) {
320 <                sum += a.get();
321 <                a.set(0L);
316 >        if (as != null) {
317 >            int n = as.length;
318 >            for (int i = 0; i < n; ++i) {
319 >                Adder a = as[i];
320 >                if (a != null)
321 >                    sum += a.value;
322 >            }
323 >        }
324 >        return sum;
325 >    }
326 >
327 >    /**
328 >     * Resets each of the variables to zero, returning the estimated
329 >     * previous sum. This is effective in fully resetting the sum only
330 >     * if there are no concurrent updates.
331 >     *
332 >     * @return the estimated previous sum
333 >     */
334 >    public long reset() {
335 >        long sum = 0L;
336 >        Adder[] as = adders;
337 >        if (as != null) {
338 >            int n = as.length;
339 >            for (int i = 0; i < n; ++i) {
340 >                Adder a = as[i];
341 >                if (a != null) {
342 >                    sum += a.value;
343 >                    a.value = 0L;
344 >                }
345              }
346          }
347          return sum;
# Line 244 | Line 356 | public class StripedAdder implements Ser
356      private void readObject(ObjectInputStream s)
357          throws IOException, ClassNotFoundException {
358          s.defaultReadObject();
359 <        long c = s.readLong();
360 <        Adder[] as = new Adder[2];
249 <        as[0] = new Adder(c);
250 <        this.adders = as;
251 <        mutex.set(0);
359 >        busy = 0;
360 >        add(s.readLong());
361      }
362  
363 < }
363 >    // Unsafe mechanics
364 >    private static final sun.misc.Unsafe UNSAFE;
365 >    private static final long busyOffset;
366 >    private static final long valueOffset;
367 >    static {
368 >        try {
369 >            UNSAFE = getUnsafe();
370 >            Class<?> sk = StripedAdder.class;
371 >            busyOffset = UNSAFE.objectFieldOffset
372 >                (sk.getDeclaredField("busy"));
373 >            Class<?> ak = Adder.class;
374 >            valueOffset = UNSAFE.objectFieldOffset
375 >                (ak.getDeclaredField("value"));
376 >        } catch (Exception e) {
377 >            throw new Error(e);
378 >        }
379 >    }
380  
381 +    /**
382 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
383 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
384 +     * into a jdk.
385 +     *
386 +     * @return a sun.misc.Unsafe
387 +     */
388 +    private static sun.misc.Unsafe getUnsafe() {
389 +        try {
390 +            return sun.misc.Unsafe.getUnsafe();
391 +        } catch (SecurityException se) {
392 +            try {
393 +                return java.security.AccessController.doPrivileged
394 +                    (new java.security
395 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
396 +                        public sun.misc.Unsafe run() throws Exception {
397 +                            java.lang.reflect.Field f = sun.misc
398 +                                .Unsafe.class.getDeclaredField("theUnsafe");
399 +                            f.setAccessible(true);
400 +                            return (sun.misc.Unsafe) f.get(null);
401 +                        }});
402 +            } catch (java.security.PrivilegedActionException e) {
403 +                throw new RuntimeException("Could not initialize intrinsics",
404 +                                           e.getCause());
405 +            }
406 +        }
407 +    }
408  
409 + }
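
Two small pieces of arithmetic in revision 1.11 are easy to check in isolation: the presizing loop in the StripedAdder(int) constructor, which rounds up to a power of two, and the inline Marsaglia XorShift rehash in retryAdd. The sketch below pulls them out into a helper class. The class name StripeMath and its method names are hypothetical and do not appear in the source; it also assumes the requested capacity stays at or below 2^30, which holds in the original because the capacity is bounded by NCPU.

```java
// Hypothetical helper class (not part of StripedAdder) isolating the
// table-sizing and rehash arithmetic shown in the diff above.
final class StripeMath {
    private StripeMath() {}

    /**
     * Smallest power of two >= cap, or 1 when cap <= 1.
     * Mirrors the "while (size < cap) size <<= 1" loop in the constructor.
     * Assumes cap <= 2^30; larger values would overflow the shift.
     */
    static int tableSizeFor(int cap) {
        int size = 1;
        while (size < cap)
            size <<= 1;
        return size;
    }

    /**
     * One XorShift step with shifts 13, 17, 5, as inlined in retryAdd.
     * Zero maps to zero, which is why HashCode avoids a zero seed.
     */
    static int xorShift(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    /** Slot index for hash h in a table whose length n is a power of two. */
    static int slot(int h, int n) {
        return (n - 1) & h;
    }

    public static void main(String[] args) {
        System.out.println(tableSizeFor(5));   // prints 8
        System.out.println(xorShift(1));       // prints 270369
        System.out.println(slot(-1, 8));       // prints 7
    }
}
```

Because the table length is always a power of two, masking with (n - 1) selects a valid index for any hash value, including negative ones.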

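The core idea of the class comment (a table of counters indexed by a per-thread hash, with a rehash after a failed CAS) can be sketched without sun.misc.Unsafe. The following is a deliberately simplified illustration, not the code in either revision: the class name SimpleStripedCounter is hypothetical, the table is fixed-size and unpadded (so it does not address false sharing), there is no lazy creation or resizing, and it uses AtomicLongArray, ThreadLocal.withInitial, and ThreadLocalRandom, which assume Java 8 or later.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified striped counter: fixed power-of-two table,
// per-thread hash, CAS update, XorShift rehash on CAS failure.
final class SimpleStripedCounter {
    private final AtomicLongArray cells;   // unpadded, unlike Adder
    private final int mask;                // table length - 1

    // Per-thread hash holder; nonzero so that XorShift never sticks at 0.
    private static final ThreadLocal<int[]> HASH =
        ThreadLocal.withInitial(() -> {
            int h = ThreadLocalRandom.current().nextInt();
            return new int[] { (h == 0) ? 1 : h };
        });

    SimpleStripedCounter(int stripes) {
        int size = 1;
        while (size < stripes)             // round up to a power of two
            size <<= 1;
        cells = new AtomicLongArray(size);
        mask = size - 1;
    }

    void add(long x) {
        int[] hc = HASH.get();
        int h = hc[0];
        for (;;) {
            int i = h & mask;
            long v = cells.get(i);
            if (cells.compareAndSet(i, v, v + x))
                break;
            h ^= h << 13;                  // CAS failed: rehash, try elsewhere
            h ^= h >>> 17;
            h ^= h << 5;
        }
        hc[0] = h;                         // remember the slot for next time
    }

    /** Sums all cells; not an atomic snapshot under concurrent updates. */
    long sum() {
        long s = 0L;
        for (int i = 0; i < cells.length(); ++i)
            s += cells.get(i);
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleStripedCounter c = new SimpleStripedCounter(4);
        Thread[] ts = new Thread[4];
        for (int t = 0; t < ts.length; ++t) {
            ts[t] = new Thread(() -> {
                for (int k = 0; k < 10000; ++k)
                    c.add(1);
            });
            ts[t].start();
        }
        for (Thread t : ts)
            t.join();
        System.out.println(c.sum());       // prints 40000
    }
}
```

Once all updating threads have been joined, sum() is exact, because every add is applied by exactly one successful CAS. While updates are still running it is only an estimate, as the sum() Javadoc in the diff states.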
Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines