ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.8
Committed: Tue Aug 26 00:09:18 2003 UTC (20 years, 9 months ago) by dholmes
Branch: MAIN
Changes since 1.7: +2 -2 lines
Log Message:
In response to Eamonn's comment that "best effort lower bound" is not
defined for waiting times, all such references have been deleted. The
preceding text makes it clear that the time must elapse before the
method will return, and trying to say anything about the maximum waiting
time is pointless. We can still say something to this effect in the package
docs if we want.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
/**
 * An {@code Exchanger} provides a synchronization point at which two threads
 * can exchange objects.  Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger} to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/08/08 20:05:07 $
 * @editor $Author: tim $
 * @author Doug Lea
 * @param <V> the type of objects that may be exchanged
 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Single condition shared by two kinds of waiters: the first arrival
     * of a pair (waiting for {@code arrivalCount} to become 2) and late
     * arrivals (waiting for {@code arrivalCount} to stop being 2).
     * Because both kinds wait here, every state change must use
     * {@code signalAll()}; a plain {@code signal()} could wake a waiter of
     * the wrong kind, which would simply go back to sleep and leave the
     * intended waiter blocked forever.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged. */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed {@code true} if the wait is bounded by {@code nanos}
     * @param nanos the maximum time to wait, in nanoseconds; only consulted
     *        when {@code timed} is {@code true}
     * @return the object offered by the partner thread
     * @throws InterruptedException if the calling thread's interrupted
     *         status is set on entry, or if it is interrupted while waiting
     *         and no partner has yet completed the exchange
     * @throws TimeoutException if {@code timed} is {@code true} and the
     *         waiting time runs out before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
            throws InterruptedException, TimeoutException {
        // The public contract promises InterruptedException when the
        // interrupted status is set on entry.  A thread that arrives second
        // never reaches an await() call, so the check has to be explicit.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        lock.lock();
        try {
            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count.
            while (arrivalCount == 2) {
                if (!timed) {
                    taken.await();
                } else if (nanos > 0) {
                    nanos = taken.awaitNanos(nanos);
                } else {
                    throw new TimeoutException();
                }
            }

            int count = ++arrivalCount;

            // If an item is already waiting, replace it with ours and wake
            // the thread that deposited it.
            if (count == 2) {
                V other = item;
                item = x;
                // signalAll, not signal: late arrivals may be queued on the
                // same condition ahead of our partner.
                taken.signalAll();
                return other;
            }

            // Otherwise, deposit our item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed) {
                        taken.await();
                    } else if (nanos > 0) {
                        nanos = taken.awaitNanos(nanos);
                    } else {
                        break; // timed out
                    }
                }
            } catch (InterruptedException ie) {
                // Defer: the exchange may already have completed.
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if the wait was aborted.)
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread that was waiting for the slot to be reset;
            // each re-checks its own loop condition.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null) {
                    // Preserve the interrupt for the caller to observe.
                    Thread.currentThread().interrupt();
                }
                return other;
            }

            // If no one is waiting for us, we can back out.
            if (interrupted != null) {
                throw interrupted;
            }
            throw new TimeoutException(); // must be timeout
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed wait never takes the timeout branches.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
250
251