ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.16
Committed: Sun May 1 11:48:44 2005 UTC (19 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.15: +197 -75 lines
Log Message:
Replace algorithm with more scalable one that also fixes timeout bug

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/licenses/publicdomain
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.Random;
12
13 /**
14 * A synchronization point at which two threads can exchange objects.
15 * Each thread presents some object on entry to the {@link #exchange
16 * exchange} method, and receives the object presented by the other
17 * thread on return.
18 *
19 * <p><b>Sample Usage:</b>
20 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
21 * swap buffers between threads so that the thread filling the
22 * buffer gets a freshly
23 * emptied one when it needs it, handing off the filled one to
24 * the thread emptying the buffer.
25 * <pre>
26 * class FillAndEmpty {
27 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger();
28 * DataBuffer initialEmptyBuffer = ... a made-up type
29 * DataBuffer initialFullBuffer = ...
30 *
31 * class FillingLoop implements Runnable {
32 * public void run() {
33 * DataBuffer currentBuffer = initialEmptyBuffer;
34 * try {
35 * while (currentBuffer != null) {
36 * addToBuffer(currentBuffer);
37 * if (currentBuffer.full())
38 * currentBuffer = exchanger.exchange(currentBuffer);
39 * }
40 * } catch (InterruptedException ex) { ... handle ... }
41 * }
42 * }
43 *
44 * class EmptyingLoop implements Runnable {
45 * public void run() {
46 * DataBuffer currentBuffer = initialFullBuffer;
47 * try {
48 * while (currentBuffer != null) {
49 * takeFromBuffer(currentBuffer);
50 * if (currentBuffer.empty())
51 * currentBuffer = exchanger.exchange(currentBuffer);
52 * }
53 * } catch (InterruptedException ex) { ... handle ...}
54 * }
55 * }
56 *
57 * void start() {
58 * new Thread(new FillingLoop()).start();
59 * new Thread(new EmptyingLoop()).start();
60 * }
61 * }
62 * </pre>
63 *
64 * @since 1.5
65 * @author Doug Lea and Bill Scherer and Michael Scott
66 * @param <V> The type of objects that may be exchanged
67 */
68 public class Exchanger<V> {
69 /*
70 * The underlying idea is to use a stack to hold nodes containing
71 * pairs of items to be exchanged. Except that:
72 *
73 * * Only one element of the pair is known on creation by a
74 * first-arriving thread; the other is a "hole" waiting to be
75 * filled in. This is a degenerate form of the dual stacks
76 * described in "Nonblocking Concurrent Objects with Condition
77 * Synchronization", by W. N. Scherer III and M. L. Scott.
78 * 18th Annual Conf. on Distributed Computing, Oct. 2004.
79 * It is "degenerate" in that both the items and the holes
80 * are shared in the same nodes.
81 *
82 * * There isn't really a stack here! There can't be -- if two
83 * nodes were both in the stack, they should cancel themselves
84 * out by combining. So that's what we do. The 0th element of
85 * the "arena" array serves only as the top of stack. The
86 * remainder of the array is a form of the elimination backoff
87 * collision array described in "A Scalable Lock-free Stack
88 * Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
89 * 16th ACM Symposium on Parallelism in Algorithms and
90 * Architectures, June 2004. Here, threads spin (using short
91 * timed waits with exponential backoff) looking for each
92 * other. If they fail to find others waiting, they try the
93 * top spot again. As shown in that paper, this always
94 * converges.
95 *
96 * The backoff elimination mechanics never come into play in
97 * common usages where only two threads ever meet to exchange
98 * items, but they prevent contention bottlenecks when an
99 * exchanger is used by a large number of threads.
100 */
101
102 /**
103 * Size of collision space. Using a size of half the number of
104 * CPUs provides enough space for threads to find each other but
105 * not so much that it would always require one or more to time
106 * out to become unstuck. Note that the arena array holds SIZE+1
107 * elements, to include the top-of-stack slot.
108 */
109 private static final int SIZE =
110 (Runtime.getRuntime().availableProcessors() + 1) / 2;
111
112 /**
113 * Base unit in nanoseconds for backoffs. Must be a power of two.
114 * Should be small because backoffs exponentially increase from
115 * base.
116 */
117 private static final long BACKOFF_BASE = 128L;
118
119 /**
120 * Sentinel item representing cancellation. This value is placed
121 * in holes on cancellation, and used as a return value from Node
122 * methods to indicate failure to set or get hole.
123 **/
124 static final Object FAIL = new Object();
125
126 /**
127 * The collision arena. arena[0] is used as the top of the stack.
128 * The remainder is used as the collision elimination space.
129 * Each slot holds an AtomicReference<Node>, but this cannot be
130 * expressed for arrays, so elements are casted on each use.
131 */
132 private final AtomicReference[] arena;
133
134 /** Generator for random backoffs and delays. */
135 private final Random random = new Random();
136
137 /**
138 * Creates a new Exchanger.
139 */
140 public Exchanger() {
141 arena = new AtomicReference[SIZE + 1];
142 for (int i = 0; i < arena.length; ++i)
143 arena[i] = new AtomicReference();
144 }
145
146 /**
147 * Main exchange function, handling the different policy variants.
148 * Uses Object, not "V" as argument and return value to simplify
149 * handling of internal sentinel values. Callers from public
150 * methods cast accordingly.
151 * @param item the item to exchange.
152 * @param timed true if the wait is timed.
153 * @param nanos if timed, the maximum wait time.
154 * @return the other thread's item.
155 */
156 private Object doExchange(Object item, boolean timed, long nanos)
157 throws InterruptedException, TimeoutException {
158 Node me = new Node(item);
159 long lastTime = (timed)? System.nanoTime() : 0;
160 int idx = 0; // start out at slot representing top
161 int backoff = 0; // increases on failure to occupy a slot
162
163 for (;;) {
164 AtomicReference<Node> slot = (AtomicReference<Node>)arena[idx];
165
166 // If this slot is already occupied, there is a waiting item...
167 Node you = slot.get();
168 if (you != null) {
169 Object v = you.fillHole(item);
170 slot.compareAndSet(you, null);
171 if (v != FAIL) // ... unless it was cancelled
172 return v;
173 }
174
175 // Try to occupy this slot
176 if (slot.compareAndSet(null, me)) {
177 // If this is top slot, use regular wait, else backoff-wait
178 Object v = ((idx == 0)?
179 me.waitForHole(timed, nanos) :
180 me.waitForHole(true, randomDelay(backoff)));
181 slot.compareAndSet(me, null);
182 if (v != FAIL)
183 return v;
184 if (Thread.interrupted())
185 throw new InterruptedException();
186 if (timed) {
187 long now = System.nanoTime();
188 nanos -= now - lastTime;
189 lastTime = now;
190 if (nanos <= 0)
191 throw new TimeoutException();
192 }
193
194 me = new Node(item); // Throw away nodes on failure
195 if (backoff < SIZE - 1) // Increase or stay saturated
196 ++backoff;
197 idx = 0; // Restart at top
198 }
199
200 else // Retry with a random non-top slot <= backoff
201 idx = 1 + random.nextInt(backoff + 1);
202
203 }
204 }
205
206 /**
207 * Returns a random delay less than (base times (2 raised to backoff))
208 */
209 private long randomDelay(int backoff) {
210 return ((BACKOFF_BASE << backoff) - 1) & random.nextInt();
211 }
212
213 /**
214 * Nodes hold partially exchanged data. This class
215 * opportunistically subclasses AtomicReference to represent the
216 * hole. So get() returns hole, and compareAndSet CAS'es value
217 * into hole. Note that this class cannot be parameterized as V
218 * because the sentinel value FAIL is only of type Object.
219 */
220 static final class Node extends AtomicReference<Object> {
221 /** The element offered by the Thread creating this node. */
222 final Object item;
223 /** The Thread creating this node. */
224 final Thread waiter;
225
226 /**
227 * Creates node with given item and empty hole.
228 * @param item the item.
229 */
230 Node(Object item) {
231 this.item = item;
232 waiter = Thread.currentThread();
233 }
234
235 /**
236 * Tries to fill in hole. On success, wakes up the waiter.
237 * @param val the value to place in hole.
238 * @return on success, the item; on failure, FAIL.
239 */
240 Object fillHole(Object val) {
241 if (compareAndSet(null, val)) {
242 LockSupport.unpark(waiter);
243 return item;
244 }
245 return FAIL;
246 }
247
248 /**
249 * Wait for and get the hole filled in by another thread.
250 * Fail if timed out or interrupted before hole filled.
251 * @param timed true if the wait is timed.
252 * @param nanos if timed, the maximum wait time.
253 * @return on success, the hole; on failure, FAIL.
254 */
255 Object waitForHole(boolean timed, long nanos) {
256 long lastTime = (timed)? System.nanoTime() : 0;
257 Object h = get();
258 while (h == null) {
259 if (!timed)
260 LockSupport.park();
261 else {
262 LockSupport.parkNanos(nanos);
263 long now = System.nanoTime();
264 nanos -= now - lastTime;
265 lastTime = now;
266 }
267 // If interrupted or timed out, try to cancel by
268 // CASing FAIL as hole value.
269 if ((h = get()) == null &&
270 (Thread.currentThread().isInterrupted() ||
271 (timed && nanos <= 0))) {
272 compareAndSet(null, FAIL);
273 h = get();
274 }
275 }
276 return h;
277 }
278 }
279
280 /**
281 * Waits for another thread to arrive at this exchange point (unless
282 * it is {@link Thread#interrupt interrupted}),
283 * and then transfers the given object to it, receiving its object
284 * in return.
285 * <p>If another thread is already waiting at the exchange point then
286 * it is resumed for thread scheduling purposes and receives the object
287 * passed in by the current thread. The current thread returns immediately,
288 * receiving the object passed to the exchange by that other thread.
289 * <p>If no other thread is already waiting at the exchange then the
290 * current thread is disabled for thread scheduling purposes and lies
291 * dormant until one of two things happens:
292 * <ul>
293 * <li>Some other thread enters the exchange; or
294 * <li>Some other thread {@link Thread#interrupt interrupts} the current
295 * thread.
296 * </ul>
297 * <p>If the current thread:
298 * <ul>
299 * <li>has its interrupted status set on entry to this method; or
300 * <li>is {@link Thread#interrupt interrupted} while waiting
301 * for the exchange,
302 * </ul>
303 * then {@link InterruptedException} is thrown and the current thread's
304 * interrupted status is cleared.
305 *
306 * @param x the object to exchange
307 * @return the object provided by the other thread.
308 * @throws InterruptedException if current thread was interrupted
309 * while waiting
310 */
311 public V exchange(V x) throws InterruptedException {
312 try {
313 return (V)doExchange(x, false, 0);
314 } catch (TimeoutException cannotHappen) {
315 throw new Error(cannotHappen);
316 }
317 }
318
319 /**
320 * Waits for another thread to arrive at this exchange point (unless
321 * it is {@link Thread#interrupt interrupted}, or the specified waiting
322 * time elapses),
323 * and then transfers the given object to it, receiving its object
324 * in return.
325 *
326 * <p>If another thread is already waiting at the exchange point then
327 * it is resumed for thread scheduling purposes and receives the object
328 * passed in by the current thread. The current thread returns immediately,
329 * receiving the object passed to the exchange by that other thread.
330 *
331 * <p>If no other thread is already waiting at the exchange then the
332 * current thread is disabled for thread scheduling purposes and lies
333 * dormant until one of three things happens:
334 * <ul>
335 * <li>Some other thread enters the exchange; or
336 * <li>Some other thread {@link Thread#interrupt interrupts} the current
337 * thread; or
338 * <li>The specified waiting time elapses.
339 * </ul>
340 * <p>If the current thread:
341 * <ul>
342 * <li>has its interrupted status set on entry to this method; or
343 * <li>is {@link Thread#interrupt interrupted} while waiting
344 * for the exchange,
345 * </ul>
346 * then {@link InterruptedException} is thrown and the current thread's
347 * interrupted status is cleared.
348 *
349 * <p>If the specified waiting time elapses then {@link TimeoutException}
350 * is thrown.
351 * If the time is
352 * less than or equal to zero, the method will not wait at all.
353 *
354 * @param x the object to exchange
355 * @param timeout the maximum time to wait
356 * @param unit the time unit of the <tt>timeout</tt> argument.
357 * @return the object provided by the other thread.
358 * @throws InterruptedException if current thread was interrupted
359 * while waiting
360 * @throws TimeoutException if the specified waiting time elapses before
361 * another thread enters the exchange.
362 */
363 public V exchange(V x, long timeout, TimeUnit unit)
364 throws InterruptedException, TimeoutException {
365 return (V)doExchange(x, true, unit.toNanos(timeout));
366 }
367 }