ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.2 by jsr166, Wed Jul 20 16:06:19 2011 UTC vs.
Revision 1.4 by dl, Sat Jul 23 16:32:53 2011 UTC

# Line 16 | Line 16 | import java.io.ObjectOutputStream;
16  
17   /**
18   * A set of variables that together maintain a sum.  When updates
19 < * (method {@link #add}) are contended across threads, the set of
20 < * adders may grow to reduce contention. Method {@link #sum} returns
21 < * the current combined total across these adders. This value is
22 < * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
23 < * the sum is being calculated), and so cannot be used alone for
24 < * fine-grained synchronization control.
19 > * (method {@link #add}) are contended across threads, this set of
20 > * adder variables may grow dynamically to reduce contention. Method
21 > * {@link #sum} returns the current combined total across these
22 > * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 > * updates may occur while the sum is being calculated), and so cannot
24 > * be used alone for fine-grained synchronization control.
25   *
26   * <p> This class may be applicable when many threads frequently
27   * update a common sum that is used for purposes such as collecting
28   * statistics. In this case, performance may be significantly faster
29   * than using a shared {@link AtomicLong}, at the expense of using
30 < * significantly more space.  On the other hand, if it is known that
31 < * only one thread can ever update the sum, performance may be
32 < * significantly slower than just updating a local variable.
30 > * much more space.  On the other hand, if it is known that only one
31 > * thread can ever update the sum, performance may be significantly
32 > * slower than just updating a local variable.
33 > *
34 > * <p>A StripedAdder may optionally be constructed with a given
35 > * expected contention level; i.e., the number of threads that are
36 > * expected to concurrently update the sum. Supplying an accurate
37 > * value may improve performance by reducing the need for dynamic
38 > * adjustment.
39   *
40   * @author Doug Lea
41   */
# Line 37 | Line 43 | public class StripedAdder implements Ser
43      private static final long serialVersionUID = 7249069246863182397L;
44  
45      /*
46 <     * Overview: We maintain a table of AtomicLongs (padded to reduce
47 <     * false sharing). The table is indexed by per-thread hash codes
48 <     * that are initialized as random values.  The table doubles in
49 <     * size upon contention (as indicated by failed CASes when
50 <     * performing add()), but is capped at the nearest power of two >=
51 <     * #cpus: At that point, contention should be infrequent if each
52 <     * thread has a unique index; so we instead adjust hash codes to
53 <     * new random values upon contention rather than expanding. A
54 <     * single spinlock is used for resizing the table as well as
46 >     * A StripedAdder maintains a table of Atomic long variables. The
47 >     * table is indexed by per-thread hash codes that are initialized
48 >     * to random values.
49 >     *
50 >     * The table doubles in size upon contention (as indicated by
51 >     * failed CASes when performing add()), but is capped at the
52 >     * nearest power of two >= #CPUS. This reflects the idea that,
53 >     * when there are more threads than CPUs, then if each thread were
54 >     * bound to a CPU, there would exist a perfect hash function
55 >     * mapping threads to slots that eliminates collisions. When we
56 >     * reach capacity, we search for this mapping by randomly varying
57 >     * the hash codes of colliding threads.  Because search is random,
58 >     * and failures only become known via CAS failures, convergence
59 >     * will be slow, and because threads are typically not bound to
60 >     * CPUS forever, may not occur at all. However, despite these
61 >     * limitations, observed contention is typically low in these
62 >     * cases.
63 >     *
64 >     * Table entries are of class Adder; a form of AtomicLong padded
65 >     * to reduce cache contention on most processors. Padding is
66 >     * overkill for most Atomics because they are most often
67 >     * irregularly scattered in memory and thus don't interfere much
68 >     * with each other. But Atomic objects residing in arrays will
69 >     * tend to be placed adjacent to each other, and so will most
70 >     * often share cache lines without this precaution.  Adders are
71 >     * constructed upon first use, which further improves per-thread
72 >     * locality and helps reduce (an already large) footprint.
73 >     *
74 >     * A single spinlock is used for resizing the table as well as
75       * populating slots with new Adders. Upon lock contention, threads
76 <     * just try other slots rather than blocking. We guarantee that at
76 >     * try other slots rather than blocking. After initialization, at
77       * least one slot exists, so retries will eventually find a
78 <     * candidate Adder.
78 >     * candidate Adder. During these retries, there is increased
79 >     * contention and reduced locality, which is still better than
80 >     * alternatives.
81       */
82  
83      /**
# Line 58 | Line 86 | public class StripedAdder implements Ser
86      static final int NCPU = Runtime.getRuntime().availableProcessors();
87  
88      /**
89 <     * Version of AtomicLong padded to avoid sharing cache
62 <     * lines on most processors
89 >     * Padded version of AtomicLong
90       */
91      static final class Adder extends AtomicLong {
92          long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
# Line 67 | Line 94 | public class StripedAdder implements Ser
94      }
95  
96      /**
97 <     * Holder for the thread-local hash code.
97 >     * Holder for the thread-local hash code. The code starts off with
98 >     * a given random value, but may be set to a different value upon
99 >     * collisions in retryAdd.
100       */
101      static final class HashCode {
102          int code;
# Line 93 | Line 122 | public class StripedAdder implements Ser
122      static final ThreadHashCode threadHashCode = new ThreadHashCode();
123  
124      /**
125 <     * Table of adders. Initially of size 2; grows to be at most NCPU.
125 >     * Table of adders. Minimum size 2. Size grows to be at most NCPU.
126       */
127      private transient volatile Adder[] adders;
128  
# Line 105 | Line 134 | public class StripedAdder implements Ser
134      private final AtomicInteger mutex;
135  
136      /**
137 <     * Marsaglia XorShift for rehashing on collisions
137 >     * Creates a new adder with zero sum.
138       */
139 <    private static int xorShift(int r) {
140 <        r ^= r << 13;
141 <        r ^= r >>> 17;
113 <        return r ^ (r << 5);
139 >    public StripedAdder() {
140 >        this.mutex = new AtomicInteger();
141 >        // remaining initialization on first call to add.
142      }
143  
144      /**
145 <     * Creates a new adder with initially zero sum.
145 >     * Creates a new adder with zero sum, and with stripes presized
146 >     * for the given expected contention level.
147 >     *
148 >     * @param expectedContention the expected number of threads that
149 >     * will concurrently update the sum.
150       */
151 <    public StripedAdder() {
152 <        Adder[] as = new Adder[2];
153 <        as[0] = new Adder(0); // ensure at least one available adder
151 >    public StripedAdder(int expectedContention) {
152 >        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
153 >        int size = 2;
154 >        while (size < cap)
155 >            size <<= 1;
156 >        Adder[] as = new Adder[size];
157 >        for (int i = 0; i < size; ++i)
158 >            as[i] = new Adder(0);
159          this.adders = as;
160          this.mutex = new AtomicInteger();
161      }
# Line 129 | Line 166 | public class StripedAdder implements Ser
166       * @param x the value to add
167       */
168      public void add(long x) {
169 +        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
170          HashCode hc = threadHashCode.get();
171 <        for (int h = hc.code;;) {
172 <            Adder[] as = adders;
173 <            int n = as.length;
174 <            Adder a = as[h & (n - 1)];
175 <            if (a != null) {
176 <                long v = a.get();
177 <                if (a.compareAndSet(v, v + x))
178 <                    break;
179 <                if (n >= NCPU) {                 // Collision when table at max
180 <                    h = hc.code = xorShift(h);   // change code
181 <                    continue;
171 >        if ((as = adders) == null || (n = as.length) < 1 ||
172 >            (a = as[hc.code & (n - 1)]) == null ||
173 >            !a.compareAndSet(v = a.get(), v + x))
174 >            retryAdd(x, hc);
175 >    }
176 >
177 >    /**
178 >     * Handle cases of add involving initialization, resizing,
179 >     * creating new Adders, and/or contention.
180 >     */
181 >    private void retryAdd(long x, HashCode hc) {
182 >        int h = hc.code;
183 >        final AtomicInteger mutex = this.mutex;
184 >        AtomicInteger lock = null;                     // nonnull when held
185 >        try {
186 >            for (;;) {
187 >                Adder[] as; Adder a; long v; int n, k; // locals for volatiles
188 >                boolean needLock = true;
189 >                if ((as = adders) == null || (n = as.length) < 1) {
190 >                    if (lock != null)                  // default-initialize
191 >                        adders = new Adder[2];
192                  }
193 <            }
194 <            final AtomicInteger mutex = this.mutex;
195 <            if (mutex.get() != 0)
196 <                h = xorShift(h);                 // Try elsewhere
149 <            else if (mutex.compareAndSet(0, 1)) {
150 <                boolean created = false;
151 <                try {
152 <                    Adder[] rs = adders;
153 <                    if (a != null && rs == as)   // Resize table
154 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
155 <                    int j = h & (rs.length - 1);
156 <                    if (rs[j] == null) {         // Create adder
157 <                        rs[j] = new Adder(x);
158 <                        created = true;
193 >                else if ((a = as[k = h & (n - 1)]) == null) {
194 >                    if (lock != null) {                // attach new adder
195 >                        as[k] = new Adder(x);
196 >                        break;
197                      }
160                } finally {
161                    mutex.set(0);
198                  }
199 <                if (created) {
164 <                    hc.code = h;                // Use this adder next time
199 >                else if (a.compareAndSet(v = a.get(), v + x))
200                      break;
201 +                else if (n >= NCPU)                    // cannot expand
202 +                    needLock = false;
203 +                else if (lock != null)                 // expand table
204 +                    adders = Arrays.copyOf(as, n << 1);
205 +
206 +                if (lock == null) {
207 +                    if (needLock && mutex.get() == 0 &&
208 +                        mutex.compareAndSet(0, 1))
209 +                        lock = mutex;
210 +                    else {                             // try elsewhere
211 +                        h ^= h << 13;                  // Marsaglia XorShift
212 +                        h ^= h >>> 17;
213 +                        h ^= h << 5;
214 +                    }
215                  }
216              }
217 +        } finally {
218 +            if (lock != null)
219 +                lock.set(0);
220          }
221 +        if (hc.code != h)                              // avoid unneeded writes
222 +            hc.code = h;
223      }
224  
225      /**
# Line 176 | Line 230 | public class StripedAdder implements Ser
230       * @return the estimated sum
231       */
232      public long sum() {
233 <        long sum = 0;
233 >        long sum = 0L;
234          Adder[] as = adders;
235 <        int n = as.length;
236 <        for (int i = 0; i < n; ++i) {
237 <            Adder a = as[i];
238 <            if (a != null)
239 <                sum += a.get();
235 >        if (as != null) {
236 >            int n = as.length;
237 >            for (int i = 0; i < n; ++i) {
238 >                Adder a = as[i];
239 >                if (a != null)
240 >                    sum += a.get();
241 >            }
242          }
243          return sum;
244      }
# Line 194 | Line 250 | public class StripedAdder implements Ser
250       */
251      public void reset() {
252          Adder[] as = adders;
253 <        int n = as.length;
254 <        for (int i = 0; i < n; ++i) {
255 <            Adder a = as[i];
256 <            if (a != null)
257 <                a.set(0L);
253 >        if (as != null) {
254 >            int n = as.length;
255 >            for (int i = 0; i < n; ++i) {
256 >                Adder a = as[i];
257 >                if (a != null)
258 >                    a.set(0L);
259 >            }
260          }
261      }
262  
# Line 222 | Line 280 | public class StripedAdder implements Ser
280       * @return the estimated sum
281       */
282      public long sumAndReset() {
283 <        long sum = 0;
283 >        long sum = 0L;
284          Adder[] as = adders;
285 <        int n = as.length;
286 <        for (int i = 0; i < n; ++i) {
287 <            Adder a = as[i];
288 <            if (a != null) {
289 <                sum += a.get();
290 <                a.set(0L);
285 >        if (as != null) {
286 >            int n = as.length;
287 >            for (int i = 0; i < n; ++i) {
288 >                Adder a = as[i];
289 >                if (a != null) {
290 >                    sum += a.get();
291 >                    a.set(0L);
292 >                }
293              }
294          }
295          return sum;
# Line 244 | Line 304 | public class StripedAdder implements Ser
304      private void readObject(ObjectInputStream s)
305          throws IOException, ClassNotFoundException {
306          s.defaultReadObject();
247        long c = s.readLong();
248        Adder[] as = new Adder[2];
249        as[0] = new Adder(c);
250        this.adders = as;
307          mutex.set(0);
308 +        add(s.readLong());
309      }
310  
311   }
255
256

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines