ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.8
Committed: Tue Aug 26 00:09:18 2003 UTC (20 years, 9 months ago) by dholmes
Branch: MAIN
Changes since 1.7: +2 -2 lines
Log Message:
In response to Eamonn's comment that "best effort lower bound" is not
defined for waiting times, all such references have been deleted. The
preceding text makes it clear that the time must elapse before the
method will return, and trying to say anything about the maximum waiting
time is pointless. We can still say something to this effect in the package
docs if we want.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects.  Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/08/08 20:05:07 $
 * @editor $Author: tim $
 * @author Doug Lea
 */
public class Exchanger<V> {
    /** Lock held while reading or updating <tt>item</tt> and <tt>arrivalCount</tt>. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled whenever the exchange state changes.  It is
     * shared by two kinds of waiters: threads waiting for a previous
     * pair to finish (<tt>arrivalCount == 2</tt>), and the first
     * arriver of the current pair waiting for its partner.  Because
     * the two kinds wait for different predicates, every state change
     * must use <tt>signalAll</tt>; a single <tt>signal</tt> could be
     * consumed by a waiter of the wrong kind, which would simply go
     * back to sleep and leave the intended waiter parked.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if waiting is bounded by <tt>nanos</tt>,
     * <tt>false</tt> to wait indefinitely
     * @param nanos the maximum time to wait, in nanoseconds; ignored
     * unless <tt>timed</tt> is <tt>true</tt>
     * @return the object offered by the partner thread
     * @throws InterruptedException if the current thread has its
     * interrupted status set on entry, or is interrupted while waiting
     * and no partner has completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the waiting time
     * elapses before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
            throws InterruptedException, TimeoutException {
        // Honour the documented contract: a thread that is already
        // interrupted on entry fails before touching any shared state,
        // even if a partner is waiting or the timeout is non-positive.
        if (Thread.interrupted())
            throw new InterruptedException();

        lock.lock();
        try {
            V other;

            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the other
            // thread.  signalAll is required: threads may still be parked
            // in the loop above, and a lone signal could be delivered to
            // one of them instead of to our partner.
            if (count == 2) {
                other = item;
                item = x;
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                // Remember the interrupt; we still have to reset state below.
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread waiting for the reset so that a complete
            // new pair (not just a single first arriver) can proceed.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger.
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never takes a timeout branch.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
250    
251