ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.11 by dl, Fri Jul 29 14:23:35 2011 UTC vs.
Revision 1.12 by dl, Sat Jul 30 16:26:34 2011 UTC

# Line 14 | Line 14 | import java.io.ObjectInputStream;
14   import java.io.ObjectOutputStream;
15  
16   /**
17 < * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, this set of
19 < * adder variables may grow dynamically to reduce contention. Method
20 < * {@link #sum} returns the current combined total across these
21 < * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
22 < * updates may occur while the sum is being calculated), and so cannot
23 < * be used alone for fine-grained synchronization control.
17 > * One or more variables that together maintain an initially zero sum.
18 > * When updates (method {@link #add}) are contended across threads,
19 > * the set of variables may grow dynamically to reduce contention.
20   *
21 < * <p> This class may be applicable when many threads frequently
22 < * update a common sum that is used for purposes such as collecting
23 < * statistics. In this case, performance may be significantly faster
24 < * than using a shared {@link AtomicLong}, at the expense of using
25 < * more space.  On the other hand, if it is known that only one thread
26 < * can ever update the sum, performance may be significantly slower
21 > * <p> This class is usually preferable to {@link AtomicLong} when
22 > * multiple threads update a common sum that is used for purposes such
23 > * as collecting statistics, not for fine-grained synchronization
24 > * control.  Under high update contention, throughput of this class is
25 > * expected to be significantly higher, at the expense of higher space
26 > * consumption.  Under low contention, this class imposes very little
27 > * time and space overhead compared to AtomicLong.  On the other hand,
28 > * in contexts where it is statically known that only one thread can
29 > * ever update a sum, time and space overhead is noticeably greater
30   * than just updating a local variable.
31   *
32 < * <p>A StripedAdder may optionally be constructed with a given
33 < * expected contention level; i.e., the number of threads that are
34 < * expected to concurrently update the sum. Supplying an accurate
35 < * value may improve performance by reducing the need for dynamic
36 < * adjustment.
32 > * <p> Method {@link #sum} returns the current combined total across
33 > * the variables maintaining the sum.  This value is <em>NOT</em> an
34 > * atomic snapshot: Concurrent updates may occur while the sum is
35 > * being calculated.  However, updates cannot be "lost", so invocation
36 > * of <code>sum</code> in the absence of concurrent updates always
37 > * returns an accurate result.  The sum may also be <code>reset</code>
38 > * to zero, as an alternative to creating a new adder.  However,
39 > * method {@link #reset} is intrinsically racy, so should only be used
40 > * when it is known that no threads are concurrently updating the sum.
41 > *
42 > * <p><em>jsr166e note: This class is targeted to be placed in
43 > * java.util.concurrent.atomic<em>
44   *
45   * @author Doug Lea
46   */
# Line 42 | Line 48 | public class StripedAdder implements Ser
48      private static final long serialVersionUID = 7249069246863182397L;
49  
50      /*
51 <     * A StripedAdder maintains a table of Atomic long variables. The
52 <     * table is indexed by per-thread hash codes.
51 >     * A StripedAdder maintains a lazily-initialized table of
52 >     * atomically updated variables, plus an extra "base" field. The
53 >     * table size is a power of two. Indexing uses masked per-thread
54 >     * hash codes
55 >     *
56 >     * Table entries are of class Cell; a variant of AtomicLong padded
57 >     * to reduce cache contention on most processors. Padding is
58 >     * overkill for most Atomics because they are usually irregularly
59 >     * scattered in memory and thus don't interfere much with each
60 >     * other. But Atomic objects residing in arrays will tend to be
61 >     * placed adjacent to each other, and so will most often share
62 >     * cache lines (with a huge negative performance impact) without
63 >     * this precaution.
64       *
65 <     * Table entries are of class Adder; a variant of AtomicLong
66 <     * padded to reduce cache contention on most processors. Padding
67 <     * is overkill for most Atomics because they are usually
68 <     * irregularly scattered in memory and thus don't interfere much
69 <     * with each other. But Atomic objects residing in arrays will
70 <     * tend to be placed adjacent to each other, and so will most
71 <     * often share cache lines (with a huge negative performance
55 <     * impact) without this precaution.
56 <     *
57 <     * Because Adders are relatively large, we avoid creating them
58 <     * until they are needed. On the other hand, we try to create them
59 <     * on any sign of contention.
65 >     * In part because Cells are relatively large, we avoid creating
66 >     * them until they are needed.  When there is no contention, all
67 >     * updates are made to the base field.  Upon first contention (a
68 >     * failed CAS on base update), the table is initialized to size 2.
69 >     * The table size is doubled upon further contention until
70 >     * reaching the nearest power of two greater than or equal to the
71 >     * number of CPUS.
72       *
73       * Per-thread hash codes are initialized to random values.
74 <     * Collisions are indicated by failed CASes when performing an add
75 <     * operation (see method retryAdd). Upon a collision, if the table
76 <     * size is less than the capacity, it is doubled in size unless
77 <     * some other thread holds lock. If a hashed slot is empty, and
78 <     * lock is available, a new Adder is created. Otherwise, if the
79 <     * slot exists, a CAS is tried.  Retries proceed by "double
80 <     * hashing", using a secondary hash (Marsaglia XorShift) to try to
81 <     * find a free slot.
82 <     *
83 <     * By default, the table is lazily initialized.  Upon first use,
84 <     * the table is set to size 1, and contains a single Adder. The
85 <     * maximum table size is bounded by nearest power of two >= the
86 <     * number of CPUS.  The table size is capped because, when there
87 <     * are more threads than CPUs, supposing that each thread were
88 <     * bound to a CPU, there would exist a perfect hash function
89 <     * mapping threads to slots that eliminates collisions. When we
90 <     * reach capacity, we search for this mapping by randomly varying
91 <     * the hash codes of colliding threads.  Because search is random,
92 <     * and failures only become known via CAS failures, convergence
93 <     * will be slow, and because threads are typically not bound to
94 <     * CPUS forever, may not occur at all. However, despite these
95 <     * limitations, observed contention is typically low in these
96 <     * cases.
97 <     *
98 <     * A single spinlock is used for resizing the table as well as
99 <     * populating slots with new Adders. After initialization, there
100 <     * is no need for a blocking lock: Upon lock contention, threads
101 <     * try other slots rather than blocking. After initialization, at
102 <     * least one slot exists, so retries will eventually find a
103 <     * candidate Adder.  During these retries, there is increased
104 <     * contention and reduced locality, which is still better than
105 <     * alternatives.
74 >     * Contention and/or table collisions are indicated by failed
75 >     * CASes when performing an add operation (see method
76 >     * retryAdd). Upon a collision, if the table size is less than the
77 >     * capacity, it is doubled in size unless some other thread holds
78 >     * the lock. If a hashed slot is empty, and lock is available, a
79 >     * new Cell is created. Otherwise, if the slot exists, a CAS is
80 >     * tried.  Retries proceed by "double hashing", using a secondary
81 >     * hash (Marsaglia XorShift) to try to find a free slot.
82 >     *
83 >     * The table size is capped because, when there are more threads
84 >     * than CPUs, supposing that each thread were bound to a CPU,
85 >     * there would exist a perfect hash function mapping threads to
86 >     * slots that eliminates collisions. When we reach capacity, we
87 >     * search for this mapping by randomly varying the hash codes of
88 >     * colliding threads.  Because search is random, and collisions
89 >     * only become known via CAS failures, convergence can be slow,
90 >     * and because threads are typically not bound to CPUS forever,
91 >     * may not occur at all. However, despite these limitations,
92 >     * observed contention rates are typically low in these cases.
93 >     *
94 >     * A single spinlock is used for initializing and resizing the
95 >     * table, as well as populating slots with new Cells.  There is no
96 >     * need for a blocking lock: Upon lock contention, threads try
97 >     * other slots (or the base) rather than blocking.  During these
98 >     * retries, there is increased contention and reduced locality,
99 >     * which is still better than alternatives.
100 >     *
101 >     * It is possible for a Cell to become unused when threads that
102 >     * once hashed to it terminate, as well as in the case where
103 >     * doubling the table causes no thread to hash to it under
104 >     * expanded mask.  We do not try to detect or remove such cells,
105 >     * under the assumption that for long-running adders, observed
106 >     * contention levels will recur, so the cells will eventually be
107 >     * needed again; and for short-lived ones, it does not matter.
108 >     *
109       */
110  
111      private static final int NCPU = Runtime.getRuntime().availableProcessors();
# Line 100 | Line 115 | public class StripedAdder implements Ser
115       * between pads, hoping that the JVM doesn't reorder them.
116       * Updates are via inlined CAS in methods add and retryAdd.
117       */
118 <    static final class Adder {
118 >    static final class Cell {
119          volatile long p0, p1, p2, p3, p4, p5, p6;
120          volatile long value;
121          volatile long q0, q1, q2, q3, q4, q5, q6;
122 <        Adder(long x) { value = x; }
122 >        Cell(long x) { value = x; }
123      }
124  
125      /**
# Line 115 | Line 130 | public class StripedAdder implements Ser
130          static final Random rng = new Random();
131          int code;
132          HashCode() {
133 <            int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
133 >            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
134              code = (h == 0) ? 1 : h;
135          }
136      }
# Line 136 | Line 151 | public class StripedAdder implements Ser
151      static final ThreadHashCode threadHashCode = new ThreadHashCode();
152  
153      /**
154 <     * Table of adders. When non-null, size is a power of 2.
154 >     * Table of cells. When non-null, size is a power of 2.
155       */
156 <    private transient volatile Adder[] adders;
156 >    private transient volatile Cell[] cells;
157  
158      /**
159 <     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
159 >     * Base sum, used mainly when there is no contention, but also as
160 >     * a fallback during table initializion races. Updated via CAS.
161       */
162 <    private volatile int busy;
162 >    private transient volatile long base;
163  
164      /**
165 <     * Creates a new adder with zero sum.
165 >     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
166       */
167 <    public StripedAdder() {
152 <    }
167 >    private transient volatile int busy;
168  
169      /**
170 <     * Creates a new adder with zero sum, and with stripes presized
156 <     * for the given expected contention level.
157 <     *
158 <     * @param expectedContention the expected number of threads that
159 <     * will concurrently update the sum.
170 >     * Creates a new adder with initial sum of zero.
171       */
172 <    public StripedAdder(int expectedContention) {
162 <        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
163 <        int size = 1;
164 <        while (size < cap)
165 <            size <<= 1;
166 <        Adder[] as = new Adder[size];
167 <        for (int i = 0; i < size; ++i)
168 <            as[i] = new Adder(0);
169 <        this.adders = as;
172 >    public StripedAdder() {
173      }
174  
175      /**
# Line 175 | Line 178 | public class StripedAdder implements Ser
178       * @param x the value to add
179       */
180      public void add(long x) {
181 <        Adder[] as; Adder a; int n;  // locals to hold volatile reads
182 <        HashCode hc = threadHashCode.get();
183 <        int h = hc.code;
184 <        boolean contended;
185 <        if ((as = adders) != null && (n = as.length) > 0 &&
186 <            (a = as[(n - 1) & h]) != null) {
187 <            long v = a.value;
188 <            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
189 <                return;
190 <            contended = true;
181 >        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
182 >        if ((as = cells) != null ||
183 >            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
184 >            int h = (hc = threadHashCode.get()).code;
185 >            if (as != null && (n = as.length) > 0 &&
186 >                (a = as[(n - 1) & h]) != null) {
187 >                if (UNSAFE.compareAndSwapLong(a, valueOffset,
188 >                                              v = a.value, v + x))
189 >                    return;
190 >                contended = true;
191 >            }
192 >            else
193 >                contended = false;
194 >            retryAdd(x, hc, contended);
195          }
189        else
190            contended = false;
191        retryAdd(x, hc, contended);
196      }
197  
198      /**
199       * Handle cases of add involving initialization, resizing,
200 <     * creating new Adders, and/or contention. See above for
200 >     * creating new Cells, and/or contention. See above for
201       * explanation. This method suffers the usual non-modularity
202       * problems of optimistic retry code, relying on rechecked sets of
203       * reads.
# Line 206 | Line 210 | public class StripedAdder implements Ser
210          int h = hc.code;
211          boolean collide = false; // true if last slot nonempty
212          for (;;) {
213 <            Adder[] as; Adder a; int n;
214 <            if ((as = adders) != null && (n = as.length) > 0) {
213 >            Cell[] as; Cell a; int n;
214 >            if ((as = cells) != null && (n = as.length) > 0) {
215                  if ((a = as[(n - 1) & h]) == null) {
216 <                    if (busy == 0) {            // Try to attach new Adder
217 <                        Adder r = new Adder(x); // Optimistically create
216 >                    if (busy == 0) {            // Try to attach new Cell
217 >                        Cell r = new Cell(x);   // Optimistically create
218                          if (busy == 0 &&
219                              UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
220                              boolean created = false;
221                              try {               // Recheck under lock
222 <                                Adder[] rs; int m, j;
223 <                                if ((rs = adders) != null &&
222 >                                Cell[] rs; int m, j;
223 >                                if ((rs = cells) != null &&
224                                      (m = rs.length) > 0 &&
225                                      rs[j = (m - 1) & h] == null) {
226                                      rs[j] = r;
# Line 240 | Line 244 | public class StripedAdder implements Ser
244                          break;
245                      if (!collide)
246                          collide = true;
247 <                    else if (n >= NCPU || adders != as)
247 >                    else if (n >= NCPU || cells != as)
248                          collide = false;        // Can't expand
249                      else if (busy == 0 &&
250                               UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
251                          collide = false;
252                          try {
253 <                            if (adders == as) { // Expand table
254 <                                Adder[] rs = new Adder[n << 1];
253 >                            if (cells == as) { // Expand table
254 >                                Cell[] rs = new Cell[n << 1];
255                                  for (int i = 0; i < n; ++i)
256                                      rs[i] = as[i];
257 <                                adders = rs;
257 >                                cells = rs;
258                              }
259                          } finally {
260                              busy = 0;
# Line 262 | Line 266 | public class StripedAdder implements Ser
266                  h ^= h >>> 17;
267                  h ^= h << 5;
268              }
269 <            else if (adders == as) {            // Try to default-initialize
270 <                Adder[] rs = new Adder[1];
267 <                rs[0] = new Adder(x);
269 >            else if (busy == 0 && cells == as &&
270 >                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271                  boolean init = false;
272 <                while (adders == as) {
273 <                    if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
274 <                        try {
275 <                            if (adders == as) {
276 <                                adders = rs;
277 <                                init = true;
278 <                            }
276 <                        } finally {
277 <                            busy = 0;
278 <                        }
279 <                        break;
272 >                try {                           // Initialize
273 >                    if (cells == as) {
274 >                        Cell r = new Cell(x);
275 >                        Cell[] rs = new Cell[2];
276 >                        rs[h & 1] = r;
277 >                        cells = rs;
278 >                        init = true;
279                      }
280 <                    if (adders != as)
281 <                        break;
283 <                    Thread.yield();              // Back off
280 >                } finally {
281 >                    busy = 0;
282                  }
283                  if (init)
284                      break;
285              }
286 +            else {                              // Lost initialization race
287 +                long b = base;                  // Fall back on using base
288 +                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
289 +                    break;
290 +            }
291          }
292          hc.code = h;                            // Record index for next time
293      }
# Line 304 | Line 307 | public class StripedAdder implements Ser
307      }
308  
309      /**
310 <     * Returns an estimate of the current sum.  The result is
311 <     * calculated by summing multiple variables, so may not be
312 <     * accurate if updates occur concurrently with this method.
310 >     * Returns the current sum.  The result is only guaranteed to be
311 >     * accurate in the absence of concurrent updates. Otherwise, it
312 >     * may fail to reflect one or more updates occuring while
313 >     * calculating the result.
314       *
315 <     * @return the estimated sum
315 >     * @return the sum
316       */
317      public long sum() {
318 <        long sum = 0L;
319 <        Adder[] as = adders;
318 >        Cell[] as = cells;
319 >        long sum = base;
320          if (as != null) {
321              int n = as.length;
322              for (int i = 0; i < n; ++i) {
323 <                Adder a = as[i];
323 >                Cell a = as[i];
324                  if (a != null)
325                      sum += a.value;
326              }
# Line 325 | Line 329 | public class StripedAdder implements Ser
329      }
330  
331      /**
332 <     * Resets each of the variables to zero, returning the estimated
333 <     * previous sum. This is effective in fully resetting the sum only
334 <     * if there are no concurrent updates.
335 <     *
336 <     * @return the estimated previous sum
337 <     */
338 <    public long reset() {
339 <        long sum = 0L;
340 <        Adder[] as = adders;
332 >     * Resets variables maintaining the sum to zero.  This is
333 >     * effective in setting the sum to zero only if there are no
334 >     * concurrent updates.
335 >     */
336 >    public void reset() {
337 >        Cell[] as = cells;
338 >        base = 0L;
339 >        if (as != null) {
340 >            int n = as.length;
341 >            for (int i = 0; i < n; ++i) {
342 >                Cell a = as[i];
343 >                if (a != null)
344 >                    a.value = 0L;
345 >            }
346 >        }
347 >    }
348 >
349 >    /**
350 >     * Equivalent in effect to {@link #sum} followed by {@link
351 >     * #reset}. This method may apply for example during quiescent
352 >     * points between multithreaded computations.  If there are
353 >     * updates concurrent with this method, the returned value is
354 >     * <em>not</em> guaranteed to be the final sum occurring before
355 >     * the reset.
356 >     *
357 >     * @return the sum
358 >     */
359 >    public long sumThenReset() {
360 >        Cell[] as = cells;
361 >        long sum = base;
362 >        base = 0L;
363          if (as != null) {
364              int n = as.length;
365              for (int i = 0; i < n; ++i) {
366 <                Adder a = as[i];
366 >                Cell a = as[i];
367                  if (a != null) {
368                      sum += a.value;
369                      a.value = 0L;
# Line 357 | Line 383 | public class StripedAdder implements Ser
383          throws IOException, ClassNotFoundException {
384          s.defaultReadObject();
385          busy = 0;
386 <        add(s.readLong());
386 >        cells = null;
387 >        base = s.readLong();
388      }
389  
390      // Unsafe mechanics
391      private static final sun.misc.Unsafe UNSAFE;
392 +    private static final long baseOffset;
393      private static final long busyOffset;
394      private static final long valueOffset;
395      static {
396          try {
397              UNSAFE = getUnsafe();
398              Class<?> sk = StripedAdder.class;
399 +            baseOffset = UNSAFE.objectFieldOffset
400 +                (sk.getDeclaredField("base"));
401              busyOffset = UNSAFE.objectFieldOffset
402                  (sk.getDeclaredField("busy"));
403 <            Class<?> ak = Adder.class;
403 >            Class<?> ak = Cell.class;
404              valueOffset = UNSAFE.objectFieldOffset
405                  (ak.getDeclaredField("value"));
406          } catch (Exception e) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines