ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.34
Committed: Mon Dec 12 20:05:48 2005 UTC (18 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.33: +107 -67 lines
Log Message:
Performance improvements

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 dl 1.14 * http://creativecommons.org/licenses/publicdomain
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.21 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 dl 1.4 import java.util.concurrent.locks.*;
11 dl 1.16 import java.util.concurrent.atomic.*;
12     import java.util.Random;
13 tim 1.1
/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs. Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return. A {@code null} object
 * may be exchanged; the partner then receives {@code null}.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>{@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * }</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack. The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other. If they fail to find others waiting, they try the
     *     top spot again. As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A null hole means "not yet filled", so items are never stored
     * as null: a caller's null is replaced by NULL_ITEM on entry to
     * doExchange and translated back to null on return.
     *
     * For more details, see the paper "A Scalable Elimination-based
     * Exchange Channel" by William Scherer, Doug Lea, and Michael
     * Scott in Proceedings of SCOOL05 workshop. Available at:
     * http://hdl.handle.net/1802/2104
     */

    /** The number of CPUs, for sizing and spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot. Imposing a ceiling
     * is suboptimal for huge machines, but bounds backoff times to
     * acceptable values. To ensure max times less than 2.4 seconds,
     * the ceiling value plus the shift value of backoff base (below)
     * should be less than or equal to 31.
     */
    private static final int SIZE = Math.min(25, (NCPUS + 1) / 2);

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two.
     * Should be small because backoffs exponentially increase from
     * base. The value should be close to the round-trip time of a
     * call to LockSupport.park in the case where some other thread
     * has already called unpark. On multiprocessors, timed waits less
     * than this value are implemented by spinning.
     */
    static final long BACKOFF_BASE = (1L << 6);

    /**
     * The number of nanoseconds for which it is faster to spin rather
     * than to use timed park. Should normally be zero on
     * uniprocessors and BACKOFF_BASE on multiprocessors.
     */
    static final long spinForTimeoutThreshold = (NCPUS < 2) ? 0 : BACKOFF_BASE;

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 16;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 32;

    /**
     * Sentinel item representing cancellation. This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel standing in for a null item. Because a null hole means
     * "unfilled", a partner that CAS'ed a raw null into a hole would
     * appear never to have arrived, leaving the waiter stuck. Null
     * items are therefore replaced by this value on entry and mapped
     * back to null before being returned.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena. arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     */
    private final AtomicReference<Node>[] arena;

    /**
     * Per-thread random number generator. Because random numbers are
     * used to choose slots and delays to reduce contention, the
     * random number generator itself cannot introduce contention.
     * And the statistical quality of the generator is not too
     * important. So we use a custom cheap generator, and maintain it
     * as a thread local.
     */
    private static final ThreadLocal<RNG> random = new ThreadLocal<RNG>() {
        @Override
        public RNG initialValue() { return new RNG(); }
    };

    /**
     * Creates a new Exchanger.
     */
    @SuppressWarnings("unchecked") // generic array creation; elements are all AtomicReference<Node>
    public Exchanger() {
        arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
        for (int i = 0; i < arena.length; ++i)
            arena[i] = new AtomicReference<Node>();
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of internal sentinel values. Callers from public
     * methods cast accordingly.
     *
     * @param item the item to exchange; may be null
     * @param timed true if the wait is timed
     * @param nanos if timed, the maximum wait time
     * @return the other thread's item (null if the partner offered null)
     * @throws InterruptedException if the current thread was interrupted
     *         on entry or while waiting; its interrupt status is cleared
     * @throws TimeoutException if timed and the wait time elapses
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Per the public contract, an interrupt status that is already
        // set on entry must cause InterruptedException even when a
        // partner is waiting and the exchange could complete at once.
        if (Thread.interrupted())
            throw new InterruptedException();
        long lastTime = timed ? System.nanoTime() : 0;
        int idx = 0;     // start out at slot representing top
        int backoff = 0; // increases on failure to occupy a slot
        // Never store null as an item: a null hole means "unfilled".
        Node me = new Node((item == null) ? NULL_ITEM : item);

        for (;;) {
            AtomicReference<Node> slot = arena[idx];
            Node you = slot.get();

            // Try to occupy this slot
            if (you == null && slot.compareAndSet(null, me)) {
                // If this is top slot, use regular wait, else backoff-wait
                Object v = ((idx == 0) ?
                            me.waitForHole(timed, nanos) :
                            me.waitForHole(true, randomDelay(backoff)));
                // Clear the slot if nobody else already did
                if (slot.get() == me)
                    slot.compareAndSet(me, null);
                if (v != FAIL)
                    return (v == NULL_ITEM) ? null : v;
                if (Thread.interrupted())
                    throw new InterruptedException();
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        throw new TimeoutException();
                }

                me = new Node(me.item); // Throw away nodes on failure
                if (backoff < SIZE - 1) // Increase or stay saturated
                    ++backoff;
                idx = 0;                // Restart at top
                continue;
            }

            // Try to release waiter from apparently non-empty slot
            if (you != null || (you = slot.get()) != null) {
                // me.item is never null, so a successful CAS is always
                // visible to the waiter as a filled hole.
                boolean success = (you.get() == null &&
                                   you.compareAndSet(null, me.item));
                if (slot.get() == you)
                    slot.compareAndSet(you, null);
                if (success) {
                    you.signal();
                    return (you.item == NULL_ITEM) ? null : you.item;
                }
            }

            // Retry with a random non-top slot <= backoff
            idx = (backoff == 0) ? 1 : 1 + random.get().next() % (backoff + 1);
        }
    }

    /**
     * Returns a random delay less than (base times (2 raised to backoff)).
     * Because BACKOFF_BASE is a power of two, the mask keeps only the
     * low-order bits of a nonnegative random int.
     */
    private long randomDelay(int backoff) {
        return ((BACKOFF_BASE << backoff) - 1) & random.get().next();
    }

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole. Note that this class cannot be parameterized as V
     * because the sentinel values FAIL and NULL_ITEM are only of type
     * Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         *
         * @param item the item
         */
        Node(Object item) {
            this.item = item;
        }

        /**
         * Unparks thread if it is waiting. LockSupport.unpark(null)
         * has no effect, so this is safe before a waiter is recorded.
         */
        void signal() {
            LockSupport.unpark(waiter);
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled.
         *
         * @param timed true if the wait is timed
         * @param nanos if timed, the maximum wait time
         * @return on success, the hole; on failure, FAIL
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0;
            int spins = timed ? maxTimedSpins : maxUntimedSpins;
            Thread w = Thread.currentThread();
            for (;;) {
                // Cancel by filling the hole with FAIL; a racing
                // partner's fill wins if it got there first.
                if (w.isInterrupted())
                    compareAndSet(null, FAIL);
                Object h = get();
                if (h != null)
                    return h;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        compareAndSet(null, FAIL);
                        continue; // re-read hole: FAIL or partner's item
                    }
                }
                // Spin first, then publish ourselves as waiter (and
                // re-check the hole once more) before actually parking.
                if (spins > 0)
                    --spins;
                else if (waiter == null)
                    waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange; may be null
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked") // only V values (or null) are ever offered
    public V exchange(V x) throws InterruptedException {
        try {
            return (V) doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never times out.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param x the object to exchange; may be null
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked") // only V values (or null) are ever offered
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return (V) doExchange(x, true, unit.toNanos(timeout));
    }

    /**
     * Cheap XorShift random number generator used for determining
     * elimination array slots and backoff delays. This uses the
     * simplest of the generators described in George Marsaglia's
     * "Xorshift RNGs" paper. This is not a high-quality generator
     * but is acceptable here.
     */
    static final class RNG {
        /** Use java.util.Random as seed generator for new RNGs. */
        private static final Random seedGenerator = new Random();
        // Forced odd, hence never zero (a zero seed would stay zero forever).
        private int seed = seedGenerator.nextInt() | 1;

        /**
         * Returns random nonnegative integer.
         */
        int next() {
            int x = seed;
            x ^= x << 6;
            x ^= x >>> 21;
            seed = x ^= x << 7;
            return x & 0x7FFFFFFF;
        }
    }

}