ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.15
Committed: Tue Apr 26 01:17:18 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.14: +28 -30 lines
Log Message:
doc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
10 /**
11 * A synchronization point at which two threads can exchange objects.
12 * Each thread presents some object on entry to the {@link #exchange
13 * exchange} method, and receives the object presented by the other
14 * thread on return.
15 *
16 * <p><b>Sample Usage:</b>
17 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
18 * swap buffers between threads so that the thread filling the
19 * buffer gets a freshly
20 * emptied one when it needs it, handing off the filled one to
21 * the thread emptying the buffer.
22 * <pre>
23 * class FillAndEmpty {
24 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
25 * DataBuffer initialEmptyBuffer = ... a made-up type
26 * DataBuffer initialFullBuffer = ...
27 *
28 * class FillingLoop implements Runnable {
29 * public void run() {
30 * DataBuffer currentBuffer = initialEmptyBuffer;
31 * try {
32 * while (currentBuffer != null) {
33 * addToBuffer(currentBuffer);
34 * if (currentBuffer.full())
35 * currentBuffer = exchanger.exchange(currentBuffer);
36 * }
37 * } catch (InterruptedException ex) { ... handle ... }
38 * }
39 * }
40 *
41 * class EmptyingLoop implements Runnable {
42 * public void run() {
43 * DataBuffer currentBuffer = initialFullBuffer;
44 * try {
45 * while (currentBuffer != null) {
46 * takeFromBuffer(currentBuffer);
47 * if (currentBuffer.empty())
48 * currentBuffer = exchanger.exchange(currentBuffer);
49 * }
50 * } catch (InterruptedException ex) { ... handle ...}
51 * }
52 * }
53 *
54 * void start() {
55 * new Thread(new FillingLoop()).start();
56 * new Thread(new EmptyingLoop()).start();
57 * }
58 * }
59 * </pre>
60 *
61 * @since 1.5
62 * @author Doug Lea
63 * @param <V> The type of objects that may be exchanged
64 */
public class Exchanger<V> {
    /** Lock guarding {@code item} and {@code arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition shared by two kinds of waiters: late arrivals waiting
     * for a finished pair to reset {@code arrivalCount} from 2, and the
     * first arriver of a pair waiting for {@code arrivalCount} to reach 2.
     * Because the two groups wait for different predicates, every
     * notification uses {@code signalAll()}; each waiter re-checks its
     * own predicate in a loop.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed {@code true} if the wait is bounded by {@code nanos}
     * @param nanos the remaining wait budget in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @return the object offered by the partner thread
     * @throws InterruptedException if the interrupt status is set on
     *         entry, or the thread is interrupted while waiting and no
     *         partner has already committed to the exchange
     * @throws TimeoutException if {@code timed} and the budget runs out
     *         before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Honour the documented contract: a thread whose interrupt
        // status is already set must not take part in an exchange,
        // whichever path (immediate match, zero timeout) it would take.
        if (Thread.interrupted())
            throw new InterruptedException();

        lock.lock();
        try {
            V other;

            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the
            // waiting partner.
            if (count == 2) {
                other = item;
                item = x;
                // signalAll, not signal: a late arrival parked in the
                // loop above may be queued ahead of our partner, and
                // would swallow a single signal and go back to sleep,
                // leaving the partner (and the exchanger) stuck.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread parked while the count was 2 so that
            // they can form the next pair(s) among themselves.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path of doExchange never throws TimeoutException.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
245 }