ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.36
Committed: Tue Feb 7 20:54:24 2006 UTC (18 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.35: +0 -1 lines
Log Message:
6378729: Remove workaround for 6280605

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/licenses/publicdomain
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.Random;
12
/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  A {@code null}
 * reference may be exchanged like any other value.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>{@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * }</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack.  The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other.  If they fail to find others waiting, they try the
     *     top spot again.  As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A hole is "empty" exactly when it holds null, so a null item
     * can never be stored in a hole directly: filling a hole with
     * null would be indistinguishable from not filling it, and the
     * waiting thread would never be released.  The public methods
     * therefore replace null items by the NULL_ITEM sentinel before
     * entering doExchange and translate it back on return.
     *
     * For more details, see the paper "A Scalable Elimination-based
     * Exchange Channel" by William Scherer, Doug Lea, and Michael
     * Scott in Proceedings of SCOOL05 workshop. Available at:
     * http://hdl.handle.net/1802/2104
     */

    /** The number of CPUs, for sizing and spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot.  Imposing a ceiling
     * is suboptimal for huge machines, but bounds backoff times to
     * acceptable values. To ensure max times less than 2.4 seconds,
     * the ceiling value plus the shift value of backoff base (below)
     * should be less than or equal to 31.
     */
    private static final int SIZE = Math.min(25, (NCPUS + 1) / 2);

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two.
     * Should be small because backoffs exponentially increase from base.
     * The value should be close to the round-trip time of a call to
     * LockSupport.park in the case where some other thread has already
     * called unpark. On multiprocessors, timed waits less than this value
     * are implemented by spinning.
     */
    static final long BACKOFF_BASE = (1L << 6);

    /**
     * The number of nanoseconds for which it is faster to spin rather
     * than to use timed park. Should normally be zero on
     * uniprocessors and BACKOFF_BASE on multiprocessors.
     */
    static final long spinForTimeoutThreshold = (NCPUS < 2) ? 0 : BACKOFF_BASE;

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 16;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 32;

    /**
     * Sentinel item representing cancellation.  This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel standing in for a {@code null} item while it travels
     * through the arena.  A hole containing null means "not yet
     * filled", so a real null offered by a thread is replaced by this
     * value on entry to the public methods and converted back before
     * being returned to the caller.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena. arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     */
    private final AtomicReference<Node>[] arena;

    /**
     * Per-thread random number generator.  Because random numbers
     * are used to choose slots and delays to reduce contention, the
     * random number generator itself cannot introduce contention.
     * And the statistical quality of the generator is not too
     * important.  So we use a custom cheap generator, and maintain
     * it as a thread local.
     */
    private static final ThreadLocal<RNG> random = new ThreadLocal<RNG>() {
            @Override
            public RNG initialValue() { return new RNG(); } };

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        // Generic array creation is not expressible directly; the array
        // only ever holds AtomicReference<Node> instances created below.
        @SuppressWarnings("unchecked")
        AtomicReference<Node>[] slots =
            (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
        for (int i = 0; i < slots.length; ++i)
            slots[i] = new AtomicReference<Node>();
        arena = slots;
    }

    /**
     * Replaces a {@code null} item by the internal NULL_ITEM sentinel.
     *
     * @param x the item offered by the caller, possibly null
     * @return {@code x}, or NULL_ITEM if {@code x} is null
     */
    private static Object mask(Object x) {
        return (x == null) ? NULL_ITEM : x;
    }

    /**
     * Converts a value obtained from {@link #doExchange} back to the
     * caller-visible form, mapping NULL_ITEM to {@code null}.
     *
     * @param v the value returned by doExchange
     * @return the partner's item as seen by the caller
     */
    @SuppressWarnings("unchecked")
    private V unmask(Object v) {
        // Every non-sentinel item was offered through exchange(V ...),
        // so the cast is safe.
        return (v == NULL_ITEM) ? null : (V) v;
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of internal sentinel values.  Callers from public
     * methods cast accordingly.
     *
     * @param item the (non-null, possibly masked) item to exchange
     * @param timed true if the wait is timed
     * @param nanos if timed, the maximum wait time
     * @return the other thread's (possibly masked) item
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if timed and the wait time elapsed
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        long lastTime = timed ? System.nanoTime() : 0;
        int idx = 0;     // start out at slot representing top
        int backoff = 0; // increases on failure to occupy a slot
        Node me = new Node(item);

        for (;;) {
            AtomicReference<Node> slot = arena[idx];
            Node you = slot.get();

            // Try to occupy this slot
            if (you == null && slot.compareAndSet(null, me)) {
                // If this is top slot, use regular wait, else backoff-wait
                Object v = ((idx == 0) ?
                            me.waitForHole(timed, nanos) :
                            me.waitForHole(true, randomDelay(backoff)));
                // Vacate the slot unless a partner already did so
                if (slot.get() == me)
                    slot.compareAndSet(me, null);
                if (v != FAIL)
                    return v;
                if (Thread.interrupted())
                    throw new InterruptedException();
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        throw new TimeoutException();
                }

                me = new Node(me.item);   // Throw away nodes on failure
                if (backoff < SIZE - 1)   // Increase or stay saturated
                    ++backoff;
                idx = 0;                  // Restart at top
                continue;
            }

            // Try to release waiter from apparently non-empty slot
            if (you != null || (you = slot.get()) != null) {
                // me.item is never null here (see mask), so a successful
                // CAS always makes the hole visibly non-empty.
                boolean success = (you.get() == null &&
                                   you.compareAndSet(null, me.item));
                if (slot.get() == you)
                    slot.compareAndSet(you, null);
                if (success) {
                    you.signal();
                    return you.item;
                }
            }

            // Retry with a random non-top slot <= backoff
            idx = backoff == 0 ? 1 : 1 + random.get().next() % (backoff + 1);
        }
    }

    /**
     * Returns a random delay less than (base times (2 raised to backoff)).
     */
    private long randomDelay(int backoff) {
        return ((BACKOFF_BASE << backoff) - 1) & random.get().next();
    }

    /**
     * Nodes hold partially exchanged data.  This class
     * opportunistically subclasses AtomicReference to represent the
     * hole.  So get() returns hole, and compareAndSet CAS'es value
     * into hole.  Note that this class cannot be parameterized as V
     * because the sentinel values are only of type Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         *
         * @param item the item
         */
        Node(Object item) {
            this.item = item;
        }

        /**
         * Unparks thread if it is waiting.
         */
        void signal() {
            LockSupport.unpark(waiter);
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled.
         * The interrupted status of the thread is inspected but not
         * cleared here.
         *
         * @param timed true if the wait is timed
         * @param nanos if timed, the maximum wait time
         * @return on success, the hole; on failure, FAIL
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0;
            int spins = timed ? maxTimedSpins : maxUntimedSpins;
            Thread w = Thread.currentThread();
            for (;;) {
                // Cancel by CASing FAIL into the hole; if a partner won
                // the race the CAS fails and the real value is returned.
                if (w.isInterrupted())
                    compareAndSet(null, FAIL);
                Object h = get();
                if (h != null)
                    return h;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        compareAndSet(null, FAIL);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (waiter == null)
                    waiter = w;       // publish before first park
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange; may be {@code null}
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    public V exchange(V x) throws InterruptedException {
        // Honor the documented "interrupted on entry" contract even when
        // a partner is already waiting and no blocking would occur.
        if (Thread.interrupted())
            throw new InterruptedException();
        try {
            return unmask(doExchange(mask(x), false, 0));
        } catch (TimeoutException cannotHappen) {
            // Untimed waits never time out.
            throw new AssertionError(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@link Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange; may be {@code null}
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        // Honor the documented "interrupted on entry" contract even when
        // a partner is already waiting and no blocking would occur.
        if (Thread.interrupted())
            throw new InterruptedException();
        return unmask(doExchange(mask(x), true, unit.toNanos(timeout)));
    }

    /**
     * Cheap XorShift random number generator used for determining
     * elimination array slots and backoff delays.  This uses the
     * simplest of the generators described in George Marsaglia's
     * "Xorshift RNGs" paper.  This is not a high-quality generator
     * but is acceptable here.
     */
    static final class RNG {
        /** Use java.util.Random as seed generator for new RNGs. */
        private static final Random seedGenerator = new Random();

        /** Current state; forced odd so that it is never zero. */
        private int seed = seedGenerator.nextInt() | 1;

        /**
         * Returns random nonnegative integer.
         */
        int next() {
            int x = seed;
            x ^= x << 6;
            x ^= x >>> 21;
            seed = x ^= x << 7;
            return x & 0x7FFFFFFF;
        }
    }

}