ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.9
Committed: Sun Aug 31 13:33:13 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.8: +5 -10 lines
Log Message:
Removed non-standard tags and misc javadoc cleanup

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
10 /**
11 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
12 * can exchange objects. Each thread presents some object on entry to
13 * the {@link #exchange exchange} method, and receives the object presented by
14 * the other thread on return.
15 *
16 * <p><b>Sample Usage:</b>
17 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
18 * swap buffers between threads so that the thread filling the
19 * buffer gets a freshly
20 * emptied one when it needs it, handing off the filled one to
21 * the thread emptying the buffer.
22 * <pre>
23 * class FillAndEmpty {
24 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger();
25 * DataBuffer initialEmptyBuffer = ... a made-up type
26 * DataBuffer initialFullBuffer = ...
27 *
28 * class FillingLoop implements Runnable {
29 * public void run() {
30 * DataBuffer currentBuffer = initialEmptyBuffer;
31 * try {
32 * while (currentBuffer != null) {
33 * addToBuffer(currentBuffer);
34 * if (currentBuffer.full())
35 * currentBuffer = exchanger.exchange(currentBuffer);
36 * }
37 * } catch (InterruptedException ex) { ... handle ... }
38 * }
39 * }
40 *
41 * class EmptyingLoop implements Runnable {
42 * public void run() {
43 * DataBuffer currentBuffer = initialFullBuffer;
44 * try {
45 * while (currentBuffer != null) {
46 * takeFromBuffer(currentBuffer);
47 * if (currentBuffer.empty())
48 * currentBuffer = exchanger.exchange(currentBuffer);
49 * }
50 * } catch (InterruptedException ex) { ... handle ...}
51 * }
52 * }
53 *
54 * void start() {
55 * new Thread(new FillingLoop()).start();
56 * new Thread(new EmptyingLoop()).start();
57 * }
58 * }
59 * </pre>
60 *
61 * @since 1.5
62 * @author Doug Lea
63 */
64 public class Exchanger<V> {
65 private final ReentrantLock lock = new ReentrantLock();
66 private final Condition taken = lock.newCondition();
67
68 /** Holder for the item being exchanged */
69 private V item;
70
71 /**
72 * Arrival count transitions from 0 to 1 to 2 then back to 0
73 * during an exchange.
74 */
75 private int arrivalCount;
76
77 /**
78 * Main exchange function, handling the different policy variants.
79 */
80 private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
81 lock.lock();
82 try {
83 V other;
84
85 // If arrival count already at two, we must wait for
86 // a previous pair to finish and reset the count;
87 while (arrivalCount == 2) {
88 if (!timed)
89 taken.await();
90 else if (nanos > 0)
91 nanos = taken.awaitNanos(nanos);
92 else
93 throw new TimeoutException();
94 }
95
96 int count = ++arrivalCount;
97
98 // If item is already waiting, replace it and signal other thread
99 if (count == 2) {
100 other = item;
101 item = x;
102 taken.signal();
103 return other;
104 }
105
106 // Otherwise, set item and wait for another thread to
107 // replace it and signal us.
108
109 item = x;
110 InterruptedException interrupted = null;
111 try {
112 while (arrivalCount != 2) {
113 if (!timed)
114 taken.await();
115 else if (nanos > 0)
116 nanos = taken.awaitNanos(nanos);
117 else
118 break; // timed out
119 }
120 } catch (InterruptedException ie) {
121 interrupted = ie;
122 }
123
124 // Get and reset item and count after the wait.
125 // (We need to do this even if wait was aborted.)
126 other = item;
127 item = null;
128 count = arrivalCount;
129 arrivalCount = 0;
130 taken.signal();
131
132 // If the other thread replaced item, then we must
133 // continue even if cancelled.
134 if (count == 2) {
135 if (interrupted != null)
136 Thread.currentThread().interrupt();
137 return other;
138 }
139
140 // If no one is waiting for us, we can back out
141 if (interrupted != null)
142 throw interrupted;
143 else // must be timeout
144 throw new TimeoutException();
145 } finally {
146 lock.unlock();
147 }
148 }
149
150 /**
151 * Create a new Exchanger.
152 **/
153 public Exchanger() {
154 }
155
156 /**
157 * Waits for another thread to arrive at this exchange point (unless
158 * it is {@link Thread#interrupt interrupted}),
159 * and then transfers the given object to it, receiving its object
160 * in return.
161 * <p>If another thread is already waiting at the exchange point then
162 * it is resumed for thread scheduling purposes and receives the object
163 * passed in by the current thread. The current thread returns immediately,
164 * receiving the object passed to the exchange by that other thread.
165 * <p>If no other thread is already waiting at the exchange then the
166 * current thread is disabled for thread scheduling purposes and lies
167 * dormant until one of two things happens:
168 * <ul>
169 * <li>Some other thread enters the exchange; or
170 * <li>Some other thread {@link Thread#interrupt interrupts} the current
171 * thread.
172 * </ul>
173 * <p>If the current thread:
174 * <ul>
175 * <li>has its interrupted status set on entry to this method; or
176 * <li>is {@link Thread#interrupt interrupted} while waiting
177 * for the exchange,
178 * </ul>
179 * then {@link InterruptedException} is thrown and the current thread's
180 * interrupted status is cleared.
181 *
182 * @param x the object to exchange
183 * @return the object provided by the other thread.
184 * @throws InterruptedException if current thread was interrupted
185 * while waiting
186 **/
187 public V exchange(V x) throws InterruptedException {
188 try {
189 return doExchange(x, false, 0);
190 } catch (TimeoutException cannotHappen) {
191 throw new Error(cannotHappen);
192 }
193 }
194
195 /**
196 * Waits for another thread to arrive at this exchange point (unless
197 * it is {@link Thread#interrupt interrupted}, or the specified waiting
198 * time elapses),
199 * and then transfers the given object to it, receiving its object
200 * in return.
201 *
202 * <p>If another thread is already waiting at the exchange point then
203 * it is resumed for thread scheduling purposes and receives the object
204 * passed in by the current thread. The current thread returns immediately,
205 * receiving the object passed to the exchange by that other thread.
206 *
207 * <p>If no other thread is already waiting at the exchange then the
208 * current thread is disabled for thread scheduling purposes and lies
209 * dormant until one of three things happens:
210 * <ul>
211 * <li>Some other thread enters the exchange; or
212 * <li>Some other thread {@link Thread#interrupt interrupts} the current
213 * thread; or
214 * <li>The specified waiting time elapses.
215 * </ul>
216 * <p>If the current thread:
217 * <ul>
218 * <li>has its interrupted status set on entry to this method; or
219 * <li>is {@link Thread#interrupt interrupted} while waiting
220 * for the exchange,
221 * </ul>
222 * then {@link InterruptedException} is thrown and the current thread's
223 * interrupted status is cleared.
224 *
225 * <p>If the specified waiting time elapses then {@link TimeoutException}
226 * is thrown.
227 * If the time is
228 * less than or equal to zero, the method will not wait at all.
229 *
230 * @param x the object to exchange
231 * @param timeout the maximum time to wait
232 * @param unit the time unit of the <tt>timeout</tt> argument.
233 * @return the object provided by the other thread.
234 * @throws InterruptedException if current thread was interrupted
235 * while waiting
236 * @throws TimeoutException if the specified waiting time elapses before
237 * another thread enters the exchange.
238 **/
239 public V exchange(V x, long timeout, TimeUnit unit)
240 throws InterruptedException, TimeoutException {
241 return doExchange(x, true, unit.toNanos(timeout));
242 }
243
244 }
245
246