ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.30
Committed: Sun Nov 6 15:30:24 2005 UTC (18 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.29: +35 -26 lines
Log Message:
Incorporate review suggestions

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 dl 1.14 * http://creativecommons.org/licenses/publicdomain
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.21 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 dl 1.4 import java.util.concurrent.locks.*;
11 dl 1.16 import java.util.concurrent.atomic.*;
12     import java.util.Random;
13 tim 1.1
14     /**
15 dl 1.28 * A synchronization point at which threads can pair and swap elements
16     * within pairs. Each thread presents some object on entry to the
17     * {@link #exchange exchange} method, matches with a partner thread,
18     * and receives its partner's object on return.
19 tim 1.1 *
20     * <p><b>Sample Usage:</b>
21 jsr166 1.29 * Here are the highlights of a class that uses an {@code Exchanger}
22     * to swap buffers between threads so that the thread filling the
23     * buffer gets a freshly emptied one when it needs it, handing off the
24     * filled one to the thread emptying the buffer.
25     * <pre>{@code
26 tim 1.1 * class FillAndEmpty {
27 jsr166 1.29 * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
28 dl 1.9 * DataBuffer initialEmptyBuffer = ... a made-up type
29     * DataBuffer initialFullBuffer = ...
30 tim 1.1 *
31     * class FillingLoop implements Runnable {
32     * public void run() {
33 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
34 tim 1.1 * try {
35     * while (currentBuffer != null) {
36     * addToBuffer(currentBuffer);
37 dl 1.30 * if (currentBuffer.isFull())
38 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
39     * }
40 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
41 tim 1.1 * }
42     * }
43     *
44     * class EmptyingLoop implements Runnable {
45     * public void run() {
46 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
47 tim 1.1 * try {
48     * while (currentBuffer != null) {
49     * takeFromBuffer(currentBuffer);
50 dl 1.30 * if (currentBuffer.isEmpty())
51 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
52     * }
53 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
54 tim 1.1 * }
55     * }
56     *
57     * void start() {
58     * new Thread(new FillingLoop()).start();
59     * new Thread(new EmptyingLoop()).start();
60     * }
61     * }
62 jsr166 1.29 * }</pre>
63 tim 1.1 *
64 jsr166 1.27 * <p>Memory consistency effects: For each pair of threads that
65     * successfully exchange objects via an {@code Exchanger}, actions
66     * prior to the {@code exchange()} in each thread
67     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
68     * those subsequent to a return from the corresponding {@code exchange()}
69     * in the other thread.
70 brian 1.22 *
71 tim 1.1 * @since 1.5
72 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
73 dl 1.11 * @param <V> The type of objects that may be exchanged
74 tim 1.1 */
75     public class Exchanger<V> {
76 dl 1.16 /*
77     * The underlying idea is to use a stack to hold nodes containing
78     * pairs of items to be exchanged. Except that:
79     *
80     * * Only one element of the pair is known on creation by a
81     * first-arriving thread; the other is a "hole" waiting to be
82     * filled in. This is a degenerate form of the dual stacks
83     * described in "Nonblocking Concurrent Objects with Condition
84     * Synchronization", by W. N. Scherer III and M. L. Scott.
85     * 18th Annual Conf. on Distributed Computing, Oct. 2004.
86     * It is "degenerate" in that both the items and the holes
87     * are shared in the same nodes.
88     *
89     * * There isn't really a stack here! There can't be -- if two
90     * nodes were both in the stack, they should cancel themselves
91     * out by combining. So that's what we do. The 0th element of
92     * the "arena" array serves only as the top of stack. The
93     * remainder of the array is a form of the elimination backoff
94     * collision array described in "A Scalable Lock-free Stack
95     * Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
96     * 16th ACM Symposium on Parallelism in Algorithms and
97     * Architectures, June 2004. Here, threads spin (using short
98     * timed waits with exponential backoff) looking for each
99     * other. If they fail to find others waiting, they try the
100     * top spot again. As shown in that paper, this always
101     * converges.
102     *
103     * The backoff elimination mechanics never come into play in
104     * common usages where only two threads ever meet to exchange
105     * items, but they prevent contention bottlenecks when an
106     * exchanger is used by a large number of threads.
107 dl 1.30 *
108     * For more details, see the paper "A Scalable Elimination-based
109     * Exchange Channel" by William Scherer, Doug Lea, and Michael
110     * Scott in Proceedings of SCOOL05 workshop. Available at:
111     * http://hdl.handle.net/1802/2104
112 dl 1.16 */
113 dl 1.2
114 jsr166 1.17 /**
115 dl 1.16 * Size of collision space. Using a size of half the number of
116     * CPUs provides enough space for threads to find each other but
117     * not so much that it would always require one or more to time
118     * out to become unstuck. Note that the arena array holds SIZE+1
119     * elements, to include the top-of-stack slot.
120     */
121 jsr166 1.17 private static final int SIZE =
122 dl 1.16 (Runtime.getRuntime().availableProcessors() + 1) / 2;
123 jsr166 1.15
124 dl 1.2 /**
125 dl 1.16 * Base unit in nanoseconds for backoffs. Must be a power of two.
126     * Should be small because backoffs exponentially increase from
127     * base.
128 dl 1.2 */
129 dl 1.16 private static final long BACKOFF_BASE = 128L;
130 dl 1.2
131 jsr166 1.17 /**
132 dl 1.16 * Sentinel item representing cancellation. This value is placed
133     * in holes on cancellation, and used as a return value from Node
134     * methods to indicate failure to set or get hole.
135 jsr166 1.17 */
136 dl 1.16 static final Object FAIL = new Object();
137    
138 jsr166 1.17 /**
139 dl 1.16 * The collision arena. arena[0] is used as the top of the stack.
140     * The remainder is used as the collision elimination space.
141 jsr166 1.17 * Each slot holds an AtomicReference<Node>, but this cannot be
142 dl 1.16 * expressed for arrays, so elements are casted on each use.
143 dl 1.3 */
144 dl 1.30 private final AtomicReference<Node>[] arena;
145 dl 1.5
146 dl 1.16 /** Generator for random backoffs and delays. */
147     private final Random random = new Random();
148 dl 1.5
149 dl 1.16 /**
150     * Creates a new Exchanger.
151     */
152     public Exchanger() {
153 dl 1.30 arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
154 dl 1.16 for (int i = 0; i < arena.length; ++i)
155 dl 1.30 arena[i] = new AtomicReference<Node>();
156 dl 1.16 }
157 dl 1.2
158 dl 1.16 /**
159     * Main exchange function, handling the different policy variants.
160     * Uses Object, not "V" as argument and return value to simplify
161     * handling of internal sentinel values. Callers from public
162     * methods cast accordingly.
163 dl 1.30 *
164     * @param item the item to exchange
165     * @param timed true if the wait is timed
166     * @param nanos if timed, the maximum wait time
167     * @return the other thread's item
168 dl 1.16 */
169     private Object doExchange(Object item, boolean timed, long nanos)
170     throws InterruptedException, TimeoutException {
171     Node me = new Node(item);
172 dl 1.30 long lastTime = timed ? System.nanoTime() : 0;
173 dl 1.16 int idx = 0; // start out at slot representing top
174     int backoff = 0; // increases on failure to occupy a slot
175    
176     for (;;) {
177 dl 1.30 AtomicReference<Node> slot = arena[idx];
178 dl 1.16
179     // If this slot is already occupied, there is a waiting item...
180     Node you = slot.get();
181     if (you != null) {
182     Object v = you.fillHole(item);
183     slot.compareAndSet(you, null);
184     if (v != FAIL) // ... unless it was cancelled
185     return v;
186 dl 1.2 }
187    
188 dl 1.16 // Try to occupy this slot
189     if (slot.compareAndSet(null, me)) {
190     // If this is top slot, use regular wait, else backoff-wait
191     Object v = ((idx == 0)?
192     me.waitForHole(timed, nanos) :
193     me.waitForHole(true, randomDelay(backoff)));
194     slot.compareAndSet(me, null);
195     if (v != FAIL)
196     return v;
197     if (Thread.interrupted())
198     throw new InterruptedException();
199     if (timed) {
200     long now = System.nanoTime();
201     nanos -= now - lastTime;
202     lastTime = now;
203     if (nanos <= 0)
204     throw new TimeoutException();
205     }
206 dl 1.2
207 dl 1.16 me = new Node(item); // Throw away nodes on failure
208     if (backoff < SIZE - 1) // Increase or stay saturated
209     ++backoff;
210     idx = 0; // Restart at top
211 dl 1.2 }
212    
213 dl 1.16 else // Retry with a random non-top slot <= backoff
214     idx = 1 + random.nextInt(backoff + 1);
215 dl 1.2
216     }
217     }
218 tim 1.1
219     /**
220 dl 1.16 * Returns a random delay less than (base times (2 raised to backoff))
221     */
222     private long randomDelay(int backoff) {
223     return ((BACKOFF_BASE << backoff) - 1) & random.nextInt();
224     }
225    
226     /**
227     * Nodes hold partially exchanged data. This class
228     * opportunistically subclasses AtomicReference to represent the
229     * hole. So get() returns hole, and compareAndSet CAS'es value
230     * into hole. Note that this class cannot be parameterized as V
231     * because the sentinel value FAIL is only of type Object.
232 jsr166 1.15 */
233 dl 1.16 static final class Node extends AtomicReference<Object> {
234 dl 1.20 private static final long serialVersionUID = -3221313401284163686L;
235 jsr166 1.21
236 dl 1.16 /** The element offered by the Thread creating this node. */
237     final Object item;
238     /** The Thread creating this node. */
239     final Thread waiter;
240    
241     /**
242     * Creates node with given item and empty hole.
243     * @param item the item.
244     */
245     Node(Object item) {
246     this.item = item;
247     waiter = Thread.currentThread();
248     }
249    
250     /**
251     * Tries to fill in hole. On success, wakes up the waiter.
252     * @param val the value to place in hole.
253     * @return on success, the item; on failure, FAIL.
254     */
255     Object fillHole(Object val) {
256     if (compareAndSet(null, val)) {
257     LockSupport.unpark(waiter);
258     return item;
259     }
260     return FAIL;
261     }
262    
263     /**
264 jsr166 1.17 * Waits for and gets the hole filled in by another thread.
265     * Fails if timed out or interrupted before hole filled.
266 dl 1.30 *
267     * @param timed true if the wait is timed
268     * @param nanos if timed, the maximum wait time
269     * @return on success, the hole; on failure, FAIL
270 dl 1.16 */
271     Object waitForHole(boolean timed, long nanos) {
272 dl 1.30 long lastTime = timed ? System.nanoTime() : 0;
273 dl 1.18 Object h;
274     while ((h = get()) == null) {
275     // If interrupted or timed out, try to cancel by
276     // CASing FAIL as hole value.
277     if (Thread.currentThread().isInterrupted() ||
278 dl 1.30 (timed && nanos <= 0)) {
279     if (compareAndSet(null, FAIL))
280     return FAIL;
281     }
282 dl 1.18 else if (!timed)
283 dl 1.16 LockSupport.park();
284     else {
285     LockSupport.parkNanos(nanos);
286     long now = System.nanoTime();
287     nanos -= now - lastTime;
288     lastTime = now;
289     }
290     }
291     return h;
292     }
293 tim 1.1 }
294    
295     /**
296     * Waits for another thread to arrive at this exchange point (unless
297     * it is {@link Thread#interrupt interrupted}),
298     * and then transfers the given object to it, receiving its object
299     * in return.
300 jsr166 1.17 *
301 tim 1.1 * <p>If another thread is already waiting at the exchange point then
302     * it is resumed for thread scheduling purposes and receives the object
303     * passed in by the current thread. The current thread returns immediately,
304     * receiving the object passed to the exchange by that other thread.
305 jsr166 1.17 *
306 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
307 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
308     * dormant until one of two things happens:
309     * <ul>
310     * <li>Some other thread enters the exchange; or
311     * <li>Some other thread {@link Thread#interrupt interrupts} the current
312     * thread.
313     * </ul>
314     * <p>If the current thread:
315     * <ul>
316 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
317 tim 1.1 * <li>is {@link Thread#interrupt interrupted} while waiting
318 jsr166 1.15 * for the exchange,
319 tim 1.1 * </ul>
320 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
321     * interrupted status is cleared.
322 tim 1.1 *
323     * @param x the object to exchange
324 dl 1.30 * @return the object provided by the other thread
325     * @throws InterruptedException if the current thread was
326     * interrupted while waiting
327 jsr166 1.15 */
328 tim 1.1 public V exchange(V x) throws InterruptedException {
329 dl 1.2 try {
330 dl 1.16 return (V)doExchange(x, false, 0);
331 jsr166 1.15 } catch (TimeoutException cannotHappen) {
332 dl 1.2 throw new Error(cannotHappen);
333     }
334 tim 1.1 }
335    
336     /**
337     * Waits for another thread to arrive at this exchange point (unless
338     * it is {@link Thread#interrupt interrupted}, or the specified waiting
339     * time elapses),
340     * and then transfers the given object to it, receiving its object
341     * in return.
342     *
343     * <p>If another thread is already waiting at the exchange point then
344     * it is resumed for thread scheduling purposes and receives the object
345     * passed in by the current thread. The current thread returns immediately,
346     * receiving the object passed to the exchange by that other thread.
347     *
348 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
349 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
350     * dormant until one of three things happens:
351     * <ul>
352     * <li>Some other thread enters the exchange; or
353     * <li>Some other thread {@link Thread#interrupt interrupts} the current
354     * thread; or
355     * <li>The specified waiting time elapses.
356     * </ul>
357     * <p>If the current thread:
358     * <ul>
359 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
360 tim 1.1 * <li>is {@link Thread#interrupt interrupted} while waiting
361 jsr166 1.15 * for the exchange,
362 tim 1.1 * </ul>
363 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
364     * interrupted status is cleared.
365 tim 1.1 *
366     * <p>If the specified waiting time elapses then {@link TimeoutException}
367     * is thrown.
368 jsr166 1.15 * If the time is
369 tim 1.1 * less than or equal to zero, the method will not wait at all.
370     *
371     * @param x the object to exchange
372     * @param timeout the maximum time to wait
373 dl 1.30 * @param unit the time unit of the <tt>timeout</tt> argument
374     * @return the object provided by the other thread
375     * @throws InterruptedException if the current thread was
376     * interrupted while waiting
377     * @throws TimeoutException if the specified waiting time elapses
378     * before another thread enters the exchange
379 jsr166 1.15 */
380     public V exchange(V x, long timeout, TimeUnit unit)
381 tim 1.1 throws InterruptedException, TimeoutException {
382 dl 1.16 return (V)doExchange(x, true, unit.toNanos(timeout));
383 tim 1.1 }
384     }