ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1, JSR166_PRERELEASE_0_1
Changes since 1.1: +86 -4 lines
Log Message:
re-check-in initial implementations

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 /**
10 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
11 * can exchange objects. Each thread presents some object on entry to
12 * the {@link #exchange exchange} method, and receives the object presented by
13 * the other thread on return.
14 *
15 * <p><b>Sample Usage:</b>
16 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
17 * swap buffers between threads so that the thread filling the
18 * buffer gets a freshly
19 * emptied one when it needs it, handing off the filled one to
20 * the thread emptying the buffer.
21 * <pre>
22 * class FillAndEmpty {
23 * Exchanger&lt;Buffer&gt; exchanger = new Exchanger();
24 * Buffer initialEmptyBuffer = ... a made-up type
25 * Buffer initialFullBuffer = ...
26 *
27 * class FillingLoop implements Runnable {
28 * public void run() {
29 * Buffer currentBuffer = initialEmptyBuffer;
30 * try {
31 * while (currentBuffer != null) {
32 * addToBuffer(currentBuffer);
33 * if (currentBuffer.full())
34 * currentBuffer = exchanger.exchange(currentBuffer);
35 * }
36 * }
37 * catch (InterruptedException ex) { ... handle ... }
38 * }
39 * }
40 *
41 * class EmptyingLoop implements Runnable {
42 * public void run() {
43 * Buffer currentBuffer = initialFullBuffer;
44 * try {
45 * while (currentBuffer != null) {
46 * takeFromBuffer(currentBuffer);
47 * if (currentBuffer.empty())
48 * currentBuffer = exchanger.exchange(currentBuffer);
49 * }
50 * }
51 * catch (InterruptedException ex) { ... handle ...}
52 * }
53 * }
54 *
55 * void start() {
56 * new Thread(new FillingLoop()).start();
57 * new Thread(new EmptyingLoop()).start();
58 * }
59 * }
60 * </pre>
61 *
62 * @fixme change example to use a bounded queue?
63 *
64 * @since 1.5
65 * @spec JSR-166
66 * @revised $Date: 2003/03/31 03:50:08 $
67 * @editor $Author: dholmes $
68 */
69 public class Exchanger<V> {
70 private final ReentrantLock lock = new ReentrantLock();
71 private final Condition taken = lock.newCondition();
72
73 /** Holder for the item being exchanged */
74 private V item;
75
76 /**
77 * Arrival count transitions from 0 to 1 to 2 then back to 0
78 * during an exchange.
79 */
80 private int arrivalCount;
81
82 private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
83 lock.lock();
84 try {
85 V other;
86 int count = ++arrivalCount;
87
88 // If item is already waiting, replace it and signal other thread
89 if (count == 2) {
90 other = item;
91 item = x;
92 taken.signal();
93 return other;
94 }
95
96 // Otherwise, set item and wait for another thread to
97 // replace it and signal us.
98
99 item = x;
100 InterruptedException interrupted = null;
101 try {
102 while (arrivalCount != 2) {
103 if (!timed)
104 taken.await();
105 else if (nanos > 0)
106 nanos = taken.awaitNanos(nanos);
107 else
108 break; // timed out
109 }
110 }
111 catch (InterruptedException ie) {
112 interrupted = ie;
113 }
114
115 // get and reset item and count after the wait.
116 other = item;
117 item = null;
118 count = arrivalCount;
119 arrivalCount = 0;
120
121 // If the other thread replaced item, then we must
122 // continue even if cancelled.
123 if (count == 2) {
124 if (interrupted != null)
125 Thread.currentThread().interrupt();
126 return other;
127 }
128
129 // Otherwise, no one is waiting for us, so we can just back out
130 if (interrupted != null) {
131 taken.signal(); // propagate to any other waiting thread
132 throw interrupted;
133 }
134 else // must be timeout
135 throw new TimeoutException();
136 }
137 finally {
138 lock.unlock();
139 }
140 }
141
142 /**
143 * Create a new Exchanger
144 **/
145 public Exchanger() {
146 }
147
148 /**
149 * Waits for another thread to arrive at this exchange point (unless
150 * it is {@link Thread#interrupt interrupted}),
151 * and then transfers the given object to it, receiving its object
152 * in return.
153 * <p>If another thread is already waiting at the exchange point then
154 * it is resumed for thread scheduling purposes and receives the object
155 * passed in by the current thread. The current thread returns immediately,
156 * receiving the object passed to the exchange by that other thread.
157 * <p>If no other thread is already waiting at the exchange then the
158 * current thread is disabled for thread scheduling purposes and lies
159 * dormant until one of two things happens:
160 * <ul>
161 * <li>Some other thread enters the exchange; or
162 * <li>Some other thread {@link Thread#interrupt interrupts} the current
163 * thread.
164 * </ul>
165 * <p>If the current thread:
166 * <ul>
167 * <li>has its interrupted status set on entry to this method; or
168 * <li>is {@link Thread#interrupt interrupted} while waiting
169 * for the exchange,
170 * </ul>
171 * then {@link InterruptedException} is thrown and the current thread's
172 * interrupted status is cleared.
173 *
174 * @param x the object to exchange
175 * @return the object provided by the other thread.
176 * @throws InterruptedException if current thread was interrupted while waiting
177 **/
178 public V exchange(V x) throws InterruptedException {
179 try {
180 return doExchange(x, false, 0);
181 }
182 catch (TimeoutException cannotHappen) {
183 throw new Error(cannotHappen);
184 }
185 }
186
187 /**
188 * Waits for another thread to arrive at this exchange point (unless
189 * it is {@link Thread#interrupt interrupted}, or the specified waiting
190 * time elapses),
191 * and then transfers the given object to it, receiving its object
192 * in return.
193 *
194 * <p>If another thread is already waiting at the exchange point then
195 * it is resumed for thread scheduling purposes and receives the object
196 * passed in by the current thread. The current thread returns immediately,
197 * receiving the object passed to the exchange by that other thread.
198 *
199 * <p>If no other thread is already waiting at the exchange then the
200 * current thread is disabled for thread scheduling purposes and lies
201 * dormant until one of three things happens:
202 * <ul>
203 * <li>Some other thread enters the exchange; or
204 * <li>Some other thread {@link Thread#interrupt interrupts} the current
205 * thread; or
206 * <li>The specified waiting time elapses.
207 * </ul>
208 * <p>If the current thread:
209 * <ul>
210 * <li>has its interrupted status set on entry to this method; or
211 * <li>is {@link Thread#interrupt interrupted} while waiting
212 * for the exchange,
213 * </ul>
214 * then {@link InterruptedException} is thrown and the current thread's
215 * interrupted status is cleared.
216 *
217 * <p>If the specified waiting time elapses then {@link TimeoutException}
218 * is thrown.
219 * The given waiting time is a best-effort lower bound. If the time is
220 * less than or equal to zero, the method will not wait at all.
221 *
222 * @param x the object to exchange
223 * @param timeout the maximum time to wait
224 * @param unit the time unit of the <tt>timeout</tt> argument.
225 * @return the object provided by the other thread.
226 * @throws InterruptedException if current thread was interrupted while waiting
227 * @throws TimeoutException if the specified waiting time elapses before
228 * another thread enters the exchange.
229 **/
230 public V exchange(V x, long timeout, TimeUnit unit)
231 throws InterruptedException, TimeoutException {
232 return doExchange(x, true, unit.toNanos(timeout));
233 }
234
235 }
236
237