
Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.5 by dl, Sun Jul 24 15:08:21 2011 UTC vs.
Revision 1.9 by dl, Thu Jul 28 19:27:07 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 44 | Line 43 | public class StripedAdder implements Ser
43  
44      /*
45       * A StripedAdder maintains a table of Atomic long variables. The
46 <     * table is indexed by per-thread hash codes that are initialized
48 <     * to random values.
46 >     * table is indexed by per-thread hash codes.
47       *
48 <     * The table doubles in size upon contention (as indicated by
49 <     * failed CASes when performing add()), but is capped at the
50 <     * nearest power of two >= #CPUS. This reflects the idea that,
53 <     * when there are more threads than CPUs, then if each thread were
54 <     * bound to a CPU, there would exist a perfect hash function
55 <     * mapping threads to slots that eliminates collisions. When we
56 <     * reach capacity, we search for this mapping by randomly varying
57 <     * the hash codes of colliding threads.  Because search is random,
58 <     * and failures only become known via CAS failures, convergence
59 <     * will be slow, and because threads are typically not bound to
60 <     * CPUS forever, may not occur at all. However, despite these
61 <     * limitations, observed contention is typically low in these
62 <     * cases.
63 <     *
64 <     * Table entries are of class Adder; a form of AtomicLong padded
65 <     * to reduce cache contention on most processors. Padding is
66 <     * overkill for most Atomics because they are most often
48 >     * Table entries are of class Adder; a variant of AtomicLong
49 >     * padded to reduce cache contention on most processors. Padding
50 >     * is overkill for most Atomics because they are usually
51       * irregularly scattered in memory and thus don't interfere much
52       * with each other. But Atomic objects residing in arrays will
53       * tend to be placed adjacent to each other, and so will most
54 <     * often share cache lines without this precaution.  Adders are
55 <     * constructed upon first use, which further improves per-thread
56 <     * locality and helps reduce (an already large) footprint.
54 >     * often share cache lines (with a huge negative performance
55 >     * impact) without this precaution.
56 >     *
57 >     * Because Adders are relatively large, we avoid creating them
58 >     * until they are needed. On the other hand, we try to create them
59 >     * on any sign of contention.
60 >     *
61 >     * Per-thread hash codes are initialized to random values.
62 >     * Collisions are indicated by failed CASes when performing an add
63 >     * operation (see method retryAdd). Upon a collision, if the table
64 >     * size is less than the capacity, it is doubled in size unless
65 >     * some other thread holds lock. If a hashed slot is empty, and
66 >     * lock is available, a new Adder is created. Otherwise, if the
67 >     * slot exists, a CAS is tried.  Retries proceed by "double
68 >     * hashing", using a secondary hash (Marsaglia XorShift) to try to
69 >     * find a free slot.
70 >     *
71 >     * By default, the table is lazily initialized.  Upon first use,
72 >     * the table is set to size 2 (the minimum non-empty size), but
73 >     * containing only a single Adder. The maximum table size is
74 >     * bounded by nearest power of two >= the number of CPUS.  The
75 >     * table size is capped because, when there are more threads than
76 >     * CPUs, supposing that each thread were bound to a CPU, there
77 >     * would exist a perfect hash function mapping threads to slots
78 >     * that eliminates collisions. When we reach capacity, we search
79 >     * for this mapping by randomly varying the hash codes of
80 >     * colliding threads.  Because search is random, and failures only
81 >     * become known via CAS failures, convergence will be slow, and
82 >     * because threads are typically not bound to CPUS forever, may
83 >     * not occur at all. However, despite these limitations, observed
84 >     * contention is typically low in these cases.
85       *
86       * A single spinlock is used for resizing the table as well as
87 <     * populating slots with new Adders. Upon lock contention, threads
87 >     * populating slots with new Adders. After initialization, there
88 >     * is no need for a blocking lock: Upon lock contention, threads
89       * try other slots rather than blocking. After initialization, at
90       * least one slot exists, so retries will eventually find a
91 <     * candidate Adder. During these retries, there is increased
91 >     * candidate Adder.  During these retries, there is increased
92       * contention and reduced locality, which is still better than
93       * alternatives.
94       */
95  
96 <    /**
84 <     * Number of processors, to place a cap on table growth.
85 <     */
86 <    static final int NCPU = Runtime.getRuntime().availableProcessors();
96 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
97  
98      /**
99 <     * The table size set upon first use when default-constructed
99 >     * Padded variant of AtomicLong.  The value field is placed
100 >     * between pads, hoping that the JVM doesn't reorder them.
101 >     * Updates are via inlined CAS in methods add and retryAdd.
102       */
103 <    private static final int DEFAULT_ARRAY_SIZE = 8;
104 <
105 <    /**
106 <     * Padded version of AtomicLong
107 <     */
96 <    static final class Adder extends AtomicLong {
97 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
98 <        Adder(long x) { super(x); }
103 >    static final class Adder {
104 >        volatile long p0, p1, p2, p3, p4, p5, p6;
105 >        volatile long value;
106 >        volatile long q0, q1, q2, q3, q4, q5, q6;
107 >        Adder(long x) { value = x; }
108      }
109  
110      /**
111 <     * Holder for the thread-local hash code. The code starts off with
112 <     * a given random value, but may be set to a different value upon
104 <     * collisions in retryAdd.
111 >     * Holder for the thread-local hash code. The code is initially
112 >     * random, but may be set to a different value upon collisions.
113       */
114      static final class HashCode {
115 +        static final Random rng = new Random();
116          int code;
117 <        HashCode(int h) { code = h; }
117 >        HashCode() {
118 >            int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
119 >            code = (h == 0) ? 1 : h;
120 >        }
121      }
122  
123      /**
124       * The corresponding ThreadLocal class
125       */
126      static final class ThreadHashCode extends ThreadLocal<HashCode> {
127 <        static final Random rng = new Random();
116 <        public HashCode initialValue() {
117 <            int h = rng.nextInt();
118 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
119 <        }
127 >        public HashCode initialValue() { return new HashCode(); }
128      }
129  
130      /**
131       * Static per-thread hash codes. Shared across all StripedAdders
132 <     * because adjustments due to collisions in one table are likely
133 <     * to be appropriate for others.
132 >     * to reduce ThreadLocal pollution and because adjustments due to
133 >     * collisions in one table are likely to be appropriate for
134 >     * others.
135       */
136      static final ThreadHashCode threadHashCode = new ThreadHashCode();
137  
138      /**
139 <     * Table of adders. Size is power of two, grows to be at most NCPU.
139 >     * Table of adders. When non-null, size is a power of 2, at least 2.
140       */
141      private transient volatile Adder[] adders;
142  
143      /**
144 <     * Serves as a lock when resizing and/or creating Adders.  There
136 <     * is no need for a blocking lock: Except during initialization
137 <     * races, when busy, other threads try other slots.
144 >     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
145       */
146 <    private final AtomicInteger mutex;
146 >    private volatile int busy;
147  
148      /**
149       * Creates a new adder with zero sum.
150       */
151      public StripedAdder() {
145        this.mutex = new AtomicInteger();
146        // remaining initialization on first call to add.
152      }
153  
154      /**
# Line 154 | Line 159 | public class StripedAdder implements Ser
159       * will concurrently update the sum.
160       */
161      public StripedAdder(int expectedContention) {
162 <        int size;
163 <        if (expectedContention > 0) {
164 <            int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
165 <            size = 1;
161 <            while (size < cap)
162 <                size <<= 1;
163 <        }
164 <        else
165 <            size = 0;
162 >        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
163 >        int size = 2;
164 >        while (size < cap)
165 >            size <<= 1;
166          Adder[] as = new Adder[size];
167          for (int i = 0; i < size; ++i)
168              as[i] = new Adder(0);
169          this.adders = as;
170        this.mutex = new AtomicInteger();
170      }
171  
172      /**
# Line 176 | Line 175 | public class StripedAdder implements Ser
175       * @param x the value to add
176       */
177      public void add(long x) {
178 <        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
178 >        Adder[] as; Adder a; int n;  // locals to hold volatile reads
179          HashCode hc = threadHashCode.get();
180 <        if ((as = adders) == null || (n = as.length) < 1 ||
181 <            (a = as[hc.code & (n - 1)]) == null ||
182 <            !a.compareAndSet(v = a.get(), v + x))
183 <            retryAdd(x, hc);
180 >        int h = hc.code;
181 >        boolean collide;
182 >        if ((as = adders) != null && (n = as.length) > 0 &&
183 >            (a = as[(n - 1) & h]) != null) {
184 >            long v = a.value;
185 >            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
186 >                return;
187 >            collide = true;
188 >        }
189 >        else
190 >            collide = false;
191 >        retryAdd(x, hc, collide);
192      }
193  
194      /**
195       * Handle cases of add involving initialization, resizing,
196 <     * creating new Adders, and/or contention.
196 >     * creating new Adders, and/or contention. See above for
197 >     * explanation. This method suffers the usual non-modularity
198 >     * problems of optimistic retry code, relying on rechecked sets of
199 >     * reads.
200       */
201 <    private void retryAdd(long x, HashCode hc) {
201 >    private void retryAdd(long x, HashCode hc, boolean collide) {
202          int h = hc.code;
203 <        final AtomicInteger mutex = this.mutex;
204 <        for (boolean retried = false; ; retried = true) {
205 <            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
206 <            if ((as = adders) == null || (n = as.length) < 1) {
207 <                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
208 <                    try {
209 <                        if (adders == null)        // Default-initialize
210 <                            adders = new Adder[DEFAULT_ARRAY_SIZE];
211 <                    } finally {
212 <                        mutex.set(0);
203 >        for (;;) {
204 >            Adder[] as; Adder a; int n;
205 >            if ((as = adders) != null && (n = as.length) > 0) {
206 >                if ((a = as[(n - 1) & h]) != null) {
207 >                    boolean shared = true;      // Slot exists
208 >                    if (collide && n < NCPU && busy == 0 &&
209 >                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
210 >                        try {
211 >                            if (adders == as) { // Expand table
212 >                                Adder[] rs = new Adder[n << 1];
213 >                                for (int i = 0; i < n; ++i)
214 >                                    rs[i] = as[i];
215 >                                adders = rs;
216 >                                shared = false;
217 >                            }
218 >                        } finally {
219 >                            busy = 0;
220 >                        }
221 >                        if (shared || (h & n) != 0) {
222 >                            collide = false;
223 >                            continue;           // Array or index changed
224 >                        }
225                      }
226 +                    long v = a.value;
227 +                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
228 +                        break;
229 +                    collide = shared;
230                  }
231 <                else
232 <                    Thread.yield();               // initialization race
233 <            }
234 <            else if ((a = as[k = h & (n - 1)]) != null &&
235 <                     retried && a.compareAndSet(v = a.get(), v + x))
236 <                break;
237 <            else if ((a == null || n < NCPU) &&
238 <                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
239 <                boolean created = false;
240 <                try {
241 <                    if (adders == as) {
242 <                        if (as[k] == null) {
243 <                            as[k] = new Adder(x);
218 <                            created = true;
231 >                else {                          // Try to attach new Adder
232 >                    if (busy == 0 &&
233 >                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
234 >                        boolean created = false;
235 >                        try {                   // Recheck under lock
236 >                            Adder[] rs; int m, j;
237 >                            if ((rs = adders) != null && (m = rs.length) > 0 &&
238 >                                rs[j = (m - 1) & h] == null) {
239 >                                rs[j] = new Adder(x);
240 >                                created = true;
241 >                            }
242 >                        } finally {
243 >                            busy = 0;
244                          }
245 <                        else {                   // Expand table
246 <                            Adder[] rs = new Adder[n << 1];
247 <                            for (int i = 0; i < n; ++i)
248 <                                rs[i] = as[i];
245 >                        if (created)
246 >                            break;
247 >                        continue;               // Slot is now non-empty
248 >                    }
249 >                    collide = false;
250 >                }
251 >                h ^= h << 13;                   // Rehash
252 >                h ^= h >>> 17;
253 >                h ^= h << 5;
254 >            }
255 >            else if (busy == 0) {               // Default-initialize
256 >                Adder r = new Adder(x);
257 >                Adder[] rs = new Adder[2];
258 >                rs[h & 1] = r;
259 >                if (adders == as && busy == 0 &&
260 >                    UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
261 >                    boolean init = false;
262 >                    try {
263 >                        if (adders == as) {
264                              adders = rs;
265 +                            init = true;
266                          }
267 +                    } finally {
268 +                        busy = 0;
269                      }
270 <                } finally {
271 <                    mutex.set(0);
270 >                    if (init)
271 >                        break;
272                  }
230                if (created)
231                    break;
232            }
233            else {                                // Try elsewhere
234                h ^= h << 13;
235                h ^= h >>> 17;                    // Marsaglia XorShift
236                h ^= h << 5;
273              }
274 +            else if (adders == as)              // Lost initialization race
275 +                Thread.yield();
276          }
277 <        hc.code = h;
277 >        hc.code = h;                            // Record index for next time
278 >    }
279 >
280 >    /**
281 >     * Equivalent to {@code add(1)}.
282 >     */
283 >    public void increment() {
284 >        add(1L);
285 >    }
286 >
287 >    /**
288 >     * Equivalent to {@code add(-1)}.
289 >     */
290 >    public void decrement() {
291 >        add(-1L);
292      }
293  
294      /**
# Line 254 | Line 306 | public class StripedAdder implements Ser
306              for (int i = 0; i < n; ++i) {
307                  Adder a = as[i];
308                  if (a != null)
309 <                    sum += a.get();
309 >                    sum += a.value;
310              }
311          }
312          return sum;
313      }
314  
315      /**
316 <     * Resets each of the variables to zero. This is effective in
317 <     * fully resetting the sum only if there are no concurrent
318 <     * updates.
267 <     */
268 <    public void reset() {
269 <        Adder[] as = adders;
270 <        if (as != null) {
271 <            int n = as.length;
272 <            for (int i = 0; i < n; ++i) {
273 <                Adder a = as[i];
274 <                if (a != null)
275 <                    a.set(0L);
276 <            }
277 <        }
278 <    }
279 <
280 <    /**
281 <     * Equivalent to {@code add(1)}.
282 <     */
283 <    public void increment() {
284 <        add(1L);
285 <    }
286 <
287 <    /**
288 <     * Equivalent to {@code add(-1)}.
289 <     */
290 <    public void decrement() {
291 <        add(-1L);
292 <    }
293 <
294 <    /**
295 <     * Equivalent to {@link #sum} followed by {@link #reset}.
316 >     * Resets each of the variables to zero, returning the estimated
317 >     * previous sum. This is effective in fully resetting the sum only
318 >     * if there are no concurrent updates.
319       *
320 <     * @return the estimated sum
320 >     * @return the estimated previous sum
321       */
322 <    public long sumAndReset() {
322 >    public long reset() {
323          long sum = 0L;
324          Adder[] as = adders;
325          if (as != null) {
# Line 304 | Line 327 | public class StripedAdder implements Ser
327              for (int i = 0; i < n; ++i) {
328                  Adder a = as[i];
329                  if (a != null) {
330 <                    sum += a.get();
331 <                    a.set(0L);
330 >                    sum += a.value;
331 >                    a.value = 0L;
332                  }
333              }
334          }
# Line 321 | Line 344 | public class StripedAdder implements Ser
344      private void readObject(ObjectInputStream s)
345          throws IOException, ClassNotFoundException {
346          s.defaultReadObject();
347 <        mutex.set(0);
347 >        busy = 0;
348          add(s.readLong());
349      }
350  
351 +    // Unsafe mechanics
352 +    private static final sun.misc.Unsafe UNSAFE;
353 +    private static final long busyOffset;
354 +    private static final long valueOffset;
355 +    static {
356 +        try {
357 +            UNSAFE = getUnsafe();
358 +            Class<?> sk = StripedAdder.class;
359 +            busyOffset = UNSAFE.objectFieldOffset
360 +                (sk.getDeclaredField("busy"));
361 +            Class<?> ak = Adder.class;
362 +            valueOffset = UNSAFE.objectFieldOffset
363 +                (ak.getDeclaredField("value"));
364 +        } catch (Exception e) {
365 +            throw new Error(e);
366 +        }
367 +    }
368 +
369 +    /**
370 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
371 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
372 +     * into a jdk.
373 +     *
374 +     * @return a sun.misc.Unsafe
375 +     */
376 +    private static sun.misc.Unsafe getUnsafe() {
377 +        try {
378 +            return sun.misc.Unsafe.getUnsafe();
379 +        } catch (SecurityException se) {
380 +            try {
381 +                return java.security.AccessController.doPrivileged
382 +                    (new java.security
383 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
384 +                        public sun.misc.Unsafe run() throws Exception {
385 +                            java.lang.reflect.Field f = sun.misc
386 +                                .Unsafe.class.getDeclaredField("theUnsafe");
387 +                            f.setAccessible(true);
388 +                            return (sun.misc.Unsafe) f.get(null);
389 +                        }});
390 +            } catch (java.security.PrivilegedActionException e) {
391 +                throw new RuntimeException("Could not initialize intrinsics",
392 +                                           e.getCause());
393 +            }
394 +        }
395 +    }
396 +
397   }
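
The design comment in this revision describes CAS updates on hashed slots, with an XorShift rehash after a collision. The following is a minimal sketch of that idea only, not the code from either revision. It assumes a fixed-size table (no lazy initialization, resizing, or spinlock) and uses an unpadded AtomicLongArray, so it does not reproduce the cache-line padding of Adder. The names StripedCounterSketch, PROBE, and xorShift are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

public class StripedCounterSketch {
    // Fixed power-of-two table; the real class grows its table lazily instead.
    private final AtomicLongArray cells;
    private final int mask;

    // Per-thread hash code, kept nonzero because xorShift(0) == 0.
    private static final ThreadLocal<int[]> PROBE = ThreadLocal.withInitial(() -> {
        int h = ThreadLocalRandom.current().nextInt();
        return new int[] { (h == 0) ? 1 : h };
    });

    public StripedCounterSketch(int powerOfTwoSize) {
        if (powerOfTwoSize < 1 || (powerOfTwoSize & (powerOfTwoSize - 1)) != 0)
            throw new IllegalArgumentException("size must be a power of two");
        cells = new AtomicLongArray(powerOfTwoSize);
        mask = powerOfTwoSize - 1;
    }

    // One round of Marsaglia's 32-bit xorshift with shifts 13, 17, 5.
    static int xorShift(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    public void add(long x) {
        int[] probe = PROBE.get();
        int h = probe[0];
        for (;;) {
            int i = h & mask;                 // slot chosen by the thread's hash
            long v = cells.get(i);
            if (cells.compareAndSet(i, v, v + x))
                break;                        // update succeeded
            h = xorShift(h);                  // collision: try another slot
        }
        probe[0] = h;                         // remember the slot for next time
    }

    public long sum() {
        long s = 0L;
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounterSketch c = new StripedCounterSketch(8);
        Thread[] ts = new Thread[4];
        for (int t = 0; t < ts.length; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++)
                    c.add(1L);
            });
            ts[t].start();
        }
        for (Thread t : ts)
            t.join();
        System.out.println(c.sum());
    }
}
```

As in the original, sum() is only an estimate while updates are in flight; it is exact once all updating threads have finished.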

Diff Legend

(no marker) Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)
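
The sizing rule in the revision 1.9 constructor StripedAdder(int expectedContention) can be checked in isolation. The sketch below copies that loop into a hypothetical helper, initialTableSize, and passes the processor count as a parameter (an assumption made so the result does not depend on the machine; the original reads the NCPU constant).

```java
public class TableSizeSketch {
    /**
     * Smallest power of two that is at least 2 and at least
     * min(expectedContention, ncpu), as in the revision 1.9 constructor.
     */
    static int initialTableSize(int expectedContention, int ncpu) {
        int cap = (expectedContention < ncpu) ? expectedContention : ncpu;
        int size = 2;              // minimum non-empty table size
        while (size < cap)
            size <<= 1;            // double until the cap is covered
        return size;
    }

    public static void main(String[] args) {
        int ncpu = Runtime.getRuntime().availableProcessors();
        for (int expected : new int[] { 0, 1, 3, 5, 100 })
            System.out.println(expected + " -> " + initialTableSize(expected, ncpu));
    }
}
```

With ncpu = 6, for example, an expected contention of 100 gives a table of 8 slots, which matches the "nearest power of two >= the number of CPUS" bound stated in the class comment.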