/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.concurrent.locks.*; /** * A semaphore guaranteeing that threads invoking any of the {@link * #acquire() acquire} methods are allocated permits in the order in * which their invocation of those methods was processed * (first-in-first-out; FIFO). Note that FIFO ordering necessarily * applies to specific internal points of execution within these * methods. So, it is possible for one thread to invoke * acquire before another, but reach the ordering point after * the other, and similarly upon return from the method. * *

Because permits are allocated in-order, this class also provides * convenience methods to {@link #acquire(long) acquire} and {@link * #release(long) release} multiple permits at a time. * * @since 1.5 * @spec JSR-166 * @revised $Date: 2003/07/11 13:12:06 $ * @editor $Author: dl $ * @author Doug Lea * */ public class FairSemaphore extends Semaphore { /* * Basic algorithm is a variant of the one described * in CPJ book 2nd edition, section 3.7.3 */ /** * Nodes of the wait queue. Opportunistically subclasses ReentrantLock. */ private static class Node extends ReentrantLock { /** The condition to wait on */ Condition done = newCondition(); /** True if interrupted before released */ boolean cancelled; /** Number of permits remaining until can release */ long permitsNeeded; /** Next node of queue */ Node next; Node(long n) { permitsNeeded = n; } } /** Head of the wait queue */ private Node head; /** Pointer to last node of the wait queue */ private Node last; /** Add a new node to wait queue with given number of permits needed */ private Node enq(long n) { Node p = new Node(n); if (last == null) last = head = p; else last = last.next = p; return p; } /** Remove first node from wait queue */ private void removeFirst() { Node p = head; assert p != null; if (p != null && (head = p.next) == null) last = null; } /** * Construct a FairSemaphore with the given number of * permits. * @param permits the initial number of permits available */ public FairSemaphore(long permits) { super(permits, new ReentrantLock(true)); } /** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@link Thread#interrupt interrupted}. * *

Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. *

If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: *

* *

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * * @see Thread#interrupt */ public void acquire() throws InterruptedException { acquire(1L); } /** * Acquires a permit from this semaphore, blocking until one is * available. * *

Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. *

If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * some other thread invokes the {@link #release} method for this * semaphore and the current thread happens to be chosen as the * thread to receive the permit. * *

If the current thread * is {@link Thread#interrupt interrupted} while waiting * for a permit then it will continue to wait, but the time at which * the thread is assigned a permit may change compared to the time it * would have received the permit had no interruption occurred. When the * thread does return from this method its interrupt status will be set. * */ public void acquireUninterruptibly() { acquireUninterruptibly(1L); } /** * Acquires a permit from this semaphore, only if one is available at the * time of invocation. *

Acquires a permit, if one is available and returns immediately, * with the value true, * reducing the number of available permits by one. * *

If no permit is available then this method will return * immediately with the value false. * * @return true if a permit was acquired and false * otherwise. */ public boolean tryAcquire() { return tryAcquire(1L); } /** * Acquires a permit from this semaphore, if one becomes available * within the given waiting time and the * current thread has not been {@link Thread#interrupt interrupted}. *

Acquires a permit, if one is available and returns immediately, * with the value true, * reducing the number of available permits by one. *

If no permit is available then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: *

*

If a permit is acquired then the value true is returned. *

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. *

If the specified waiting time elapses then the value false * is returned. * The given waiting time is a best-effort lower bound. If the time is * less than or equal to zero, the method will not wait at all. * * @param timeout the maximum time to wait for a permit * @param unit the time unit of the timeout argument. * @return true if a permit was acquired and false * if the waiting time elapsed before a permit was acquired. * * @throws InterruptedException if the current thread is interrupted * * @see Thread#interrupt * */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return tryAcquire(1L, timeout, unit); } /** * Releases a permit, returning it to the semaphore. *

Releases a permit, increasing the number of available permits * by one. * If any threads are blocking trying to acquire a permit, then one * is selected and given the permit that was just released. * That thread is re-enabled for thread scheduling purposes. *

There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */ public void release() { release(1L); } /** * Acquires the given number of permits from this semaphore, * blocking until all are available, * or the thread is {@link Thread#interrupt interrupted}. * *

Acquires the given number of permits, if they are available, * and returns immediately, * reducing the number of available permits by the given amount. * *

If insufficient permits are available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: *

* *

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * Any available permits are assigned to the next waiting thread as if * they had been made available by a call to {@link #release()}. * * @param permits the number of permits to acquire * * @throws InterruptedException if the current thread is interrupted * @throws IllegalArgumentException if permits less than zero. * * @see Thread#interrupt */ public void acquire(long permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); Node node = null; lock.lockInterruptibly(); try { long remaining = count - permits; if (remaining >= 0) { count = remaining; return; } count = 0; node = enq(-remaining); } finally { lock.unlock(); } node.lock(); try { while (node.permitsNeeded > 0) node.done.await(); } catch(InterruptedException ie) { if (node.permitsNeeded > 0) { node.cancelled = true; release(permits - node.permitsNeeded); throw ie; } else { // ignore interrupt Thread.currentThread().interrupt(); } } finally { node.unlock(); } } /** * Acquires the given number of permits from this semaphore, * blocking until all are available. * *

Acquires the given number of permits, if they are available, * and returns immediately, * reducing the number of available permits by the given amount. * *

If insufficient permits are available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * some other thread invokes one of the {@link #release() release} * methods for this semaphore, the current thread is next to be assigned * permits and the number of available permits satisfies this request. * *

If the current thread * is {@link Thread#interrupt interrupted} while waiting * for permits then it will continue to wait and its position in the * queue is not affected. When the * thread does return from this method its interrupt status will be set. * * @param permits the number of permits to acquire * @throws IllegalArgumentException if permits less than zero. * */ public void acquireUninterruptibly(long permits) { if (permits < 0) throw new IllegalArgumentException(); Node node = null; lock.lock(); try { long remaining = count - permits; if (remaining >= 0) { count = remaining; return; } count = 0; node = enq(-remaining); } finally { lock.unlock(); } node.lock(); try { while (node.permitsNeeded > 0) node.done.awaitUninterruptibly(); } finally { node.unlock(); } } /** * Acquires the given number of permits from this semaphore, only if * all are available at the time of invocation. *

Acquires the given number of permits, if they are available, and * returns immediately, with the value true, * reducing the number of available permits by the given amount. * *

If insufficient permits are available then this method will return * immediately with the value false and the number of available * permits is unchanged. * * @param permits the number of permits to acquire * * @return true if the permits were acquired and false * otherwise. * @throws IllegalArgumentException if permits less than zero. */ public boolean tryAcquire(long permits) { if (permits < 0) throw new IllegalArgumentException(); lock.lock(); try { long remaining = count - permits; if (remaining >= 0) { count = remaining; return true; } return false; } finally { lock.unlock(); } } /** * Acquires the given number of permits from this semaphore, if all * become available within the given waiting time and the * current thread has not been {@link Thread#interrupt interrupted}. *

Acquires the given number of permits, if they are available and * returns immediately, with the value true, * reducing the number of available permits by the given amount. *

If insufficient permits are available then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: *

*

If the permits are acquired then the value true is returned. *

If the current thread: *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * Any available permits are assigned to the next waiting thread as if * they had been made available by a call to {@link #release()}. * *

If the specified waiting time elapses then the value false * is returned. * The given waiting time is a best-effort lower bound. If the time is * less than or equal to zero, the method will not wait at all. * Any available permits are assigned to the next waiting thread as if * they had been made available by a call to {@link #release()}. * * @param permits the number of permits to acquire * @param timeout the maximum time to wait for the permits * @param unit the time unit of the timeout argument. * @return true if all permits were acquired and false * if the waiting time elapsed before all permits were acquired. * * @throws InterruptedException if the current thread is interrupted * @throws IllegalArgumentException if permits less than zero. * * @see Thread#interrupt * */ public boolean tryAcquire(long permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); Node node = null; long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { long remaining = count - permits; if (remaining >= 0) { count = remaining; return true; } if (nanos <= 0) return false; count = 0; node = enq(-remaining); } finally { lock.unlock(); } node.lock(); try { while (node.permitsNeeded > 0) { nanos = node.done.awaitNanos(nanos); if (nanos <= 0) { release(permits - node.permitsNeeded); return false; } } return true; } catch(InterruptedException ie) { if (node.permitsNeeded > 0) { node.cancelled = true; release(permits - node.permitsNeeded); throw ie; } else { // ignore interrupt Thread.currentThread().interrupt(); return true; } } finally { node.unlock(); } } /** * Releases the given number of permits, returning them to the semaphore. *

Releases the given number of permits, increasing the number of * available permits by that amount. * If any threads are blocking trying to acquire permits, then the * one that has been waiting the longest * is selected and given the permits that were just released. * If the number of available permits satisfies that thread's request * then that thread is re-enabled for thread scheduling purposes; otherwise * the thread continues to wait. If there are still permits available * after the first thread's request has been satisfied, then those permits * are assigned to the next waiting thread. If it is satisfied then it is * re-enabled for thread scheduling purposes. This continues until there * are insufficient permits to satisfy the next waiting thread, or there * are no more waiting threads. * *

There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link Semaphore#acquire acquire}. * Correct usage of a semaphore is established by programming convention * in the application. * * @param permits the number of permits to release * @throws IllegalArgumentException if permits less than zero. */ public void release(long permits) { if (permits < 0) throw new IllegalArgumentException(); if (permits == 0) return; // Even when using fair locks, releases should try to barge in. if (!lock.tryLock()) lock.lock(); try { long p = permits; while (p > 0) { Node node = head; if (node == null) { count += p; p = 0; } else { node.lock(); try { if (node.cancelled) removeFirst(); else if (node.permitsNeeded > p) { node.permitsNeeded -= p; p = 0; } else { p -= node.permitsNeeded; node.permitsNeeded = 0; node.done.signal(); removeFirst(); } } finally { node.unlock(); } } } } finally { lock.unlock(); } } }