ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Exchanger.java (file contents):
Revision 1.29 by jsr166, Fri Sep 16 03:59:07 2005 UTC vs.
Revision 1.30 by dl, Sun Nov 6 15:30:24 2005 UTC

# Line 34 | Line 34 | import java.util.Random;
34   *       try {
35   *         while (currentBuffer != null) {
36   *           addToBuffer(currentBuffer);
37 < *           if (currentBuffer.full())
37 > *           if (currentBuffer.isFull())
38   *             currentBuffer = exchanger.exchange(currentBuffer);
39   *         }
40   *       } catch (InterruptedException ex) { ... handle ... }
# Line 47 | Line 47 | import java.util.Random;
47   *       try {
48   *         while (currentBuffer != null) {
49   *           takeFromBuffer(currentBuffer);
50 < *           if (currentBuffer.empty())
50 > *           if (currentBuffer.isEmpty())
51   *             currentBuffer = exchanger.exchange(currentBuffer);
52   *         }
53   *       } catch (InterruptedException ex) { ... handle ...}
# Line 104 | Line 104 | public class Exchanger<V> {
104       * common usages where only two threads ever meet to exchange
105       * items, but they prevent contention bottlenecks when an
106       * exchanger is used by a large number of threads.
107 +     *
108 +     * For more details, see the paper "A Scalable Elimination-based
109 +     * Exchange Channel" by William Scherer, Doug Lea, and Michael
110 +     * Scott in Proceedings of SCOOL05 workshop. Available at:
111 +     * http://hdl.handle.net/1802/2104
112       */
113  
114      /**
# Line 136 | Line 141 | public class Exchanger<V> {
141       * Each slot holds an AtomicReference<Node>, but this cannot be
142       * expressed for arrays, so elements are casted on each use.
143       */
144 <    private final AtomicReference[] arena;
144 >    private final AtomicReference<Node>[] arena;
145  
146      /** Generator for random backoffs and delays. */
147      private final Random random = new Random();
# Line 145 | Line 150 | public class Exchanger<V> {
150       * Creates a new Exchanger.
151       */
152      public Exchanger() {
153 <        arena = new AtomicReference[SIZE + 1];
153 >        arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
154          for (int i = 0; i < arena.length; ++i)
155 <            arena[i] = new AtomicReference();
155 >            arena[i] = new AtomicReference<Node>();
156      }
157  
158      /**
# Line 155 | Line 160 | public class Exchanger<V> {
160       * Uses Object, not "V" as argument and return value to simplify
161       * handling of internal sentinel values. Callers from public
162       * methods cast accordingly.
163 <     * @param item the item to exchange.
164 <     * @param timed true if the wait is timed.
165 <     * @param nanos if timed, the maximum wait time.
166 <     * @return the other thread's item.
163 >     *
164 >     * @param item the item to exchange
165 >     * @param timed true if the wait is timed
166 >     * @param nanos if timed, the maximum wait time
167 >     * @return the other thread's item
168       */
169      private Object doExchange(Object item, boolean timed, long nanos)
170          throws InterruptedException, TimeoutException {
171          Node me = new Node(item);
172 <        long lastTime = (timed)? System.nanoTime() : 0;
172 >        long lastTime = timed ? System.nanoTime() : 0;
173          int idx = 0;     // start out at slot representing top
174          int backoff = 0; // increases on failure to occupy a slot
175  
176          for (;;) {
177 <            AtomicReference<Node> slot = (AtomicReference<Node>)arena[idx];
177 >            AtomicReference<Node> slot = arena[idx];
178  
179              // If this slot is already occupied, there is a waiting item...
180              Node you = slot.get();
# Line 257 | Line 263 | public class Exchanger<V> {
263          /**
264           * Waits for and gets the hole filled in by another thread.
265           * Fails if timed out or interrupted before hole filled.
266 <         * @param timed true if the wait is timed.
267 <         * @param nanos if timed, the maximum wait time.
268 <         * @return on success, the hole; on failure, FAIL.
266 >         *
267 >         * @param timed true if the wait is timed
268 >         * @param nanos if timed, the maximum wait time
269 >         * @return on success, the hole; on failure, FAIL
270           */
271          Object waitForHole(boolean timed, long nanos) {
272 <            long lastTime = (timed)? System.nanoTime() : 0;
272 >            long lastTime = timed ? System.nanoTime() : 0;
273              Object h;
274              while ((h = get()) == null) {
275                  // If interrupted or timed out, try to cancel by
276                  // CASing FAIL as hole value.
277                  if (Thread.currentThread().isInterrupted() ||
278 <                    (timed && nanos <= 0))
279 <                    compareAndSet(null, FAIL);
278 >                    (timed && nanos <= 0)) {
279 >                    if (compareAndSet(null, FAIL))
280 >                        return FAIL;
281 >                }
282                  else if (!timed)
283                      LockSupport.park();
284                  else {
# Line 312 | Line 321 | public class Exchanger<V> {
321       * interrupted status is cleared.
322       *
323       * @param x the object to exchange
324 <     * @return the object provided by the other thread.
325 <     * @throws InterruptedException if current thread was interrupted
326 <     * while waiting
324 >     * @return the object provided by the other thread
325 >     * @throws InterruptedException if the current thread was
326 >     *         interrupted while waiting
327       */
328      public V exchange(V x) throws InterruptedException {
329          try {
# Line 361 | Line 370 | public class Exchanger<V> {
370       *
371       * @param x the object to exchange
372       * @param timeout the maximum time to wait
373 <     * @param unit the time unit of the <tt>timeout</tt> argument.
374 <     * @return the object provided by the other thread.
375 <     * @throws InterruptedException if current thread was interrupted
376 <     * while waiting
377 <     * @throws TimeoutException if the specified waiting time elapses before
378 <     * another thread enters the exchange.
373 >     * @param unit the time unit of the <tt>timeout</tt> argument
374 >     * @return the object provided by the other thread
375 >     * @throws InterruptedException if the current thread was
376 >     *         interrupted while waiting
377 >     * @throws TimeoutException if the specified waiting time elapses
378 >     *         before another thread enters the exchange
379       */
380      public V exchange(V x, long timeout, TimeUnit unit)
381          throws InterruptedException, TimeoutException {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines