ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.3 by dl, Fri Jul 22 13:25:12 2011 UTC vs.
Revision 1.5 by dl, Sun Jul 24 15:08:21 2011 UTC

# Line 27 | Line 27 | import java.io.ObjectOutputStream;
27   * update a common sum that is used for purposes such as collecting
28   * statistics. In this case, performance may be significantly faster
29   * than using a shared {@link AtomicLong}, at the expense of using
30 < * significantly more space.  On the other hand, if it is known that
31 < * only one thread can ever update the sum, performance may be
32 < * significantly slower than just updating a local variable.
30 > * much more space.  On the other hand, if it is known that only one
31 > * thread can ever update the sum, performance may be significantly
32 > * slower than just updating a local variable.
33   *
34   * <p>A StripedAdder may optionally be constructed with a given
35   * expected contention level; i.e., the number of threads that are
# Line 43 | Line 43 | public class StripedAdder implements Ser
43      private static final long serialVersionUID = 7249069246863182397L;
44  
45      /*
46 <     * Overview: We maintain a table of Atomic long variables. The
46 >     * A StripedAdder maintains a table of Atomic long variables. The
47       * table is indexed by per-thread hash codes that are initialized
48       * to random values.
49       *
# Line 58 | Line 58 | public class StripedAdder implements Ser
58       * and failures only become known via CAS failures, convergence
59       * will be slow, and because threads are typically not bound to
60       * CPUS forever, may not occur at all. However, despite these
61 <     * limitations, observed contention is typically very low in these
61 >     * limitations, observed contention is typically low in these
62       * cases.
63       *
64       * Table entries are of class Adder; a form of AtomicLong padded
# Line 67 | Line 67 | public class StripedAdder implements Ser
67       * irregularly scattered in memory and thus don't interfere much
68       * with each other. But Atomic objects residing in arrays will
69       * tend to be placed adjacent to each other, and so will most
70 <     * often share cache lines without this precaution.  Except for
71 <     * slot adders[0], Adders are constructed upon first use, which
72 <     * further improves per-thread locality and helps reduce (an
73 <     * already large) footprint.
70 >     * often share cache lines without this precaution.  Adders are
71 >     * constructed upon first use, which further improves per-thread
72 >     * locality and helps reduce (an already large) footprint.
73       *
74       * A single spinlock is used for resizing the table as well as
75       * populating slots with new Adders. Upon lock contention, threads
76 <     * try other slots rather than blocking. We guarantee that at
77 <     * least one slot (0) exists, so retries will eventually find a
76 >     * try other slots rather than blocking. After initialization, at
77 >     * least one slot exists, so retries will eventually find a
78       * candidate Adder. During these retries, there is increased
79       * contention and reduced locality, which is still better than
80       * alternatives.
# Line 87 | Line 86 | public class StripedAdder implements Ser
86      static final int NCPU = Runtime.getRuntime().availableProcessors();
87  
88      /**
89 +     * The table size set upon first use when default-constructed
90 +     */
91 +    private static final int DEFAULT_ARRAY_SIZE = 8;
92 +
93 +    /**
94       * Padded version of AtomicLong
95       */
96      static final class Adder extends AtomicLong {
# Line 96 | Line 100 | public class StripedAdder implements Ser
100  
101      /**
102       * Holder for the thread-local hash code. The code starts off with
103 <     * a given random value, but may be set to a different
104 <     * pseudo-random value (using a cheaper but adequate xorshift
101 <     * generator) upon collisions.
103 >     * a given random value, but may be set to a different value upon
104 >     * collisions in retryAdd.
105       */
106      static final class HashCode {
107          int code;
# Line 124 | Line 127 | public class StripedAdder implements Ser
127      static final ThreadHashCode threadHashCode = new ThreadHashCode();
128  
129      /**
130 <     * Table of adders. Minimum size 2. Size grows to be at most NCPU.
130 >     * Table of adders. Size is power of two, grows to be at most NCPU.
131       */
132      private transient volatile Adder[] adders;
133  
134      /**
135       * Serves as a lock when resizing and/or creating Adders.  There
136 <     * is no need for a blocking lock: When busy, other threads try
137 <     * other slots.
136 >     * is no need for a blocking lock: Except during initialization
137 >     * races, when busy, other threads try other slots.
138       */
139      private final AtomicInteger mutex;
140  
141      /**
139     * Marsaglia XorShift random generator for rehashing on collisions
140     */
141    private static int xorShift(int r) {
142        r ^= r << 13;
143        r ^= r >>> 17;
144        return r ^ (r << 5);
145    }
146
147    /**
142       * Creates a new adder with zero sum.
143       */
144      public StripedAdder() {
145 <        this(2);
145 >        this.mutex = new AtomicInteger();
146 >        // remaining initialization on first call to add.
147      }
148  
149      /**
# Line 159 | Line 154 | public class StripedAdder implements Ser
154       * will concurrently update the sum.
155       */
156      public StripedAdder(int expectedContention) {
157 <        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
158 <        int size = 2;
159 <        while (size < cap)
160 <            size <<= 1;
157 >        int size;
158 >        if (expectedContention > 0) {
159 >            int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
160 >            size = 1;
161 >            while (size < cap)
162 >                size <<= 1;
163 >        }
164 >        else
165 >            size = 0;
166          Adder[] as = new Adder[size];
167 <        as[0] = new Adder(0); // ensure at least one available adder
167 >        for (int i = 0; i < size; ++i)
168 >            as[i] = new Adder(0);
169          this.adders = as;
170          this.mutex = new AtomicInteger();
171      }
# Line 175 | Line 176 | public class StripedAdder implements Ser
176       * @param x the value to add
177       */
178      public void add(long x) {
179 +        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
180          HashCode hc = threadHashCode.get();
181 <        for (int h = hc.code;;) {
182 <            Adder[] as = adders;
183 <            int n = as.length;
184 <            Adder a = as[h & (n - 1)];
185 <            if (a != null) {
186 <                long v = a.get();
187 <                if (a.compareAndSet(v, v + x))
188 <                    break;
189 <                if (n >= NCPU) {                 // Collision when table at max
190 <                    h = hc.code = xorShift(h);   // change code
191 <                    continue;
181 >        if ((as = adders) == null || (n = as.length) < 1 ||
182 >            (a = as[hc.code & (n - 1)]) == null ||
183 >            !a.compareAndSet(v = a.get(), v + x))
184 >            retryAdd(x, hc);
185 >    }
186 >
187 >    /**
188 >     * Handle cases of add involving initialization, resizing,
189 >     * creating new Adders, and/or contention.
190 >     */
191 >    private void retryAdd(long x, HashCode hc) {
192 >        int h = hc.code;
193 >        final AtomicInteger mutex = this.mutex;
194 >        for (boolean retried = false; ; retried = true) {
195 >            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
196 >            if ((as = adders) == null || (n = as.length) < 1) {
197 >                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
198 >                    try {
199 >                        if (adders == null)        // Default-initialize
200 >                            adders = new Adder[DEFAULT_ARRAY_SIZE];
201 >                    } finally {
202 >                        mutex.set(0);
203 >                    }
204                  }
205 +                else
206 +                    Thread.yield();               // initialization race
207              }
208 <            final AtomicInteger mutex = this.mutex;
209 <            if (mutex.get() != 0)
210 <                h = xorShift(h);                 // Try elsewhere
211 <            else if (mutex.compareAndSet(0, 1)) {
208 >            else if ((a = as[k = h & (n - 1)]) != null &&
209 >                     retried && a.compareAndSet(v = a.get(), v + x))
210 >                break;
211 >            else if ((a == null || n < NCPU) &&
212 >                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
213                  boolean created = false;
214                  try {
215 <                    Adder[] rs = adders;
216 <                    if (a != null && rs == as)   // Resize table
217 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
218 <                    int j = h & (rs.length - 1);
219 <                    if (rs[j] == null) {         // Create adder
220 <                        rs[j] = new Adder(x);
221 <                        created = true;
215 >                    if (adders == as) {
216 >                        if (as[k] == null) {
217 >                            as[k] = new Adder(x);
218 >                            created = true;
219 >                        }
220 >                        else {                   // Expand table
221 >                            Adder[] rs = new Adder[n << 1];
222 >                            for (int i = 0; i < n; ++i)
223 >                                rs[i] = as[i];
224 >                            adders = rs;
225 >                        }
226                      }
227                  } finally {
228                      mutex.set(0);
229                  }
230 <                if (created) {
210 <                    hc.code = h;                 // Use this adder next time
230 >                if (created)
231                      break;
232 <                }
232 >            }
233 >            else {                                // Try elsewhere
234 >                h ^= h << 13;
235 >                h ^= h >>> 17;                    // Marsaglia XorShift
236 >                h ^= h << 5;
237              }
238          }
239 +        hc.code = h;
240      }
241  
242      /**
# Line 222 | Line 247 | public class StripedAdder implements Ser
247       * @return the estimated sum
248       */
249      public long sum() {
250 <        long sum = 0;
250 >        long sum = 0L;
251          Adder[] as = adders;
252 <        int n = as.length;
253 <        for (int i = 0; i < n; ++i) {
254 <            Adder a = as[i];
255 <            if (a != null)
256 <                sum += a.get();
252 >        if (as != null) {
253 >            int n = as.length;
254 >            for (int i = 0; i < n; ++i) {
255 >                Adder a = as[i];
256 >                if (a != null)
257 >                    sum += a.get();
258 >            }
259          }
260          return sum;
261      }
# Line 240 | Line 267 | public class StripedAdder implements Ser
267       */
268      public void reset() {
269          Adder[] as = adders;
270 <        int n = as.length;
271 <        for (int i = 0; i < n; ++i) {
272 <            Adder a = as[i];
273 <            if (a != null)
274 <                a.set(0L);
270 >        if (as != null) {
271 >            int n = as.length;
272 >            for (int i = 0; i < n; ++i) {
273 >                Adder a = as[i];
274 >                if (a != null)
275 >                    a.set(0L);
276 >            }
277          }
278      }
279  
# Line 268 | Line 297 | public class StripedAdder implements Ser
297       * @return the estimated sum
298       */
299      public long sumAndReset() {
300 <        long sum = 0;
300 >        long sum = 0L;
301          Adder[] as = adders;
302 <        int n = as.length;
303 <        for (int i = 0; i < n; ++i) {
304 <            Adder a = as[i];
305 <            if (a != null) {
306 <                sum += a.get();
307 <                a.set(0L);
302 >        if (as != null) {
303 >            int n = as.length;
304 >            for (int i = 0; i < n; ++i) {
305 >                Adder a = as[i];
306 >                if (a != null) {
307 >                    sum += a.get();
308 >                    a.set(0L);
309 >                }
310              }
311          }
312          return sum;
# Line 290 | Line 321 | public class StripedAdder implements Ser
321      private void readObject(ObjectInputStream s)
322          throws IOException, ClassNotFoundException {
323          s.defaultReadObject();
293        long c = s.readLong();
294        Adder[] as = new Adder[2];
295        as[0] = new Adder(c);
296        this.adders = as;
324          mutex.set(0);
325 +        add(s.readLong());
326      }
327  
328   }
301
302

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines