/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; /** * An Exchanger provides a synchronization point at which two threads * can exchange objects. Each thread presents some object on entry to * the {@link #exchange exchange} method, and receives the object presented by * the other thread on return. * *

Sample Usage: * Here are the highlights of a class that uses an Exchanger to * swap buffers between threads so that the thread filling the * buffer gets a freshly * emptied one when it needs it, handing off the filled one to * the thread emptying the buffer. *

 * class FillAndEmpty {
 *   Exchanger<Buffer> exchanger = new Exchanger();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * 
* * @fixme change example to use a bounded queue? * * @since 1.5 * @spec JSR-166 * @revised $Date: 2003/06/24 14:34:47 $ * @editor $Author: dl $ * @author Doug Lea */ public class Exchanger { private final ReentrantLock lock = new ReentrantLock(); private final Condition taken = lock.newCondition(); /** Holder for the item being exchanged */ private V item; /** * Arrival count transitions from 0 to 1 to 2 then back to 0 * during an exchange. */ private int arrivalCount; /** * Main exchange function, handling the different policy variants. */ private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException { lock.lock(); try { V other; int count = ++arrivalCount; // If item is already waiting, replace it and signal other thread if (count == 2) { other = item; item = x; taken.signal(); return other; } // Otherwise, set item and wait for another thread to // replace it and signal us. item = x; InterruptedException interrupted = null; try { while (arrivalCount != 2) { if (!timed) taken.await(); else if (nanos > 0) nanos = taken.awaitNanos(nanos); else break; // timed out } } catch (InterruptedException ie) { interrupted = ie; } // get and reset item and count after the wait. other = item; item = null; count = arrivalCount; arrivalCount = 0; // If the other thread replaced item, then we must // continue even if cancelled. if (count == 2) { if (interrupted != null) Thread.currentThread().interrupt(); return other; } // Otherwise, no one is waiting for us, so we can just back out if (interrupted != null) { taken.signal(); // propagate to any other waiting thread throw interrupted; } else // must be timeout throw new TimeoutException(); } finally { lock.unlock(); } } /** * Create a new Exchanger **/ public Exchanger() { } /** * Waits for another thread to arrive at this exchange point (unless * it is {@link Thread#interrupt interrupted}), * and then transfers the given object to it, receiving its object * in return. *

If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. *

If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of two things happens: *

*

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @param x the object to exchange * @return the object provided by the other thread. * @throws InterruptedException if current thread was interrupted * while waiting **/ public V exchange(V x) throws InterruptedException { try { return doExchange(x, false, 0); } catch (TimeoutException cannotHappen) { throw new Error(cannotHappen); } } /** * Waits for another thread to arrive at this exchange point (unless * it is {@link Thread#interrupt interrupted}, or the specified waiting * time elapses), * and then transfers the given object to it, receiving its object * in return. * *

If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. * *

If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of three things happens: *

*

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * *

If the specified waiting time elapses then {@link TimeoutException} * is thrown. * The given waiting time is a best-effort lower bound. If the time is * less than or equal to zero, the method will not wait at all. * * @param x the object to exchange * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument. * @return the object provided by the other thread. * @throws InterruptedException if current thread was interrupted * while waiting * @throws TimeoutException if the specified waiting time elapses before * another thread enters the exchange. **/ public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { return doExchange(x, true, unit.toNanos(timeout)); } }