ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.5
Committed: Wed Jul 9 23:23:17 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.4: +18 -6 lines
Log Message:
Misc performance tunings

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
10     /**
11     * An <tt>Exchanger</tt> provides a synchronization point at which two threads
12     * can exchange objects. Each thread presents some object on entry to
13     * the {@link #exchange exchange} method, and receives the object presented by
14     * the other thread on return.
15     *
16     * <p><b>Sample Usage:</b>
17     * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
18     * swap buffers between threads so that the thread filling the
19     * buffer gets a freshly
20     * emptied one when it needs it, handing off the filled one to
21     * the thread emptying the buffer.
22     * <pre>
23     * class FillAndEmpty {
24     * Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
25     * Buffer initialEmptyBuffer = ... a made-up type
26     * Buffer initialFullBuffer = ...
27     *
28     * class FillingLoop implements Runnable {
29     * public void run() {
30     * Buffer currentBuffer = initialEmptyBuffer;
31     * try {
32     * while (currentBuffer != null) {
33     * addToBuffer(currentBuffer);
34     * if (currentBuffer.full())
35     * currentBuffer = exchanger.exchange(currentBuffer);
36     * }
37     * }
38     * catch (InterruptedException ex) { ... handle ... }
39     * }
40     * }
41     *
42     * class EmptyingLoop implements Runnable {
43     * public void run() {
44     * Buffer currentBuffer = initialFullBuffer;
45     * try {
46     * while (currentBuffer != null) {
47     * takeFromBuffer(currentBuffer);
48     * if (currentBuffer.empty())
49     * currentBuffer = exchanger.exchange(currentBuffer);
50     * }
51     * }
52     * catch (InterruptedException ex) { ... handle ...}
53     * }
54     * }
55     *
56     * void start() {
57     * new Thread(new FillingLoop()).start();
58     * new Thread(new EmptyingLoop()).start();
59     * }
60     * }
61     * </pre>
62     *
63     * @fixme change example to use a bounded queue?
64     *
65     * @since 1.5
66     * @spec JSR-166
67 dl 1.5 * @revised $Date: 2003/07/08 00:46:33 $
68 dl 1.3 * @editor $Author: dl $
69     * @author Doug Lea
70 tim 1.1 */
public class Exchanger<V> {
    /** Lock guarding {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled whenever the exchange state changes: when the
     * second thread of a pair deposits its item, and when the first
     * thread of a pair resets the exchanger for the next pair.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>,
     * <tt>false</tt> to wait indefinitely
     * @param nanos the remaining wait budget in nanoseconds; only
     * consulted when <tt>timed</tt> is <tt>true</tt>
     * @return the object offered by the partner thread
     * @throws InterruptedException if the interrupt status was set on
     * entry, or the thread was interrupted while waiting and no partner
     * had completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the budget ran out
     * before a partner arrived
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // The public methods promise InterruptedException when the
        // interrupt status is already set on entry. Without this check
        // an interrupt would go unnoticed on every path that never
        // reaches an await() call (partner already waiting, or a
        // non-positive timeout).
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        lock.lock();
        try {
            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count.
            while (arrivalCount == 2) {
                if (!timed) {
                    taken.await();
                } else if (nanos > 0) {
                    nanos = taken.awaitNanos(nanos);
                } else {
                    throw new TimeoutException();
                }
            }

            int count = ++arrivalCount;

            // If an item is already waiting, replace it and wake the
            // thread that deposited it.
            if (count == 2) {
                V other = item;
                item = x;
                // signalAll, not signal: threads parked in the entry
                // loop above share this condition with the first
                // arriver. A single signal could be consumed by one of
                // them (which would just go back to sleep because the
                // count is still 2), leaving the first arriver asleep
                // forever.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed) {
                        taken.await();
                    } else if (nanos > 0) {
                        nanos = taken.awaitNanos(nanos);
                    } else {
                        break; // timed out
                    }
                }
            } catch (InterruptedException ie) {
                // Defer: a partner may already have swapped items, in
                // which case the exchange must still complete.
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread waiting for the reset; each re-checks
            // the count under the lock, so extra wakeups are harmless.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null) {
                    // Preserve the interrupt for the caller to observe.
                    Thread.currentThread().interrupt();
                }
                return other;
            }

            // If no one is waiting for us, we can back out.
            if (interrupted != null) {
                throw interrupted;
            }
            throw new TimeoutException(); // must be timeout
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed doExchange only leaves its wait loops when a
            // partner arrives or the thread is interrupted.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
255    
256