ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.9
Committed: Mon Aug 25 22:35:45 2003 UTC (20 years, 9 months ago) by dholmes
Branch: MAIN
Changes since 1.8: +11 -11 lines
Log Message:
As per Eamonn's comments:
 - changed text that referred to threads being chosen (as if random)
to say "if the thread is next to be assigned permits" (as per FIFO)
 - clarified what happens to permits when acquire(n) times out or is interrupted

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 dl 1.1
10     /**
11 dl 1.2 * A semaphore guaranteeing that threads invoking any of the {@link
12     * #acquire() acquire} methods are allocated permits in the order in
13     * which their invocation of those methods was processed
14     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15     * applies to specific internal points of execution within these
16     * methods. So, it is possible for one thread to invoke
17     * <tt>acquire</tt> before another, but reach the ordering point after
18     * the other, and similarly upon return from the method.
19 dl 1.1 *
20 dl 1.2 * <p>Because permits are allocated in-order, this class also provides
21     * convenience methods to {@link #acquire(long) acquire} and {@link
22     * #release(long) release} multiple permits at a time.
23 dl 1.1 *
24     * @since 1.5
25     * @spec JSR-166
26 dholmes 1.9 * @revised $Date: 2003/08/25 19:27:58 $
27     * @editor $Author: dl $
28 dl 1.3 * @author Doug Lea
29 dl 1.1 *
30     */
31     public class FairSemaphore extends Semaphore {
32 dl 1.8 private static final long serialVersionUID = -3222578661600680210L;
33 dl 1.1
34     /*
35 dl 1.6 * Basic algorithm is a variant of the one described
36     * in CPJ book 2nd edition, section 3.7.3
37 dl 1.1 */
38    
39     /**
40 dl 1.6 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
41     */
42     private static class Node extends ReentrantLock {
43     /** The condition to wait on */
44     Condition done = newCondition();
45     /** True if interrupted before released */
46     boolean cancelled;
47     /** Number of permits remaining until can release */
48     long permitsNeeded;
49     /** Next node of queue */
50     Node next;
51    
52     Node(long n) { permitsNeeded = n; }
53     }
54    
55     /** Head of the wait queue */
56 dl 1.8 private transient Node head;
57 dl 1.6
58     /** Pointer to last node of the wait queue */
59 dl 1.8 private transient Node last;
60 dl 1.6
61     /** Add a new node to wait queue with given number of permits needed */
62     private Node enq(long n) {
63     Node p = new Node(n);
64     if (last == null)
65     last = head = p;
66     else
67     last = last.next = p;
68     return p;
69     }
70    
71     /** Remove first node from wait queue */
72     private void removeFirst() {
73     Node p = head;
74     assert p != null;
75     if (p != null && (head = p.next) == null)
76     last = null;
77     }
78    
79     /**
80     * Construct a <tt>FairSemaphore</tt> with the given number of
81 dl 1.1 * permits.
82     * @param permits the initial number of permits available
83     */
public FairSemaphore(long permits) {
    // A ReentrantLock created in fair mode is handed to the superclass
    // together with the initial permit count.
    super(permits, new ReentrantLock(true));
}
87    
88     /**
89 dl 1.6 * Acquires a permit from this semaphore, blocking until one is
90     * available, or the thread is {@link Thread#interrupt interrupted}.
91     *
92     * <p>Acquires a permit, if one is available and returns immediately,
93     * reducing the number of available permits by one.
94     * <p>If no permit is available then the current thread becomes
95     * disabled for thread scheduling purposes and lies dormant until
96     * one of two things happens:
97     * <ul>
98     * <li>Some other thread invokes the {@link #release} method for this
99 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
100 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
101     * thread.
102     * </ul>
103     *
104     * <p>If the current thread:
105     * <ul>
106     * <li>has its interrupted status set on entry to this method; or
107     * <li>is {@link Thread#interrupt interrupted} while waiting
108     * for a permit,
109     * </ul>
110     * then {@link InterruptedException} is thrown and the current thread's
111     * interrupted status is cleared.
112     *
113     * @throws InterruptedException if the current thread is interrupted
114     *
115     * @see Thread#interrupt
116     */
117     public void acquire() throws InterruptedException {
118     acquire(1L);
119     }
120    
121     /**
122     * Acquires a permit from this semaphore, blocking until one is
123     * available.
124     *
125     * <p>Acquires a permit, if one is available and returns immediately,
126     * reducing the number of available permits by one.
127     * <p>If no permit is available then the current thread becomes
128     * disabled for thread scheduling purposes and lies dormant until
129     * some other thread invokes the {@link #release} method for this
130 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit.
131 dl 1.6 *
132     * <p>If the current thread
133     * is {@link Thread#interrupt interrupted} while waiting
134     * for a permit then it will continue to wait, but the time at which
135     * the thread is assigned a permit may change compared to the time it
136     * would have received the permit had no interruption occurred. When the
137     * thread does return from this method its interrupt status will be set.
138     *
139     */
140     public void acquireUninterruptibly() {
141     acquireUninterruptibly(1L);
142     }
143    
144     /**
145     * Acquires a permit from this semaphore, only if one is available at the
146     * time of invocation.
147     * <p>Acquires a permit, if one is available and returns immediately,
148     * with the value <tt>true</tt>,
149     * reducing the number of available permits by one.
150     *
151     * <p>If no permit is available then this method will return
152     * immediately with the value <tt>false</tt>.
153     *
154     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
155     * otherwise.
156     */
157     public boolean tryAcquire() {
158     return tryAcquire(1L);
159     }
160    
161     /**
162     * Acquires a permit from this semaphore, if one becomes available
163     * within the given waiting time and the
164     * current thread has not been {@link Thread#interrupt interrupted}.
165     * <p>Acquires a permit, if one is available and returns immediately,
166     * with the value <tt>true</tt>,
167     * reducing the number of available permits by one.
168     * <p>If no permit is available then
169     * the current thread becomes disabled for thread scheduling
170     * purposes and lies dormant until one of three things happens:
171     * <ul>
172     * <li>Some other thread invokes the {@link #release} method for this
173 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
174 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
175     * thread; or
176     * <li>The specified waiting time elapses.
177     * </ul>
178     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
179     * <p>If the current thread:
180     * <ul>
181     * <li>has its interrupted status set on entry to this method; or
182     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
183     * a permit,
184     * </ul>
185     * then {@link InterruptedException} is thrown and the current thread's
186     * interrupted status is cleared.
187     * <p>If the specified waiting time elapses then the value <tt>false</tt>
188     * is returned.
189     * The given waiting time is a best-effort lower bound. If the time is
190     * less than or equal to zero, the method will not wait at all.
191     *
192     * @param timeout the maximum time to wait for a permit
193     * @param unit the time unit of the <tt>timeout</tt> argument.
194     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
195     * if the waiting time elapsed before a permit was acquired.
196     *
197     * @throws InterruptedException if the current thread is interrupted
198     *
199     * @see Thread#interrupt
200     *
201     */
202     public boolean tryAcquire(long timeout, TimeUnit unit)
203     throws InterruptedException {
204     return tryAcquire(1L, timeout, unit);
205     }
206    
207     /**
208     * Releases a permit, returning it to the semaphore.
209     * <p>Releases a permit, increasing the number of available permits
210     * by one.
211     * If any threads are blocking trying to acquire a permit, then the one
212     * that has been waiting the longest is given the permit that was just released.
213     * That thread is re-enabled for thread scheduling purposes.
214     * <p>There is no requirement that a thread that releases a permit must
215     * have acquired that permit by calling {@link #acquire}.
216     * Correct usage of a semaphore is established by programming convention
217     * in the application.
218     */
219     public void release() {
220     release(1L);
221     }
222    
223    
224     /**
225 dl 1.1 * Acquires the given number of permits from this semaphore,
226     * blocking until all are available,
227     * or the thread is {@link Thread#interrupt interrupted}.
228     *
229     * <p>Acquires the given number of permits, if they are available,
230     * and returns immediately,
231     * reducing the number of available permits by the given amount.
232     *
233     * <p>If insufficient permits are available then the current thread becomes
234     * disabled for thread scheduling purposes and lies dormant until
235     * one of two things happens:
236     * <ul>
237     * <li>Some other thread invokes one of the {@link #release() release}
238     * methods for this semaphore, the current thread is next to be assigned
239     * permits and the number of available permits satisfies this request; or
240     * <li>Some other thread {@link Thread#interrupt interrupts} the current
241     * thread.
242     * </ul>
243     *
244     * <p>If the current thread:
245     * <ul>
246     * <li>has its interrupted status set on entry to this method; or
247     * <li>is {@link Thread#interrupt interrupted} while waiting
248     * for a permit,
249     * </ul>
250     * then {@link InterruptedException} is thrown and the current thread's
251     * interrupted status is cleared.
252 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
253     * assigned to the next waiting thread(s), as if
254 dl 1.1 * they had been made available by a call to {@link #release()}.
255     *
256     * @param permits the number of permits to acquire
257     *
258     * @throws InterruptedException if the current thread is interrupted
259     * @throws IllegalArgumentException if permits is less than zero.
260     *
261     * @see Thread#interrupt
262     */
263     public void acquire(long permits) throws InterruptedException {
264     if (permits < 0) throw new IllegalArgumentException();
265 dl 1.6 Node node = null;
266 dl 1.1 lock.lockInterruptibly();
267     try {
268 dl 1.6 long remaining = count - permits;
269     if (remaining >= 0) {
270     count = remaining;
271     return;
272     }
273     count = 0;
274     node = enq(-remaining);
275 tim 1.7 } finally {
276 dl 1.1 lock.unlock();
277     }
278 dl 1.6
279     node.lock();
280     try {
281     while (node.permitsNeeded > 0)
282     node.done.await();
283 tim 1.7 } catch(InterruptedException ie) {
284 dl 1.6 if (node.permitsNeeded > 0) {
285     node.cancelled = true;
286     release(permits - node.permitsNeeded);
287     throw ie;
288 tim 1.7 } else { // ignore interrupt
289 dl 1.6 Thread.currentThread().interrupt();
290     }
291 tim 1.7 } finally {
292 dl 1.6 node.unlock();
293     }
294 dl 1.1 }
295    
296     /**
297     * Acquires the given number of permits from this semaphore,
298     * blocking until all are available.
299     *
300     * <p>Acquires the given number of permits, if they are available,
301     * and returns immediately,
302     * reducing the number of available permits by the given amount.
303     *
304     * <p>If insufficient permits are available then the current thread becomes
305     * disabled for thread scheduling purposes and lies dormant until
306     * some other thread invokes one of the {@link #release() release}
307     * methods for this semaphore, the current thread is next to be assigned
308     * permits and the number of available permits satisfies this request.
309     *
310     * <p>If the current thread
311     * is {@link Thread#interrupt interrupted} while waiting
312     * for permits then it will continue to wait and its position in the
313     * queue is not affected. When the
314     * thread does return from this method its interrupt status will be set.
315     *
316     * @param permits the number of permits to acquire
317     * @throws IllegalArgumentException if permits is less than zero.
318     *
319     */
320     public void acquireUninterruptibly(long permits) {
321     if (permits < 0) throw new IllegalArgumentException();
322 dl 1.6 Node node = null;
323 dl 1.1 lock.lock();
324     try {
325 dl 1.6 long remaining = count - permits;
326     if (remaining >= 0) {
327     count = remaining;
328     return;
329     }
330     count = 0;
331     node = enq(-remaining);
332 tim 1.7 } finally {
333 dl 1.1 lock.unlock();
334     }
335 dl 1.6
336     node.lock();
337     try {
338     while (node.permitsNeeded > 0)
339     node.done.awaitUninterruptibly();
340 tim 1.7 } finally {
341 dl 1.6 node.unlock();
342     }
343 dl 1.1 }
344    
345     /**
346     * Acquires the given number of permits from this semaphore, only if
347     * all are available at the time of invocation.
348     * <p>Acquires the given number of permits, if they are available, and
349     * returns immediately, with the value <tt>true</tt>,
350     * reducing the number of available permits by the given amount.
351     *
352     * <p>If insufficient permits are available then this method will return
353     * immediately with the value <tt>false</tt> and the number of available
354     * permits is unchanged.
355     *
356     * @param permits the number of permits to acquire
357     *
358     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
359     * otherwise.
360     * @throws IllegalArgumentException if permits is less than zero.
361     */
362     public boolean tryAcquire(long permits) {
363     if (permits < 0) throw new IllegalArgumentException();
364     lock.lock();
365     try {
366 dl 1.6 long remaining = count - permits;
367     if (remaining >= 0) {
368     count = remaining;
369 dl 1.1 return true;
370     }
371     return false;
372 tim 1.7 } finally {
373 dl 1.1 lock.unlock();
374     }
375     }
376    
377     /**
378     * Acquires the given number of permits from this semaphore, if all
379     * become available within the given waiting time and the
380     * current thread has not been {@link Thread#interrupt interrupted}.
381     * <p>Acquires the given number of permits, if they are available and
382     * returns immediately, with the value <tt>true</tt>,
383     * reducing the number of available permits by the given amount.
384     * <p>If insufficient permits are available then
385     * the current thread becomes disabled for thread scheduling
386     * purposes and lies dormant until one of three things happens:
387     * <ul>
388     * <li>Some other thread invokes one of the {@link #release() release}
389     * methods for this semaphore, the current thread is next to be assigned
390     * permits and the number of available permits satisfies this request; or
391     * <li>Some other thread {@link Thread#interrupt interrupts} the current
392     * thread; or
393     * <li>The specified waiting time elapses.
394     * </ul>
395     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
396     * <p>If the current thread:
397     * <ul>
398     * <li>has its interrupted status set on entry to this method; or
399     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
400     * the permits,
401     * </ul>
402     * then {@link InterruptedException} is thrown and the current thread's
403     * interrupted status is cleared.
404 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
405     * assigned to the next waiting thread(s), as if
406 dl 1.1 * they had been made available by a call to {@link #release()}.
407     *
408     * <p>If the specified waiting time elapses then the value <tt>false</tt>
409     * is returned.
410     * The given waiting time is a best-effort lower bound. If the time is
411     * less than or equal to zero, the method will not wait at all.
412 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
413     * assigned to the next waiting thread(s), as if
414 dl 1.1 * they had been made available by a call to {@link #release()}.
415     *
416     * @param permits the number of permits to acquire
417     * @param timeout the maximum time to wait for the permits
418 dl 1.6 * @param unit the time unit of the <tt>timeout</tt> argument.
419 dl 1.1 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
420     * if the waiting time elapsed before all permits were acquired.
421     *
422     * @throws InterruptedException if the current thread is interrupted
423     * @throws IllegalArgumentException if permits is less than zero.
424     *
425     * @see Thread#interrupt
426     *
427     */
428 dl 1.6 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
429 dl 1.1 throws InterruptedException {
430     if (permits < 0) throw new IllegalArgumentException();
431 dl 1.6 Node node = null;
432     long nanos = unit.toNanos(timeout);
433 dl 1.1 lock.lockInterruptibly();
434     try {
435 dl 1.6 long remaining = count - permits;
436     if (remaining >= 0) {
437     count = remaining;
438     return true;
439     }
440     if (nanos <= 0)
441     return false;
442     count = 0;
443     node = enq(-remaining);
444 tim 1.7 } finally {
445 dl 1.6 lock.unlock();
446     }
447    
448     node.lock();
449     try {
450     while (node.permitsNeeded > 0) {
451     nanos = node.done.awaitNanos(nanos);
452     if (nanos <= 0) {
453     release(permits - node.permitsNeeded);
454     return false;
455 dl 1.1 }
456     }
457 dl 1.6 return true;
458 tim 1.7 } catch(InterruptedException ie) {
459 dl 1.6 if (node.permitsNeeded > 0) {
460     node.cancelled = true;
461     release(permits - node.permitsNeeded);
462     throw ie;
463 tim 1.7 } else { // ignore interrupt
464 dl 1.6 Thread.currentThread().interrupt();
465     return true;
466     }
467 tim 1.7 } finally {
468 dl 1.6 node.unlock();
469 dl 1.1 }
470     }
471    
472    
473     /**
474     * Releases the given number of permits, returning them to the semaphore.
475     * <p>Releases the given number of permits, increasing the number of
476     * available permits by that amount.
477     * If any threads are blocking trying to acquire permits, then the
478     * one that has been waiting the longest
479     * is selected and given the permits that were just released.
480     * If the number of available permits satisfies that thread's request
481     * then that thread is re-enabled for thread scheduling purposes; otherwise
482     * the thread continues to wait. If there are still permits available
483     * after the first thread's request has been satisfied, then those permits
484     * are assigned to the next waiting thread. If it is satisfied then it is
485     * re-enabled for thread scheduling purposes. This continues until there
486     * are insufficient permits to satisfy the next waiting thread, or there
487     * are no more waiting threads.
488     *
489     * <p>There is no requirement that a thread that releases a permit must
490     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
491     * Correct usage of a semaphore is established by programming convention
492     * in the application.
493     *
494     * @param permits the number of permits to release
495     * @throws IllegalArgumentException if permits is less than zero.
496     */
497     public void release(long permits) {
498     if (permits < 0) throw new IllegalArgumentException();
499 dl 1.6 if (permits == 0)
500     return;
501     // Even when using fair locks, releases should try to barge in.
502 dl 1.5 if (!lock.tryLock())
503     lock.lock();
504 dl 1.1 try {
505 dl 1.6 long p = permits;
506     while (p > 0) {
507     Node node = head;
508     if (node == null) {
509     count += p;
510     p = 0;
511 tim 1.7 } else {
512 dl 1.6 node.lock();
513     try {
514     if (node.cancelled)
515     removeFirst();
516     else if (node.permitsNeeded > p) {
517     node.permitsNeeded -= p;
518     p = 0;
519 tim 1.7 } else {
520 dl 1.6 p -= node.permitsNeeded;
521     node.permitsNeeded = 0;
522     node.done.signal();
523     removeFirst();
524     }
525 tim 1.7 } finally {
526 dl 1.6 node.unlock();
527     }
528     }
529     }
530 tim 1.7 } finally {
531 dl 1.1 lock.unlock();
532     }
533     }
534     }