ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.7
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.6: +7 -12 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects.  Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/08/06 18:22:09 $
 * @editor $Author: tim $
 * @author Doug Lea
 */
public class Exchanger<V> {
    /** Lock guarding {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition shared by two kinds of waiters: threads waiting for a
     * previous pair to finish (arrivalCount == 2), and the first arriver
     * waiting for its partner (arrivalCount != 2).  Because the waiters
     * have different predicates, it must always be signalled with
     * <tt>signalAll</tt>; each waiter rechecks its own predicate in a loop.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos remaining wait time in nanoseconds; ignored unless
     *        <tt>timed</tt> is set
     * @return the object offered by the partner thread
     * @throws InterruptedException if the caller's interrupt status is set
     *         on entry, or it is interrupted while waiting and no partner
     *         has already committed to the exchange
     * @throws TimeoutException if <tt>timed</tt> and the wait time elapsed
     *         before a partner arrived
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Interruptible acquisition honours the documented "interrupt status
        // set on entry" contract.  It sits outside the try block so that the
        // finally clause never unlocks a lock we failed to acquire.
        lock.lockInterruptibly();
        try {
            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {
                if (!timed) {
                    taken.await();
                } else if (nanos > 0) {
                    nanos = taken.awaitNanos(nanos);
                } else {
                    throw new TimeoutException();
                }
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the waiters.
            if (count == 2) {
                V other = item;
                item = x;
                // signalAll, not signal: a thread parked in the loop above
                // may be queued ahead of our partner, and waking only that
                // thread would leave the partner asleep forever.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed) {
                        taken.await();
                    } else if (nanos > 0) {
                        nanos = taken.awaitNanos(nanos);
                    } else {
                        break; // timed out
                    }
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Release every thread that was held back while the count was 2.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null) {
                    Thread.currentThread().interrupt();
                }
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null) {
                throw interrupted;
            }
            throw new TimeoutException(); // must be timeout
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger.
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
250    
251