ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.23
Committed: Fri Sep 2 03:03:17 2005 UTC (18 years, 9 months ago) by brian
Branch: MAIN
Changes since 1.22: +7 -4 lines
Log Message:
More tweaks to HB markup

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 dl 1.14 * http://creativecommons.org/licenses/publicdomain
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.21 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
10 dl 1.4 import java.util.concurrent.locks.*;
11 dl 1.16 import java.util.concurrent.atomic.*;
12     import java.util.Random;
13 tim 1.1
/**
 * A synchronization point at which two threads can exchange objects.
 * Each thread presents some object on entry to the {@link #exchange
 * exchange} method, and receives the object presented by the other
 * thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * <p>Memory visibility effects: For each pair of threads that
 * successfully exchange objects via an <tt>Exchanger</tt>,
 * actions prior to the <tt>exchange()</tt>
 * in each thread <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to the corresponding <tt>exchange()</tt> in the other
 * thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack.  The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other.  If they fail to find others waiting, they try the
     *     top spot again.  As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A hole is "empty" exactly when it holds null, so a null item
     * can never be stored in a hole directly: the CAS from null to
     * null would succeed without changing anything and the waiter
     * would never observe the hand-off. The public methods therefore
     * translate null to and from the NULL_ITEM sentinel.
     */

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot.
     */
    private static final int SIZE =
        (Runtime.getRuntime().availableProcessors() + 1) / 2;

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two.
     * Should be small because backoffs exponentially increase from
     * base.
     */
    private static final long BACKOFF_BASE = 128L;

    /**
     * Largest left shift that can be applied to BACKOFF_BASE while
     * keeping the result a positive long. Backoff counts above this
     * are saturated when computing delays so the delay mask can
     * never overflow into a negative value.
     */
    private static final int MAX_BACKOFF_SHIFT =
        Long.numberOfLeadingZeros(BACKOFF_BASE) - 1;

    /**
     * Sentinel item representing cancellation.  This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel standing in for a null item passed to, or returned
     * from, the public exchange methods. Needed because a null hole
     * means "not yet filled".
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena.  arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     * Generic arrays cannot be created directly, so the array is
     * created raw and cast once in the constructor.
     */
    private final AtomicReference<Node>[] arena;

    /** Generator for random backoffs and delays. */
    private final Random random = new Random();

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        // Safe: the array is created here and only ever populated
        // with AtomicReference<Node> instances.
        @SuppressWarnings("unchecked")
        AtomicReference<Node>[] slots =
            (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
        for (int i = 0; i < slots.length; ++i)
            slots[i] = new AtomicReference<Node>();
        arena = slots;
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of internal sentinel values.  Callers from public
     * methods mask/unmask nulls and cast accordingly.
     *
     * @param item the (non-null, possibly NULL_ITEM) item to exchange.
     * @param timed true if the wait is timed.
     * @param nanos if timed, the maximum wait time.
     * @return the other thread's item (possibly NULL_ITEM).
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if timed and the wait time elapsed
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        Node me = new Node(item);
        long lastTime = timed ? System.nanoTime() : 0L;
        int idx = 0;     // start out at slot representing top
        int backoff = 0; // increases on failure to occupy a slot

        for (;;) {
            AtomicReference<Node> slot = arena[idx];

            // If this slot is already occupied, there is a waiting item...
            Node you = slot.get();
            if (you != null) {
                Object v = you.fillHole(item);
                slot.compareAndSet(you, null);
                if (v != FAIL) // ... unless it was cancelled
                    return v;
            }

            // Try to occupy this slot
            if (slot.compareAndSet(null, me)) {
                // If this is top slot, use regular wait, else backoff-wait
                Object v;
                if (idx == 0) {
                    v = me.waitForHole(timed, nanos);
                } else {
                    long delay = randomDelay(backoff);
                    // A backoff wait must never outlast the caller's
                    // remaining timeout.
                    if (timed && delay > nanos)
                        delay = nanos;
                    v = me.waitForHole(true, delay);
                }
                slot.compareAndSet(me, null);
                if (v != FAIL)
                    return v;
                if (Thread.interrupted())
                    throw new InterruptedException();
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        throw new TimeoutException();
                }

                me = new Node(item);    // Throw away nodes on failure
                if (backoff < SIZE - 1) // Increase or stay saturated
                    ++backoff;
                idx = 0;                // Restart at top
            }

            else // Retry with a random non-top slot <= backoff
                idx = 1 + random.nextInt(backoff + 1);
        }
    }

    /**
     * Returns a random non-negative delay less than
     * (base times (2 raised to backoff)). The exponent is saturated
     * at MAX_BACKOFF_SHIFT so the bound itself cannot overflow.
     *
     * @param backoff the current backoff count (non-negative)
     * @return a delay in nanoseconds
     */
    private long randomDelay(int backoff) {
        int shift = Math.min(backoff, MAX_BACKOFF_SHIFT);
        long mask = (BACKOFF_BASE << shift) - 1; // always >= 0
        // nextLong (not a sign-extended nextInt) so that every bit of
        // a wide mask is random and the result cannot be negative.
        return random.nextLong() & mask;
    }

    /**
     * Replaces a null item by the NULL_ITEM sentinel.
     */
    private static Object maskNull(Object x) {
        return (x == null) ? NULL_ITEM : x;
    }

    /**
     * Converts an internal result back to the caller's view,
     * turning the NULL_ITEM sentinel back into null.
     */
    @SuppressWarnings("unchecked") // the partner supplied it through exchange(V ...)
    private V unmaskNull(Object v) {
        return (v == NULL_ITEM) ? null : (V) v;
    }

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole.  Note that this class cannot be parameterized as V
     * because the sentinel value FAIL is only of type Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;
        /** The Thread creating this node. */
        final Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item.
         */
        Node(Object item) {
            this.item = item;
            waiter = Thread.currentThread();
        }

        /**
         * Tries to fill in hole. On success, wakes up the waiter.
         * @param val the value to place in hole; must not be null,
         * since null denotes an unfilled hole.
         * @return on success, the item; on failure, FAIL.
         */
        Object fillHole(Object val) {
            if (compareAndSet(null, val)) {
                LockSupport.unpark(waiter);
                return item;
            }
            return FAIL;
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled.
         * The interrupt status is left set for the caller to consume.
         * @param timed true if the wait is timed.
         * @param nanos if timed, the maximum wait time.
         * @return on success, the hole; on failure, FAIL.
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0L;
            Object h;
            while ((h = get()) == null) {
                // If interrupted or timed out, try to cancel by
                // CASing FAIL as hole value.
                if (Thread.currentThread().isInterrupted() ||
                    (timed && nanos <= 0))
                    compareAndSet(null, FAIL);
                else if (!timed)
                    LockSupport.park();
                else {
                    LockSupport.parkNanos(nanos);
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            return h;
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return unmaskNull(doExchange(maskNull(x), false, 0L));
        } catch (TimeoutException cannotHappen) {
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return unmaskNull(doExchange(maskNull(x), true, unit.toNanos(timeout)));
    }
}