ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.30
Committed: Sun Nov 6 15:30:24 2005 UTC (18 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.29: +35 -26 lines
Log Message:
Incorporate review suggestions

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/licenses/publicdomain
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 import java.util.concurrent.locks.*;
11 import java.util.concurrent.atomic.*;
12 import java.util.Random;
13
14 /**
15 * A synchronization point at which threads can pair and swap elements
16 * within pairs. Each thread presents some object on entry to the
17 * {@link #exchange exchange} method, matches with a partner thread,
18 * and receives its partner's object on return.
19 *
20 * <p><b>Sample Usage:</b>
21 * Here are the highlights of a class that uses an {@code Exchanger}
22 * to swap buffers between threads so that the thread filling the
23 * buffer gets a freshly emptied one when it needs it, handing off the
24 * filled one to the thread emptying the buffer.
25 * <pre>{@code
26 * class FillAndEmpty {
27 * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
28 * DataBuffer initialEmptyBuffer = ... a made-up type
29 * DataBuffer initialFullBuffer = ...
30 *
31 * class FillingLoop implements Runnable {
32 * public void run() {
33 * DataBuffer currentBuffer = initialEmptyBuffer;
34 * try {
35 * while (currentBuffer != null) {
36 * addToBuffer(currentBuffer);
37 * if (currentBuffer.isFull())
38 * currentBuffer = exchanger.exchange(currentBuffer);
39 * }
40 * } catch (InterruptedException ex) { ... handle ... }
41 * }
42 * }
43 *
44 * class EmptyingLoop implements Runnable {
45 * public void run() {
46 * DataBuffer currentBuffer = initialFullBuffer;
47 * try {
48 * while (currentBuffer != null) {
49 * takeFromBuffer(currentBuffer);
50 * if (currentBuffer.isEmpty())
51 * currentBuffer = exchanger.exchange(currentBuffer);
52 * }
53 * } catch (InterruptedException ex) { ... handle ...}
54 * }
55 * }
56 *
57 * void start() {
58 * new Thread(new FillingLoop()).start();
59 * new Thread(new EmptyingLoop()).start();
60 * }
61 * }
62 * }</pre>
63 *
64 * <p>Memory consistency effects: For each pair of threads that
65 * successfully exchange objects via an {@code Exchanger}, actions
66 * prior to the {@code exchange()} in each thread
67 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
68 * those subsequent to a return from the corresponding {@code exchange()}
69 * in the other thread.
70 *
71 * @since 1.5
72 * @author Doug Lea and Bill Scherer and Michael Scott
73 * @param <V> The type of objects that may be exchanged
74 */
75 public class Exchanger<V> {
76 /*
77 * The underlying idea is to use a stack to hold nodes containing
78 * pairs of items to be exchanged. Except that:
79 *
80 * * Only one element of the pair is known on creation by a
81 * first-arriving thread; the other is a "hole" waiting to be
82 * filled in. This is a degenerate form of the dual stacks
83 * described in "Nonblocking Concurrent Objects with Condition
84 * Synchronization", by W. N. Scherer III and M. L. Scott.
85 * 18th Annual Conf. on Distributed Computing, Oct. 2004.
86 * It is "degenerate" in that both the items and the holes
87 * are shared in the same nodes.
88 *
89 * * There isn't really a stack here! There can't be -- if two
90 * nodes were both in the stack, they should cancel themselves
91 * out by combining. So that's what we do. The 0th element of
92 * the "arena" array serves only as the top of stack. The
93 * remainder of the array is a form of the elimination backoff
94 * collision array described in "A Scalable Lock-free Stack
95 * Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
96 * 16th ACM Symposium on Parallelism in Algorithms and
97 * Architectures, June 2004. Here, threads spin (using short
98 * timed waits with exponential backoff) looking for each
99 * other. If they fail to find others waiting, they try the
100 * top spot again. As shown in that paper, this always
101 * converges.
102 *
103 * The backoff elimination mechanics never come into play in
104 * common usages where only two threads ever meet to exchange
105 * items, but they prevent contention bottlenecks when an
106 * exchanger is used by a large number of threads.
107 *
108 * For more details, see the paper "A Scalable Elimination-based
109 * Exchange Channel" by William Scherer, Doug Lea, and Michael
110 * Scott in Proceedings of SCOOL05 workshop. Available at:
111 * http://hdl.handle.net/1802/2104
112 */
113
114 /**
115 * Size of collision space. Using a size of half the number of
116 * CPUs provides enough space for threads to find each other but
117 * not so much that it would always require one or more to time
118 * out to become unstuck. Note that the arena array holds SIZE+1
119 * elements, to include the top-of-stack slot.
120 */
121 private static final int SIZE =
122 (Runtime.getRuntime().availableProcessors() + 1) / 2;
123
124 /**
125 * Base unit in nanoseconds for backoffs. Must be a power of two.
126 * Should be small because backoffs exponentially increase from
127 * base.
128 */
129 private static final long BACKOFF_BASE = 128L;
130
131 /**
132 * Sentinel item representing cancellation. This value is placed
133 * in holes on cancellation, and used as a return value from Node
134 * methods to indicate failure to set or get hole.
135 */
136 static final Object FAIL = new Object();
137
138 /**
139 * The collision arena. arena[0] is used as the top of the stack.
140 * The remainder is used as the collision elimination space.
141 * Each slot holds an AtomicReference<Node>, but this cannot be
142 * expressed for arrays, so elements are casted on each use.
143 */
144 private final AtomicReference<Node>[] arena;
145
146 /** Generator for random backoffs and delays. */
147 private final Random random = new Random();
148
149 /**
150 * Creates a new Exchanger.
151 */
152 public Exchanger() {
153 arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
154 for (int i = 0; i < arena.length; ++i)
155 arena[i] = new AtomicReference<Node>();
156 }
157
158 /**
159 * Main exchange function, handling the different policy variants.
160 * Uses Object, not "V" as argument and return value to simplify
161 * handling of internal sentinel values. Callers from public
162 * methods cast accordingly.
163 *
164 * @param item the item to exchange
165 * @param timed true if the wait is timed
166 * @param nanos if timed, the maximum wait time
167 * @return the other thread's item
168 */
169 private Object doExchange(Object item, boolean timed, long nanos)
170 throws InterruptedException, TimeoutException {
171 Node me = new Node(item);
172 long lastTime = timed ? System.nanoTime() : 0;
173 int idx = 0; // start out at slot representing top
174 int backoff = 0; // increases on failure to occupy a slot
175
176 for (;;) {
177 AtomicReference<Node> slot = arena[idx];
178
179 // If this slot is already occupied, there is a waiting item...
180 Node you = slot.get();
181 if (you != null) {
182 Object v = you.fillHole(item);
183 slot.compareAndSet(you, null);
184 if (v != FAIL) // ... unless it was cancelled
185 return v;
186 }
187
188 // Try to occupy this slot
189 if (slot.compareAndSet(null, me)) {
190 // If this is top slot, use regular wait, else backoff-wait
191 Object v = ((idx == 0)?
192 me.waitForHole(timed, nanos) :
193 me.waitForHole(true, randomDelay(backoff)));
194 slot.compareAndSet(me, null);
195 if (v != FAIL)
196 return v;
197 if (Thread.interrupted())
198 throw new InterruptedException();
199 if (timed) {
200 long now = System.nanoTime();
201 nanos -= now - lastTime;
202 lastTime = now;
203 if (nanos <= 0)
204 throw new TimeoutException();
205 }
206
207 me = new Node(item); // Throw away nodes on failure
208 if (backoff < SIZE - 1) // Increase or stay saturated
209 ++backoff;
210 idx = 0; // Restart at top
211 }
212
213 else // Retry with a random non-top slot <= backoff
214 idx = 1 + random.nextInt(backoff + 1);
215
216 }
217 }
218
219 /**
220 * Returns a random delay less than (base times (2 raised to backoff))
221 */
222 private long randomDelay(int backoff) {
223 return ((BACKOFF_BASE << backoff) - 1) & random.nextInt();
224 }
225
226 /**
227 * Nodes hold partially exchanged data. This class
228 * opportunistically subclasses AtomicReference to represent the
229 * hole. So get() returns hole, and compareAndSet CAS'es value
230 * into hole. Note that this class cannot be parameterized as V
231 * because the sentinel value FAIL is only of type Object.
232 */
233 static final class Node extends AtomicReference<Object> {
234 private static final long serialVersionUID = -3221313401284163686L;
235
236 /** The element offered by the Thread creating this node. */
237 final Object item;
238 /** The Thread creating this node. */
239 final Thread waiter;
240
241 /**
242 * Creates node with given item and empty hole.
243 * @param item the item.
244 */
245 Node(Object item) {
246 this.item = item;
247 waiter = Thread.currentThread();
248 }
249
250 /**
251 * Tries to fill in hole. On success, wakes up the waiter.
252 * @param val the value to place in hole.
253 * @return on success, the item; on failure, FAIL.
254 */
255 Object fillHole(Object val) {
256 if (compareAndSet(null, val)) {
257 LockSupport.unpark(waiter);
258 return item;
259 }
260 return FAIL;
261 }
262
263 /**
264 * Waits for and gets the hole filled in by another thread.
265 * Fails if timed out or interrupted before hole filled.
266 *
267 * @param timed true if the wait is timed
268 * @param nanos if timed, the maximum wait time
269 * @return on success, the hole; on failure, FAIL
270 */
271 Object waitForHole(boolean timed, long nanos) {
272 long lastTime = timed ? System.nanoTime() : 0;
273 Object h;
274 while ((h = get()) == null) {
275 // If interrupted or timed out, try to cancel by
276 // CASing FAIL as hole value.
277 if (Thread.currentThread().isInterrupted() ||
278 (timed && nanos <= 0)) {
279 if (compareAndSet(null, FAIL))
280 return FAIL;
281 }
282 else if (!timed)
283 LockSupport.park();
284 else {
285 LockSupport.parkNanos(nanos);
286 long now = System.nanoTime();
287 nanos -= now - lastTime;
288 lastTime = now;
289 }
290 }
291 return h;
292 }
293 }
294
295 /**
296 * Waits for another thread to arrive at this exchange point (unless
297 * it is {@link Thread#interrupt interrupted}),
298 * and then transfers the given object to it, receiving its object
299 * in return.
300 *
301 * <p>If another thread is already waiting at the exchange point then
302 * it is resumed for thread scheduling purposes and receives the object
303 * passed in by the current thread. The current thread returns immediately,
304 * receiving the object passed to the exchange by that other thread.
305 *
306 * <p>If no other thread is already waiting at the exchange then the
307 * current thread is disabled for thread scheduling purposes and lies
308 * dormant until one of two things happens:
309 * <ul>
310 * <li>Some other thread enters the exchange; or
311 * <li>Some other thread {@link Thread#interrupt interrupts} the current
312 * thread.
313 * </ul>
314 * <p>If the current thread:
315 * <ul>
316 * <li>has its interrupted status set on entry to this method; or
317 * <li>is {@link Thread#interrupt interrupted} while waiting
318 * for the exchange,
319 * </ul>
320 * then {@link InterruptedException} is thrown and the current thread's
321 * interrupted status is cleared.
322 *
323 * @param x the object to exchange
324 * @return the object provided by the other thread
325 * @throws InterruptedException if the current thread was
326 * interrupted while waiting
327 */
328 public V exchange(V x) throws InterruptedException {
329 try {
330 return (V)doExchange(x, false, 0);
331 } catch (TimeoutException cannotHappen) {
332 throw new Error(cannotHappen);
333 }
334 }
335
336 /**
337 * Waits for another thread to arrive at this exchange point (unless
338 * it is {@link Thread#interrupt interrupted}, or the specified waiting
339 * time elapses),
340 * and then transfers the given object to it, receiving its object
341 * in return.
342 *
343 * <p>If another thread is already waiting at the exchange point then
344 * it is resumed for thread scheduling purposes and receives the object
345 * passed in by the current thread. The current thread returns immediately,
346 * receiving the object passed to the exchange by that other thread.
347 *
348 * <p>If no other thread is already waiting at the exchange then the
349 * current thread is disabled for thread scheduling purposes and lies
350 * dormant until one of three things happens:
351 * <ul>
352 * <li>Some other thread enters the exchange; or
353 * <li>Some other thread {@link Thread#interrupt interrupts} the current
354 * thread; or
355 * <li>The specified waiting time elapses.
356 * </ul>
357 * <p>If the current thread:
358 * <ul>
359 * <li>has its interrupted status set on entry to this method; or
360 * <li>is {@link Thread#interrupt interrupted} while waiting
361 * for the exchange,
362 * </ul>
363 * then {@link InterruptedException} is thrown and the current thread's
364 * interrupted status is cleared.
365 *
366 * <p>If the specified waiting time elapses then {@link TimeoutException}
367 * is thrown.
368 * If the time is
369 * less than or equal to zero, the method will not wait at all.
370 *
371 * @param x the object to exchange
372 * @param timeout the maximum time to wait
373 * @param unit the time unit of the <tt>timeout</tt> argument
374 * @return the object provided by the other thread
375 * @throws InterruptedException if the current thread was
376 * interrupted while waiting
377 * @throws TimeoutException if the specified waiting time elapses
378 * before another thread enters the exchange
379 */
380 public V exchange(V x, long timeout, TimeUnit unit)
381 throws InterruptedException, TimeoutException {
382 return (V)doExchange(x, true, unit.toNanos(timeout));
383 }
384 }