ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.24
Committed: Thu Sep 8 00:04:00 2005 UTC (18 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
Edit pass for happens-before descriptions

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/licenses/publicdomain
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 import java.util.concurrent.locks.*;
11 import java.util.concurrent.atomic.*;
12 import java.util.Random;
13
14 /**
15 * A synchronization point at which two threads can exchange objects.
16 * Each thread presents some object on entry to the {@link #exchange
17 * exchange} method, and receives the object presented by the other
18 * thread on return.
19 *
20 * <p><b>Sample Usage:</b>
21 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
22 * swap buffers between threads so that the thread filling the
23 * buffer gets a freshly
24 * emptied one when it needs it, handing off the filled one to
25 * the thread emptying the buffer.
26 * <pre>
27 * class FillAndEmpty {
28 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
29 * DataBuffer initialEmptyBuffer = ... a made-up type
30 * DataBuffer initialFullBuffer = ...
31 *
32 * class FillingLoop implements Runnable {
33 * public void run() {
34 * DataBuffer currentBuffer = initialEmptyBuffer;
35 * try {
36 * while (currentBuffer != null) {
37 * addToBuffer(currentBuffer);
38 * if (currentBuffer.full())
39 * currentBuffer = exchanger.exchange(currentBuffer);
40 * }
41 * } catch (InterruptedException ex) { ... handle ... }
42 * }
43 * }
44 *
45 * class EmptyingLoop implements Runnable {
46 * public void run() {
47 * DataBuffer currentBuffer = initialFullBuffer;
48 * try {
49 * while (currentBuffer != null) {
50 * takeFromBuffer(currentBuffer);
51 * if (currentBuffer.empty())
52 * currentBuffer = exchanger.exchange(currentBuffer);
53 * }
54 * } catch (InterruptedException ex) { ... handle ...}
55 * }
56 * }
57 *
58 * void start() {
59 * new Thread(new FillingLoop()).start();
60 * new Thread(new EmptyingLoop()).start();
61 * }
62 * }
63 * </pre>
64 *
65
66 * <p> Memory consistency effects: For each pair of threads that
67 * successfully exchange objects via an <tt>Exchanger</tt>,
68 * actions prior to the <tt>exchange()</tt>
69 * in each thread <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
70 * those subsequent to the corresponding <tt>exchange()</tt> in the other
71 * thread.
72 *
73 * @since 1.5
74 * @author Doug Lea and Bill Scherer and Michael Scott
75 * @param <V> The type of objects that may be exchanged
76 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack.  The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other.  If they fail to find others waiting, they try the
     *     top spot again.  As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A hole that is still empty is represented by null, so a null
     * item supplied by a caller can never be stored in a hole
     * directly: the CAS from null to null would "succeed" without
     * changing anything and the waiter would never notice. The public
     * methods therefore substitute NULL_ITEM for null on the way in
     * and translate it back on the way out.
     */

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot.
     */
    private static final int SIZE =
        (Runtime.getRuntime().availableProcessors() + 1) / 2;

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two.
     * Should be small because backoffs exponentially increase from
     * base.
     */
    private static final long BACKOFF_BASE = 128L;

    /**
     * Largest shift applied to BACKOFF_BASE when computing a random
     * delay. With a base of 2^7 this keeps the delay mask within 31
     * bits, so that masking a (possibly negative, sign-extended)
     * random int always yields a non-negative delay below the mask.
     */
    private static final int MAX_BACKOFF_SHIFT = 24;

    /**
     * Sentinel item representing cancellation.  This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel standing in for a null item supplied by a caller, so
     * that a filled hole is always distinguishable from an empty one.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena. arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     */
    private final AtomicReference<Node>[] arena;

    /** Generator for random backoffs and delays. */
    private final Random random = new Random();

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        // Generic array creation cannot be expressed directly; the array
        // only ever holds AtomicReference<Node> instances created below.
        @SuppressWarnings({"unchecked", "rawtypes"})
        AtomicReference<Node>[] slots = new AtomicReference[SIZE + 1];
        for (int i = 0; i < slots.length; ++i) {
            slots[i] = new AtomicReference<Node>();
        }
        arena = slots;
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of internal sentinel values.  Callers from public
     * methods mask null items and cast accordingly.
     *
     * @param item the (non-null, possibly masked) item to exchange.
     * @param timed true if the wait is timed.
     * @param nanos if timed, the maximum wait time.
     * @return the other thread's (possibly masked) item.
     * @throws InterruptedException if interrupted while waiting.
     * @throws TimeoutException if timed and the wait time elapsed.
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        Node me = new Node(item);
        long lastTime = timed ? System.nanoTime() : 0L;
        int idx = 0;     // start out at slot representing top
        int backoff = 0; // increases on failure to occupy a slot

        for (;;) {
            AtomicReference<Node> slot = arena[idx];

            // If this slot is already occupied, there is a waiting item...
            Node you = slot.get();
            if (you != null) {
                Object v = you.fillHole(item);
                slot.compareAndSet(you, null);
                if (v != FAIL) { // ... unless it was cancelled
                    return v;
                }
            }

            // Try to occupy this slot
            if (slot.compareAndSet(null, me)) {
                Object v;
                if (idx == 0) {
                    // Top slot: wait as long as the caller allows.
                    v = me.waitForHole(timed, nanos);
                } else {
                    // Collision slot: short random wait, but never
                    // longer than what is left of a timed exchange.
                    long delay = randomDelay(backoff);
                    if (timed && delay > nanos) {
                        delay = nanos;
                    }
                    v = me.waitForHole(true, delay);
                }
                slot.compareAndSet(me, null);
                if (v != FAIL) {
                    return v;
                }
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        throw new TimeoutException();
                    }
                }

                me = new Node(item);    // Throw away nodes on failure
                if (backoff < SIZE - 1) { // Increase or stay saturated
                    ++backoff;
                }
                idx = 0;                // Restart at top
            } else {
                // Retry with a random non-top slot <= backoff
                idx = 1 + random.nextInt(backoff + 1);
            }
        }
    }

    /**
     * Returns a random non-negative delay less than
     * (base times (2 raised to backoff)), with the exponent capped at
     * MAX_BACKOFF_SHIFT so the mask cannot overflow or pick up the
     * sign bits of the random int.
     */
    private long randomDelay(int backoff) {
        int shift = Math.min(backoff, MAX_BACKOFF_SHIFT);
        long mask = (BACKOFF_BASE << shift) - 1;
        return mask & random.nextInt();
    }

    /** Replaces a null item by the NULL_ITEM sentinel. */
    private static Object maskNull(Object x) {
        return (x == null) ? NULL_ITEM : x;
    }

    /** Translates NULL_ITEM back to null and casts to the item type. */
    @SuppressWarnings("unchecked") // holes are only ever filled with V or NULL_ITEM
    private V unmaskNull(Object v) {
        return (v == NULL_ITEM) ? null : (V) v;
    }

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole.  Note that this class cannot be parameterized as V
     * because the sentinel value FAIL is only of type Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;
        /** The Thread creating this node. */
        final Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item.
         */
        Node(Object item) {
            this.item = item;
            waiter = Thread.currentThread();
        }

        /**
         * Tries to fill in hole. On success, wakes up the waiter.
         * The value must be non-null, since null denotes an empty hole.
         * @param val the value to place in hole.
         * @return on success, the item; on failure, FAIL.
         */
        Object fillHole(Object val) {
            if (compareAndSet(null, val)) {
                LockSupport.unpark(waiter);
                return item;
            }
            return FAIL;
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled.
         * The interrupt status is left set for the caller to inspect.
         * @param timed true if the wait is timed.
         * @param nanos if timed, the maximum wait time.
         * @return on success, the hole; on failure, FAIL.
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0L;
            Object h;
            // Re-read the hole after every wakeup; park may return spuriously.
            while ((h = get()) == null) {
                // If interrupted or timed out, try to cancel by
                // CASing FAIL as hole value.
                if (Thread.currentThread().isInterrupted() ||
                    (timed && nanos <= 0)) {
                    compareAndSet(null, FAIL);
                } else if (!timed) {
                    LockSupport.park();
                } else {
                    LockSupport.parkNanos(nanos);
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            return h;
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange; may be <tt>null</tt>
     * @return the object provided by the other thread; may be <tt>null</tt>.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        // Honor the documented contract even when a partner is already
        // waiting and the exchange could otherwise complete at once.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        try {
            return unmaskNull(doExchange(maskNull(x), false, 0L));
        } catch (TimeoutException cannotHappen) {
            // An untimed wait never times out.
            throw new AssertionError(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange; may be <tt>null</tt>
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread; may be <tt>null</tt>.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        // Honor the documented contract even when a partner is already
        // waiting and the exchange could otherwise complete at once.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        return unmaskNull(doExchange(maskNull(x), true, unit.toNanos(timeout)));
    }
}
377 }