ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.2 by jsr166, Wed Jul 20 16:06:19 2011 UTC vs.
Revision 1.12 by dl, Sat Jul 30 16:26:34 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 15 | Line 14 | import java.io.ObjectInputStream;
14   import java.io.ObjectOutputStream;
15  
16   /**
17 < * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, the set of
19 < * adders may grow to reduce contention. Method {@link #sum} returns
21 < * the current combined total across these adders. This value is
22 < * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
23 < * the sum is being calculated), and so cannot be used alone for
24 < * fine-grained synchronization control.
17 > * One or more variables that together maintain an initially zero sum.
18 > * When updates (method {@link #add}) are contended across threads,
19 > * the set of variables may grow dynamically to reduce contention.
20   *
21 < * <p> This class may be applicable when many threads frequently
22 < * update a common sum that is used for purposes such as collecting
23 < * statistics. In this case, performance may be significantly faster
24 < * than using a shared {@link AtomicLong}, at the expense of using
25 < * significantly more space.  On the other hand, if it is known that
26 < * only one thread can ever update the sum, performance may be
27 < * significantly slower than just updating a local variable.
21 > * <p> This class is usually preferable to {@link AtomicLong} when
22 > * multiple threads update a common sum that is used for purposes such
23 > * as collecting statistics, not for fine-grained synchronization
24 > * control.  Under high update contention, throughput of this class is
25 > * expected to be significantly higher, at the expense of higher space
26 > * consumption.  Under low contention, this class imposes very little
27 > * time and space overhead compared to AtomicLong.  On the other hand,
28 > * in contexts where it is statically known that only one thread can
29 > * ever update a sum, time and space overhead is noticeably greater
30 > * than just updating a local variable.
31 > *
32 > * <p> Method {@link #sum} returns the current combined total across
33 > * the variables maintaining the sum.  This value is <em>NOT</em> an
34 > * atomic snapshot: Concurrent updates may occur while the sum is
35 > * being calculated.  However, updates cannot be "lost", so invocation
36 > * of <code>sum</code> in the absence of concurrent updates always
37 > * returns an accurate result.  The sum may also be <code>reset</code>
38 > * to zero, as an alternative to creating a new adder.  However,
39 > * method {@link #reset} is intrinsically racy, so should only be used
40 > * when it is known that no threads are concurrently updating the sum.
41 > *
42 > * <p><em>jsr166e note: This class is targeted to be placed in
43 > * java.util.concurrent.atomic<em>
44   *
45   * @author Doug Lea
46   */
# Line 37 | Line 48 | public class StripedAdder implements Ser
48      private static final long serialVersionUID = 7249069246863182397L;
49  
50      /*
51 <     * Overview: We maintain a table of AtomicLongs (padded to reduce
52 <     * false sharing). The table is indexed by per-thread hash codes
53 <     * that are initialized as random values.  The table doubles in
54 <     * size upon contention (as indicated by failed CASes when
55 <     * performing add()), but is capped at the nearest power of two >=
56 <     * #cpus: At that point, contention should be infrequent if each
57 <     * thread has a unique index; so we instead adjust hash codes to
58 <     * new random values upon contention rather than expanding. A
59 <     * single spinlock is used for resizing the table as well as
60 <     * populating slots with new Adders. Upon lock contention, threads
61 <     * just try other slots rather than blocking. We guarantee that at
62 <     * least one slot exists, so retries will eventually find a
63 <     * candidate Adder.
51 >     * A StripedAdder maintains a lazily-initialized table of
52 >     * atomically updated variables, plus an extra "base" field. The
53 >     * table size is a power of two. Indexing uses masked per-thread
54 >     * hash codes
55 >     *
56 >     * Table entries are of class Cell; a variant of AtomicLong padded
57 >     * to reduce cache contention on most processors. Padding is
58 >     * overkill for most Atomics because they are usually irregularly
59 >     * scattered in memory and thus don't interfere much with each
60 >     * other. But Atomic objects residing in arrays will tend to be
61 >     * placed adjacent to each other, and so will most often share
62 >     * cache lines (with a huge negative performance impact) without
63 >     * this precaution.
64 >     *
65 >     * In part because Cells are relatively large, we avoid creating
66 >     * them until they are needed.  When there is no contention, all
67 >     * updates are made to the base field.  Upon first contention (a
68 >     * failed CAS on base update), the table is initialized to size 2.
69 >     * The table size is doubled upon further contention until
70 >     * reaching the nearest power of two greater than or equal to the
71 >     * number of CPUS.
72 >     *
73 >     * Per-thread hash codes are initialized to random values.
74 >     * Contention and/or table collisions are indicated by failed
75 >     * CASes when performing an add operation (see method
76 >     * retryAdd). Upon a collision, if the table size is less than the
77 >     * capacity, it is doubled in size unless some other thread holds
78 >     * the lock. If a hashed slot is empty, and lock is available, a
79 >     * new Cell is created. Otherwise, if the slot exists, a CAS is
80 >     * tried.  Retries proceed by "double hashing", using a secondary
81 >     * hash (Marsaglia XorShift) to try to find a free slot.
82 >     *
83 >     * The table size is capped because, when there are more threads
84 >     * than CPUs, supposing that each thread were bound to a CPU,
85 >     * there would exist a perfect hash function mapping threads to
86 >     * slots that eliminates collisions. When we reach capacity, we
87 >     * search for this mapping by randomly varying the hash codes of
88 >     * colliding threads.  Because search is random, and collisions
89 >     * only become known via CAS failures, convergence can be slow,
90 >     * and because threads are typically not bound to CPUS forever,
91 >     * may not occur at all. However, despite these limitations,
92 >     * observed contention rates are typically low in these cases.
93 >     *
94 >     * A single spinlock is used for initializing and resizing the
95 >     * table, as well as populating slots with new Cells.  There is no
96 >     * need for a blocking lock: Upon lock contention, threads try
97 >     * other slots (or the base) rather than blocking.  During these
98 >     * retries, there is increased contention and reduced locality,
99 >     * which is still better than alternatives.
100 >     *
101 >     * It is possible for a Cell to become unused when threads that
102 >     * once hashed to it terminate, as well as in the case where
103 >     * doubling the table causes no thread to hash to it under
104 >     * expanded mask.  We do not try to detect or remove such cells,
105 >     * under the assumption that for long-running adders, observed
106 >     * contention levels will recur, so the cells will eventually be
107 >     * needed again; and for short-lived ones, it does not matter.
108 >     *
109       */
110  
111 <    /**
56 <     * Number of processors, to place a cap on table growth.
57 <     */
58 <    static final int NCPU = Runtime.getRuntime().availableProcessors();
111 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
112  
113      /**
114 <     * Version of AtomicLong padded to avoid sharing cache
115 <     * lines on most processors
114 >     * Padded variant of AtomicLong.  The value field is placed
115 >     * between pads, hoping that the JVM doesn't reorder them.
116 >     * Updates are via inlined CAS in methods add and retryAdd.
117       */
118 <    static final class Adder extends AtomicLong {
119 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
120 <        Adder(long x) { super(x); }
118 >    static final class Cell {
119 >        volatile long p0, p1, p2, p3, p4, p5, p6;
120 >        volatile long value;
121 >        volatile long q0, q1, q2, q3, q4, q5, q6;
122 >        Cell(long x) { value = x; }
123      }
124  
125      /**
126 <     * Holder for the thread-local hash code.
126 >     * Holder for the thread-local hash code. The code is initially
127 >     * random, but may be set to a different value upon collisions.
128       */
129      static final class HashCode {
130 +        static final Random rng = new Random();
131          int code;
132 <        HashCode(int h) { code = h; }
132 >        HashCode() {
133 >            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
134 >            code = (h == 0) ? 1 : h;
135 >        }
136      }
137  
138      /**
139       * The corresponding ThreadLocal class
140       */
141      static final class ThreadHashCode extends ThreadLocal<HashCode> {
142 <        static final Random rng = new Random();
82 <        public HashCode initialValue() {
83 <            int h = rng.nextInt();
84 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
85 <        }
142 >        public HashCode initialValue() { return new HashCode(); }
143      }
144  
145      /**
146       * Static per-thread hash codes. Shared across all StripedAdders
147 <     * because adjustments due to collisions in one table are likely
148 <     * to be appropriate for others.
147 >     * to reduce ThreadLocal pollution and because adjustments due to
148 >     * collisions in one table are likely to be appropriate for
149 >     * others.
150       */
151      static final ThreadHashCode threadHashCode = new ThreadHashCode();
152  
153      /**
154 <     * Table of adders. Initially of size 2; grows to be at most NCPU.
154 >     * Table of cells. When non-null, size is a power of 2.
155       */
156 <    private transient volatile Adder[] adders;
156 >    private transient volatile Cell[] cells;
157  
158      /**
159 <     * Serves as a lock when resizing and/or creating Adders.  There
160 <     * is no need for a blocking lock: When busy, other threads try
103 <     * other slots.
159 >     * Base sum, used mainly when there is no contention, but also as
160 >     * a fallback during table initializion races. Updated via CAS.
161       */
162 <    private final AtomicInteger mutex;
162 >    private transient volatile long base;
163  
164      /**
165 <     * Marsaglia XorShift for rehashing on collisions
165 >     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
166       */
167 <    private static int xorShift(int r) {
111 <        r ^= r << 13;
112 <        r ^= r >>> 17;
113 <        return r ^ (r << 5);
114 <    }
167 >    private transient volatile int busy;
168  
169      /**
170 <     * Creates a new adder with initially zero sum.
170 >     * Creates a new adder with initial sum of zero.
171       */
172      public StripedAdder() {
120        Adder[] as = new Adder[2];
121        as[0] = new Adder(0); // ensure at least one available adder
122        this.adders = as;
123        this.mutex = new AtomicInteger();
173      }
174  
175      /**
# Line 129 | Line 178 | public class StripedAdder implements Ser
178       * @param x the value to add
179       */
180      public void add(long x) {
181 <        HashCode hc = threadHashCode.get();
182 <        for (int h = hc.code;;) {
183 <            Adder[] as = adders;
184 <            int n = as.length;
185 <            Adder a = as[h & (n - 1)];
186 <            if (a != null) {
187 <                long v = a.get();
188 <                if (a.compareAndSet(v, v + x))
189 <                    break;
190 <                if (n >= NCPU) {                 // Collision when table at max
191 <                    h = hc.code = xorShift(h);   // change code
192 <                    continue;
181 >        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
182 >        if ((as = cells) != null ||
183 >            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
184 >            int h = (hc = threadHashCode.get()).code;
185 >            if (as != null && (n = as.length) > 0 &&
186 >                (a = as[(n - 1) & h]) != null) {
187 >                if (UNSAFE.compareAndSwapLong(a, valueOffset,
188 >                                              v = a.value, v + x))
189 >                    return;
190 >                contended = true;
191 >            }
192 >            else
193 >                contended = false;
194 >            retryAdd(x, hc, contended);
195 >        }
196 >    }
197 >
198 >    /**
199 >     * Handle cases of add involving initialization, resizing,
200 >     * creating new Cells, and/or contention. See above for
201 >     * explanation. This method suffers the usual non-modularity
202 >     * problems of optimistic retry code, relying on rechecked sets of
203 >     * reads.
204 >     *
205 >     * @param x the value to add
206 >     * @param hc the hash code holder
207 >     * @param precontended true if CAS failed before call
208 >     */
209 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
210 >        int h = hc.code;
211 >        boolean collide = false; // true if last slot nonempty
212 >        for (;;) {
213 >            Cell[] as; Cell a; int n;
214 >            if ((as = cells) != null && (n = as.length) > 0) {
215 >                if ((a = as[(n - 1) & h]) == null) {
216 >                    if (busy == 0) {            // Try to attach new Cell
217 >                        Cell r = new Cell(x);   // Optimistically create
218 >                        if (busy == 0 &&
219 >                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
220 >                            boolean created = false;
221 >                            try {               // Recheck under lock
222 >                                Cell[] rs; int m, j;
223 >                                if ((rs = cells) != null &&
224 >                                    (m = rs.length) > 0 &&
225 >                                    rs[j = (m - 1) & h] == null) {
226 >                                    rs[j] = r;
227 >                                    created = true;
228 >                                }
229 >                            } finally {
230 >                                busy = 0;
231 >                            }
232 >                            if (created)
233 >                                break;
234 >                            continue;           // Slot is now non-empty
235 >                        }
236 >                    }
237 >                    collide = false;
238                  }
239 +                else if (precontended)          // CAS already known to fail
240 +                    precontended = false;       // Continue after rehash
241 +                else {
242 +                    long v = a.value;
243 +                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
244 +                        break;
245 +                    if (!collide)
246 +                        collide = true;
247 +                    else if (n >= NCPU || cells != as)
248 +                        collide = false;        // Can't expand
249 +                    else if (busy == 0 &&
250 +                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
251 +                        collide = false;
252 +                        try {
253 +                            if (cells == as) { // Expand table
254 +                                Cell[] rs = new Cell[n << 1];
255 +                                for (int i = 0; i < n; ++i)
256 +                                    rs[i] = as[i];
257 +                                cells = rs;
258 +                            }
259 +                        } finally {
260 +                            busy = 0;
261 +                        }
262 +                        continue;
263 +                    }
264 +                }
265 +                h ^= h << 13;                   // Rehash
266 +                h ^= h >>> 17;
267 +                h ^= h << 5;
268              }
269 <            final AtomicInteger mutex = this.mutex;
270 <            if (mutex.get() != 0)
271 <                h = xorShift(h);                 // Try elsewhere
272 <            else if (mutex.compareAndSet(0, 1)) {
273 <                boolean created = false;
274 <                try {
275 <                    Adder[] rs = adders;
276 <                    if (a != null && rs == as)   // Resize table
277 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
278 <                    int j = h & (rs.length - 1);
156 <                    if (rs[j] == null) {         // Create adder
157 <                        rs[j] = new Adder(x);
158 <                        created = true;
269 >            else if (busy == 0 && cells == as &&
270 >                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 >                boolean init = false;
272 >                try {                           // Initialize
273 >                    if (cells == as) {
274 >                        Cell r = new Cell(x);
275 >                        Cell[] rs = new Cell[2];
276 >                        rs[h & 1] = r;
277 >                        cells = rs;
278 >                        init = true;
279                      }
280                  } finally {
281 <                    mutex.set(0);
281 >                    busy = 0;
282                  }
283 <                if (created) {
284 <                    hc.code = h;                // Use this adder next time
283 >                if (init)
284 >                    break;
285 >            }
286 >            else {                              // Lost initialization race
287 >                long b = base;                  // Fall back on using base
288 >                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
289                      break;
166                }
290              }
291          }
292 +        hc.code = h;                            // Record index for next time
293      }
294  
295      /**
296 <     * Returns an estimate of the current sum.  The result is
173 <     * calculated by summing multiple variables, so may not be
174 <     * accurate if updates occur concurrently with this method.
175 <     *
176 <     * @return the estimated sum
296 >     * Equivalent to {@code add(1)}.
297       */
298 <    public long sum() {
299 <        long sum = 0;
180 <        Adder[] as = adders;
181 <        int n = as.length;
182 <        for (int i = 0; i < n; ++i) {
183 <            Adder a = as[i];
184 <            if (a != null)
185 <                sum += a.get();
186 <        }
187 <        return sum;
298 >    public void increment() {
299 >        add(1L);
300      }
301  
302      /**
303 <     * Resets each of the variables to zero. This is effective in
192 <     * fully resetting the sum only if there are no concurrent
193 <     * updates.
303 >     * Equivalent to {@code add(-1)}.
304       */
305 <    public void reset() {
306 <        Adder[] as = adders;
197 <        int n = as.length;
198 <        for (int i = 0; i < n; ++i) {
199 <            Adder a = as[i];
200 <            if (a != null)
201 <                a.set(0L);
202 <        }
305 >    public void decrement() {
306 >        add(-1L);
307      }
308  
309      /**
310 <     * Equivalent to {@code add(1)}.
310 >     * Returns the current sum.  The result is only guaranteed to be
311 >     * accurate in the absence of concurrent updates. Otherwise, it
312 >     * may fail to reflect one or more updates occuring while
313 >     * calculating the result.
314 >     *
315 >     * @return the sum
316       */
317 <    public void increment() {
318 <        add(1L);
317 >    public long sum() {
318 >        Cell[] as = cells;
319 >        long sum = base;
320 >        if (as != null) {
321 >            int n = as.length;
322 >            for (int i = 0; i < n; ++i) {
323 >                Cell a = as[i];
324 >                if (a != null)
325 >                    sum += a.value;
326 >            }
327 >        }
328 >        return sum;
329      }
330  
331      /**
332 <     * Equivalent to {@code add(-1)}.
332 >     * Resets variables maintaining the sum to zero.  This is
333 >     * effective in setting the sum to zero only if there are no
334 >     * concurrent updates.
335       */
336 <    public void decrement() {
337 <        add(-1L);
336 >    public void reset() {
337 >        Cell[] as = cells;
338 >        base = 0L;
339 >        if (as != null) {
340 >            int n = as.length;
341 >            for (int i = 0; i < n; ++i) {
342 >                Cell a = as[i];
343 >                if (a != null)
344 >                    a.value = 0L;
345 >            }
346 >        }
347      }
348  
349      /**
350 <     * Equivalent to {@link #sum} followed by {@link #reset}.
350 >     * Equivalent in effect to {@link #sum} followed by {@link
351 >     * #reset}. This method may apply for example during quiescent
352 >     * points between multithreaded computations.  If there are
353 >     * updates concurrent with this method, the returned value is
354 >     * <em>not</em> guaranteed to be the final sum occurring before
355 >     * the reset.
356       *
357 <     * @return the estimated sum
357 >     * @return the sum
358       */
359 <    public long sumAndReset() {
360 <        long sum = 0;
361 <        Adder[] as = adders;
362 <        int n = as.length;
363 <        for (int i = 0; i < n; ++i) {
364 <            Adder a = as[i];
365 <            if (a != null) {
366 <                sum += a.get();
367 <                a.set(0L);
359 >    public long sumThenReset() {
360 >        Cell[] as = cells;
361 >        long sum = base;
362 >        base = 0L;
363 >        if (as != null) {
364 >            int n = as.length;
365 >            for (int i = 0; i < n; ++i) {
366 >                Cell a = as[i];
367 >                if (a != null) {
368 >                    sum += a.value;
369 >                    a.value = 0L;
370 >                }
371              }
372          }
373          return sum;
# Line 244 | Line 382 | public class StripedAdder implements Ser
382      private void readObject(ObjectInputStream s)
383          throws IOException, ClassNotFoundException {
384          s.defaultReadObject();
385 <        long c = s.readLong();
386 <        Adder[] as = new Adder[2];
387 <        as[0] = new Adder(c);
388 <        this.adders = as;
389 <        mutex.set(0);
385 >        busy = 0;
386 >        cells = null;
387 >        base = s.readLong();
388 >    }
389 >
390 >    // Unsafe mechanics
391 >    private static final sun.misc.Unsafe UNSAFE;
392 >    private static final long baseOffset;
393 >    private static final long busyOffset;
394 >    private static final long valueOffset;
395 >    static {
396 >        try {
397 >            UNSAFE = getUnsafe();
398 >            Class<?> sk = StripedAdder.class;
399 >            baseOffset = UNSAFE.objectFieldOffset
400 >                (sk.getDeclaredField("base"));
401 >            busyOffset = UNSAFE.objectFieldOffset
402 >                (sk.getDeclaredField("busy"));
403 >            Class<?> ak = Cell.class;
404 >            valueOffset = UNSAFE.objectFieldOffset
405 >                (ak.getDeclaredField("value"));
406 >        } catch (Exception e) {
407 >            throw new Error(e);
408 >        }
409      }
410  
411 < }
412 <
411 >    /**
412 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
413 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
414 >     * into a jdk.
415 >     *
416 >     * @return a sun.misc.Unsafe
417 >     */
418 >    private static sun.misc.Unsafe getUnsafe() {
419 >        try {
420 >            return sun.misc.Unsafe.getUnsafe();
421 >        } catch (SecurityException se) {
422 >            try {
423 >                return java.security.AccessController.doPrivileged
424 >                    (new java.security
425 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
426 >                        public sun.misc.Unsafe run() throws Exception {
427 >                            java.lang.reflect.Field f = sun.misc
428 >                                .Unsafe.class.getDeclaredField("theUnsafe");
429 >                            f.setAccessible(true);
430 >                            return (sun.misc.Unsafe) f.get(null);
431 >                        }});
432 >            } catch (java.security.PrivilegedActionException e) {
433 >                throw new RuntimeException("Could not initialize intrinsics",
434 >                                           e.getCause());
435 >            }
436 >        }
437 >    }
438  
439 + }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines