ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1, JSR166_PRERELEASE_0_1
Changes since 1.1: +86 -4 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8    
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects.  Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p>More than two threads may use the same <tt>Exchanger</tt>; arrivals
 * are paired off, and a thread that arrives while a previous pair is still
 * completing its hand-off waits until that pair has finished.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/03/31 03:50:08 $
 * @editor $Author: dholmes $
 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever the exchange state changes: when the second
     * party deposits its item, and when the first party has collected
     * it and reset the state for the next pair.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Common implementation of the timed and untimed exchange.
     *
     * <p>The first thread of a pair stores its item and waits; the
     * second swaps its item in, signals, and returns immediately; the
     * first then collects the swapped item and resets the state.  A
     * thread arriving while <tt>arrivalCount</tt> is still 2 (the first
     * party has not yet collected its item) waits for the reset rather
     * than joining, so it can neither overwrite the pending item nor push
     * the count past 2.
     *
     * <p>The lock is acquired uninterruptibly, so interruption is only
     * detected while waiting on the condition; a thread that completes
     * an exchange as the second party does not check its interrupt status.
     *
     * @param x the object to hand to the other thread
     * @param timed whether <tt>nanos</tt> bounds the wait
     * @param nanos remaining wait budget in nanoseconds; ignored unless
     *        <tt>timed</tt> is true
     * @return the object presented by the partner thread
     * @throws InterruptedException if interrupted while waiting and no
     *         partner had completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the budget ran out
     *         before a partner completed the exchange
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            V other;

            // If the arrival count is already two, a previous pair is
            // still finishing: wait for its first party to collect the
            // item and reset the count before taking part ourselves.
            // Nothing has been modified yet, so a timeout or interrupt
            // here needs no back-out.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and signal other thread.
            // signalAll is required because the condition may also hold
            // late arrivals waiting for the reset; a single signal could
            // wake one of those instead of our partner.
            if (count == 2) {
                other = item;
                item = x;
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            }
            catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (This must happen even if the wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;

            // Release any threads that were held back until this reset.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // Otherwise, no one is waiting for us, so we can just back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        }
        catch (TimeoutException cannotHappen) {
            // An untimed wait never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
236    
237