ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.31
Committed: Sun Nov 6 21:34:52 2005 UTC (18 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.30: +11 -11 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 dl 1.14 * http://creativecommons.org/licenses/publicdomain
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.21 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 dl 1.4 import java.util.concurrent.locks.*;
11 dl 1.16 import java.util.concurrent.atomic.*;
12     import java.util.Random;
13 tim 1.1
14     /**
15 dl 1.28 * A synchronization point at which threads can pair and swap elements
16     * within pairs. Each thread presents some object on entry to the
17     * {@link #exchange exchange} method, matches with a partner thread,
18     * and receives its partner's object on return.
19 tim 1.1 *
20     * <p><b>Sample Usage:</b>
21 jsr166 1.29 * Here are the highlights of a class that uses an {@code Exchanger}
22     * to swap buffers between threads so that the thread filling the
23     * buffer gets a freshly emptied one when it needs it, handing off the
24     * filled one to the thread emptying the buffer.
25     * <pre>{@code
26 tim 1.1 * class FillAndEmpty {
27 jsr166 1.29 * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
28 dl 1.9 * DataBuffer initialEmptyBuffer = ... a made-up type
29     * DataBuffer initialFullBuffer = ...
30 tim 1.1 *
31     * class FillingLoop implements Runnable {
32     * public void run() {
33 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
34 tim 1.1 * try {
35     * while (currentBuffer != null) {
36     * addToBuffer(currentBuffer);
37 dl 1.30 * if (currentBuffer.isFull())
38 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
39     * }
40 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
41 tim 1.1 * }
42     * }
43     *
44     * class EmptyingLoop implements Runnable {
45     * public void run() {
46 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
47 tim 1.1 * try {
48     * while (currentBuffer != null) {
49     * takeFromBuffer(currentBuffer);
50 dl 1.30 * if (currentBuffer.isEmpty())
51 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
52     * }
53 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
54 tim 1.1 * }
55     * }
56     *
57     * void start() {
58     * new Thread(new FillingLoop()).start();
59     * new Thread(new EmptyingLoop()).start();
60     * }
61     * }
62 jsr166 1.29 * }</pre>
63 tim 1.1 *
64 jsr166 1.27 * <p>Memory consistency effects: For each pair of threads that
65     * successfully exchange objects via an {@code Exchanger}, actions
66     * prior to the {@code exchange()} in each thread
67     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
68     * those subsequent to a return from the corresponding {@code exchange()}
69     * in the other thread.
70 brian 1.22 *
71 tim 1.1 * @since 1.5
72 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
73 dl 1.11 * @param <V> The type of objects that may be exchanged
74 tim 1.1 */
75     public class Exchanger<V> {
76 dl 1.16 /*
77     * The underlying idea is to use a stack to hold nodes containing
78     * pairs of items to be exchanged. Except that:
79     *
80     * * Only one element of the pair is known on creation by a
81     * first-arriving thread; the other is a "hole" waiting to be
82     * filled in. This is a degenerate form of the dual stacks
83     * described in "Nonblocking Concurrent Objects with Condition
84     * Synchronization", by W. N. Scherer III and M. L. Scott.
85     * 18th Annual Conf. on Distributed Computing, Oct. 2004.
86     * It is "degenerate" in that both the items and the holes
87     * are shared in the same nodes.
88     *
89     * * There isn't really a stack here! There can't be -- if two
90     * nodes were both in the stack, they should cancel themselves
91     * out by combining. So that's what we do. The 0th element of
92     * the "arena" array serves only as the top of stack. The
93     * remainder of the array is a form of the elimination backoff
94     * collision array described in "A Scalable Lock-free Stack
95     * Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
96     * 16th ACM Symposium on Parallelism in Algorithms and
97     * Architectures, June 2004. Here, threads spin (using short
98     * timed waits with exponential backoff) looking for each
99     * other. If they fail to find others waiting, they try the
100     * top spot again. As shown in that paper, this always
101     * converges.
102     *
103     * The backoff elimination mechanics never come into play in
104     * common usages where only two threads ever meet to exchange
105     * items, but they prevent contention bottlenecks when an
106     * exchanger is used by a large number of threads.
107 dl 1.30 *
108     * For more details, see the paper "A Scalable Elimination-based
109     * Exchange Channel" by William Scherer, Doug Lea, and Michael
110     * Scott in Proceedings of SCOOL05 workshop. Available at:
111     * http://hdl.handle.net/1802/2104
112 dl 1.16 */
113 dl 1.2
114 jsr166 1.17 /**
115 dl 1.16 * Size of collision space. Using a size of half the number of
116     * CPUs provides enough space for threads to find each other but
117     * not so much that it would always require one or more to time
118     * out to become unstuck. Note that the arena array holds SIZE+1
119     * elements, to include the top-of-stack slot.
120     */
121 jsr166 1.17 private static final int SIZE =
122 dl 1.16 (Runtime.getRuntime().availableProcessors() + 1) / 2;
123 jsr166 1.15
124 dl 1.2 /**
125 dl 1.16 * Base unit in nanoseconds for backoffs. Must be a power of two.
126     * Should be small because backoffs exponentially increase from
127     * base.
128 dl 1.2 */
129 dl 1.16 private static final long BACKOFF_BASE = 128L;
130 dl 1.2
131 jsr166 1.17 /**
132 dl 1.16 * Sentinel item representing cancellation. This value is placed
133     * in holes on cancellation, and used as a return value from Node
134     * methods to indicate failure to set or get hole.
135 jsr166 1.17 */
136 dl 1.16 static final Object FAIL = new Object();
137    
138 jsr166 1.17 /**
139 dl 1.16 * The collision arena. arena[0] is used as the top of the stack.
140     * The remainder is used as the collision elimination space.
141 dl 1.3 */
142 dl 1.30 private final AtomicReference<Node>[] arena;
143 dl 1.5
144 dl 1.16 /** Generator for random backoffs and delays. */
145     private final Random random = new Random();
146 dl 1.5
147 dl 1.16 /**
148     * Creates a new Exchanger.
149     */
150     public Exchanger() {
151 dl 1.30 arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
152 dl 1.16 for (int i = 0; i < arena.length; ++i)
153 dl 1.30 arena[i] = new AtomicReference<Node>();
154 dl 1.16 }
155 dl 1.2
156 dl 1.16 /**
157     * Main exchange function, handling the different policy variants.
158     * Uses Object, not "V" as argument and return value to simplify
159     * handling of internal sentinel values. Callers from public
160     * methods cast accordingly.
161 dl 1.30 *
162     * @param item the item to exchange
163     * @param timed true if the wait is timed
164     * @param nanos if timed, the maximum wait time
165     * @return the other thread's item
166 dl 1.16 */
167     private Object doExchange(Object item, boolean timed, long nanos)
168     throws InterruptedException, TimeoutException {
169     Node me = new Node(item);
170 dl 1.30 long lastTime = timed ? System.nanoTime() : 0;
171 dl 1.16 int idx = 0; // start out at slot representing top
172     int backoff = 0; // increases on failure to occupy a slot
173    
174     for (;;) {
175 dl 1.30 AtomicReference<Node> slot = arena[idx];
176 dl 1.16
177     // If this slot is already occupied, there is a waiting item...
178     Node you = slot.get();
179     if (you != null) {
180     Object v = you.fillHole(item);
181     slot.compareAndSet(you, null);
182     if (v != FAIL) // ... unless it was cancelled
183     return v;
184 dl 1.2 }
185    
186 dl 1.16 // Try to occupy this slot
187     if (slot.compareAndSet(null, me)) {
188     // If this is top slot, use regular wait, else backoff-wait
189     Object v = ((idx == 0)?
190     me.waitForHole(timed, nanos) :
191     me.waitForHole(true, randomDelay(backoff)));
192     slot.compareAndSet(me, null);
193     if (v != FAIL)
194     return v;
195     if (Thread.interrupted())
196     throw new InterruptedException();
197     if (timed) {
198     long now = System.nanoTime();
199     nanos -= now - lastTime;
200     lastTime = now;
201     if (nanos <= 0)
202     throw new TimeoutException();
203     }
204 dl 1.2
205 dl 1.16 me = new Node(item); // Throw away nodes on failure
206     if (backoff < SIZE - 1) // Increase or stay saturated
207     ++backoff;
208     idx = 0; // Restart at top
209 dl 1.2 }
210    
211 dl 1.16 else // Retry with a random non-top slot <= backoff
212     idx = 1 + random.nextInt(backoff + 1);
213 dl 1.2
214     }
215     }
216 tim 1.1
217     /**
218 jsr166 1.31 * Returns a random delay less than (base times (2 raised to backoff)).
219 dl 1.16 */
220     private long randomDelay(int backoff) {
221     return ((BACKOFF_BASE << backoff) - 1) & random.nextInt();
222     }
223    
224     /**
225     * Nodes hold partially exchanged data. This class
226     * opportunistically subclasses AtomicReference to represent the
227     * hole. So get() returns hole, and compareAndSet CAS'es value
228     * into hole. Note that this class cannot be parameterized as V
229     * because the sentinel value FAIL is only of type Object.
230 jsr166 1.15 */
231 dl 1.16 static final class Node extends AtomicReference<Object> {
232 dl 1.20 private static final long serialVersionUID = -3221313401284163686L;
233 jsr166 1.21
234 dl 1.16 /** The element offered by the Thread creating this node. */
235     final Object item;
236 jsr166 1.31
237 dl 1.16 /** The Thread creating this node. */
238     final Thread waiter;
239    
240     /**
241     * Creates node with given item and empty hole.
242 jsr166 1.31 *
243     * @param item the item
244 dl 1.16 */
245     Node(Object item) {
246     this.item = item;
247     waiter = Thread.currentThread();
248     }
249    
250     /**
251     * Tries to fill in hole. On success, wakes up the waiter.
252 jsr166 1.31 *
253     * @param val the value to place in hole
254     * @return on success, the item; on failure, FAIL
255 dl 1.16 */
256     Object fillHole(Object val) {
257     if (compareAndSet(null, val)) {
258     LockSupport.unpark(waiter);
259     return item;
260     }
261     return FAIL;
262     }
263    
264     /**
265 jsr166 1.17 * Waits for and gets the hole filled in by another thread.
266     * Fails if timed out or interrupted before hole filled.
267 dl 1.30 *
268     * @param timed true if the wait is timed
269     * @param nanos if timed, the maximum wait time
270     * @return on success, the hole; on failure, FAIL
271 dl 1.16 */
272     Object waitForHole(boolean timed, long nanos) {
273 dl 1.30 long lastTime = timed ? System.nanoTime() : 0;
274 dl 1.18 Object h;
275     while ((h = get()) == null) {
276     // If interrupted or timed out, try to cancel by
277     // CASing FAIL as hole value.
278     if (Thread.currentThread().isInterrupted() ||
279 dl 1.30 (timed && nanos <= 0)) {
280     if (compareAndSet(null, FAIL))
281     return FAIL;
282     }
283 dl 1.18 else if (!timed)
284 dl 1.16 LockSupport.park();
285     else {
286     LockSupport.parkNanos(nanos);
287     long now = System.nanoTime();
288     nanos -= now - lastTime;
289     lastTime = now;
290     }
291     }
292     return h;
293     }
294 tim 1.1 }
295    
296     /**
297     * Waits for another thread to arrive at this exchange point (unless
298 jsr166 1.31 * the current thread is {@link Thread#interrupt interrupted}),
299 tim 1.1 * and then transfers the given object to it, receiving its object
300     * in return.
301 jsr166 1.17 *
302 tim 1.1 * <p>If another thread is already waiting at the exchange point then
303     * it is resumed for thread scheduling purposes and receives the object
304     * passed in by the current thread. The current thread returns immediately,
305     * receiving the object passed to the exchange by that other thread.
306 jsr166 1.17 *
307 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
308 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
309     * dormant until one of two things happens:
310     * <ul>
311     * <li>Some other thread enters the exchange; or
312     * <li>Some other thread {@link Thread#interrupt interrupts} the current
313     * thread.
314     * </ul>
315     * <p>If the current thread:
316     * <ul>
317 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
318 tim 1.1 * <li>is {@link Thread#interrupt interrupted} while waiting
319 jsr166 1.15 * for the exchange,
320 tim 1.1 * </ul>
321 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
322     * interrupted status is cleared.
323 tim 1.1 *
324     * @param x the object to exchange
325 dl 1.30 * @return the object provided by the other thread
326     * @throws InterruptedException if the current thread was
327     * interrupted while waiting
328 jsr166 1.15 */
329 tim 1.1 public V exchange(V x) throws InterruptedException {
330 dl 1.2 try {
331 dl 1.16 return (V)doExchange(x, false, 0);
332 jsr166 1.15 } catch (TimeoutException cannotHappen) {
333 dl 1.2 throw new Error(cannotHappen);
334     }
335 tim 1.1 }
336    
337     /**
338     * Waits for another thread to arrive at this exchange point (unless
339 jsr166 1.31 * the current thread is {@link Thread#interrupt interrupted} or
340     * the specified waiting time elapses), and then transfers the given
341     * object to it, receiving its object in return.
342 tim 1.1 *
343     * <p>If another thread is already waiting at the exchange point then
344     * it is resumed for thread scheduling purposes and receives the object
345     * passed in by the current thread. The current thread returns immediately,
346     * receiving the object passed to the exchange by that other thread.
347     *
348 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
349 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
350     * dormant until one of three things happens:
351     * <ul>
352     * <li>Some other thread enters the exchange; or
353     * <li>Some other thread {@link Thread#interrupt interrupts} the current
354     * thread; or
355     * <li>The specified waiting time elapses.
356     * </ul>
357     * <p>If the current thread:
358     * <ul>
359 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
360 tim 1.1 * <li>is {@link Thread#interrupt interrupted} while waiting
361 jsr166 1.15 * for the exchange,
362 tim 1.1 * </ul>
363 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
364     * interrupted status is cleared.
365 tim 1.1 *
366     * <p>If the specified waiting time elapses then {@link TimeoutException}
367     * is thrown.
368 jsr166 1.15 * If the time is
369 tim 1.1 * less than or equal to zero, the method will not wait at all.
370     *
371     * @param x the object to exchange
372     * @param timeout the maximum time to wait
373 dl 1.30 * @param unit the time unit of the <tt>timeout</tt> argument
374     * @return the object provided by the other thread
375     * @throws InterruptedException if the current thread was
376     * interrupted while waiting
377     * @throws TimeoutException if the specified waiting time elapses
378     * before another thread enters the exchange
379 jsr166 1.15 */
380     public V exchange(V x, long timeout, TimeUnit unit)
381 tim 1.1 throws InterruptedException, TimeoutException {
382 dl 1.16 return (V)doExchange(x, true, unit.toNanos(timeout));
383 tim 1.1 }
384     }