ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.34
Committed: Mon Dec 12 20:05:48 2005 UTC (18 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.33: +107 -67 lines
Log Message:
Performance improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/licenses/publicdomain
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 import java.util.concurrent.locks.*;
11 import java.util.concurrent.atomic.*;
12 import java.util.Random;
13
/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs. Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>{@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * }</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack. The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other. If they fail to find others waiting, they try the
     *     top spot again. As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A hole that holds null is "not yet filled". A null item therefore
     * cannot be CAS'ed into a hole as-is (the CAS would succeed without
     * changing anything), so null items travel internally as the
     * NULL_ITEM sentinel and are mapped back to null before returning.
     *
     * For more details, see the paper "A Scalable Elimination-based
     * Exchange Channel" by William Scherer, Doug Lea, and Michael
     * Scott in Proceedings of SCOOL05 workshop. Available at:
     * http://hdl.handle.net/1802/2104
     */

    /** The number of CPUs, for sizing and spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot. Imposing a ceiling
     * is suboptimal for huge machines, but bounds backoff times to
     * acceptable values. To ensure max times less than 2.4 seconds,
     * the ceiling value plus the shift value of backoff base (below)
     * should be less than or equal to 31.
     */
    private static final int SIZE = Math.min(25, (NCPUS + 1) / 2);

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two,
     * because randomDelay uses (BACKOFF_BASE << backoff) - 1 as a bit
     * mask. Should be small because backoffs exponentially increase
     * from base. The value should be close to the round-trip time of a
     * call to LockSupport.park in the case where some other thread
     * has already called unpark. On multiprocessors, timed waits less
     * than this value are implemented by spinning.
     */
    static final long BACKOFF_BASE = (1L << 6);

    /**
     * The number of nanoseconds for which it is faster to spin rather
     * than to use timed park. Should normally be zero on
     * uniprocessors and BACKOFF_BASE on multiprocessors.
     */
    static final long spinForTimeoutThreshold = (NCPUS < 2) ? 0 : BACKOFF_BASE;

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 16;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 32;

    /**
     * Sentinel item representing cancellation. This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel stored in place of a null item. Holes use null to mean
     * "not yet filled", so a null item is encoded as this object on
     * entry to doExchange and decoded back to null before returning.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena. arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     */
    private final AtomicReference<Node>[] arena;

    /**
     * Per-thread random number generator. Because random numbers are
     * used to choose slots and delays to reduce contention, the
     * random number generator itself cannot introduce contention.
     * And the statistical quality of the generator is not too
     * important. So we use a custom cheap generator, and maintain it
     * as a thread local.
     */
    private static final ThreadLocal<RNG> random = new ThreadLocal<RNG>() {
        @Override
        public RNG initialValue() { return new RNG(); }
    };

    /**
     * Creates a new Exchanger.
     */
    @SuppressWarnings("unchecked")
    public Exchanger() {
        // Generic arrays cannot be created directly; allocate a raw array
        // and fill every slot before the instance is published.
        arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
        for (int i = 0; i < arena.length; ++i)
            arena[i] = new AtomicReference<Node>();
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V", as argument and return value to simplify
     * handling of internal sentinel values. Callers from public
     * methods cast accordingly.
     *
     * @param item the item to exchange; may be null
     * @param timed true if the wait is timed
     * @param nanos if timed, the maximum wait time
     * @return the other thread's item (null if the partner offered null)
     * @throws InterruptedException if the current thread is interrupted
     *         on entry or while waiting; its interrupt status is cleared
     * @throws TimeoutException if timed and the wait time elapses
     *         before a partner is matched
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // The public methods promise to throw if the interrupt status is
        // already set on entry. Without this check, a thread that finds a
        // waiting partner would complete the exchange and keep the flag.
        if (Thread.interrupted())
            throw new InterruptedException();

        long lastTime = timed ? System.nanoTime() : 0;
        int idx = 0;      // start out at slot representing top
        int backoff = 0;  // increases on failure to occupy a slot
        // A null hole means "unfilled", so null items travel as NULL_ITEM.
        Node me = new Node((item == null) ? NULL_ITEM : item);

        for (;;) {
            AtomicReference<Node> slot = arena[idx];
            Node you = slot.get();

            // Try to occupy this slot
            if (you == null && slot.compareAndSet(null, me)) {
                // If this is top slot, use regular wait, else backoff-wait
                Object v;
                if (idx == 0) {
                    v = me.waitForHole(timed, nanos);
                } else {
                    long delay = randomDelay(backoff);
                    // A backoff delay can approach 2^30 ns; never let it
                    // carry a timed caller past its remaining wait time.
                    if (timed && delay > nanos)
                        delay = nanos;
                    v = me.waitForHole(true, delay);
                }
                // Unlink our node so later arrivals do not try to fill it
                if (slot.get() == me)
                    slot.compareAndSet(me, null);
                if (v != FAIL)
                    return decode(v);
                if (Thread.interrupted())
                    throw new InterruptedException();
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        throw new TimeoutException();
                }

                me = new Node(me.item); // Throw away nodes on failure
                if (backoff < SIZE - 1) // Increase or stay saturated
                    ++backoff;
                idx = 0;                // Restart at top
                continue;
            }

            // Try to release waiter from apparently non-empty slot
            if (you != null || (you = slot.get()) != null) {
                boolean success = (you.get() == null &&
                                   you.compareAndSet(null, me.item));
                if (slot.get() == you)
                    slot.compareAndSet(you, null);
                if (success) {
                    you.signal();
                    return decode(you.item);
                }
            }

            // Retry with a random non-top slot in [1, backoff + 1]
            idx = (backoff == 0) ? 1 : 1 + random.get().next() % (backoff + 1);
        }
    }

    /**
     * Maps the internal NULL_ITEM encoding back to null; any other
     * value is returned unchanged.
     */
    private static Object decode(Object v) {
        return (v == NULL_ITEM) ? null : v;
    }

    /**
     * Returns a random delay less than (base times (2 raised to backoff)).
     */
    private long randomDelay(int backoff) {
        return ((BACKOFF_BASE << backoff) - 1) & random.get().next();
    }

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole. Note that this class cannot be parameterized as V
     * because the sentinel value FAIL is only of type Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         *
         * @param item the item
         */
        Node(Object item) {
            this.item = item;
        }

        /**
         * Unparks thread if it is waiting. Unparking null is a no-op,
         * so a signal racing ahead of the waiter's registration is
         * harmless: the waiter re-reads the hole before it parks.
         */
        void signal() {
            LockSupport.unpark(waiter);
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled. The
         * caller spins for a bounded number of iterations, then records
         * itself as the waiter, and only then parks.
         *
         * @param timed true if the wait is timed
         * @param nanos if timed, the maximum wait time
         * @return on success, the hole; on failure, FAIL
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0;
            int spins = timed ? maxTimedSpins : maxUntimedSpins;
            Thread w = Thread.currentThread();
            for (;;) {
                // Cancel by claiming the hole with FAIL; the CAS loses
                // if a partner has already filled it.
                if (w.isInterrupted())
                    compareAndSet(null, FAIL);
                Object h = get();
                if (h != null)
                    return h;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        compareAndSet(null, FAIL);
                        continue; // re-read: either FAIL or a late fill
                    }
                }
                if (spins > 0)
                    --spins;
                else if (waiter == null)
                    waiter = w;   // publish before parking; hole re-checked next pass
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        try {
            return (V) doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // Untimed waits never time out; reaching here is an internal error.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return (V) doExchange(x, true, unit.toNanos(timeout));
    }

    /**
     * Cheap XorShift random number generator used for determining
     * elimination array slots and backoff delays. This uses the
     * simplest of the generators described in George Marsaglia's
     * "Xorshift RNGs" paper. This is not a high-quality generator
     * but is acceptable here.
     */
    static final class RNG {
        /** Use java.util.Random as seed generator for new RNGs. */
        private static final Random seedGenerator = new Random();

        /** Current state; forced odd so it is never zero. */
        private int seed = seedGenerator.nextInt() | 1;

        /**
         * Returns random nonnegative integer.
         */
        int next() {
            int x = seed;
            x ^= x << 6;
            x ^= x >>> 21;
            seed = x ^= x << 7;
            return x & 0x7FFFFFFF;
        }
    }

}