ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.11
Committed: Sun Aug 31 13:33:13 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.10: +1 -4 lines
Log Message:
Removed non-standard tags and misc javadoc cleanup

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 dl 1.1
10     /**
11 dl 1.2 * A semaphore guaranteeing that threads invoking any of the {@link
12     * #acquire() acquire} methods are allocated permits in the order in
13     * which their invocation of those methods was processed
14     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15     * applies to specific internal points of execution within these
16     * methods. So, it is possible for one thread to invoke
17     * <tt>acquire</tt> before another, but reach the ordering point after
18     * the other, and similarly upon return from the method.
19 dl 1.1 *
20 dl 1.2 * <p>Because permits are allocated in-order, this class also provides
21     * convenience methods to {@link #acquire(long) acquire} and {@link
22 dl 1.11 * #release(long) release} multiple permits at a time.
23 dl 1.1 *
24     * @since 1.5
25 dl 1.3 * @author Doug Lea
26 dl 1.1 *
27     */
28     public class FairSemaphore extends Semaphore {
29 dl 1.8 private static final long serialVersionUID = -3222578661600680210L;
30 dl 1.1
31     /*
32 dl 1.6 * Basic algorithm is a variant of the one described
33     * in CPJ book 2nd edition, section 3.7.3
34 dl 1.1 */
35    
36     /**
37 dl 1.6 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
38     */
39     private static class Node extends ReentrantLock {
40     /** The condition to wait on */
41     Condition done = newCondition();
42     /** True if interrupted before released */
43     boolean cancelled;
44     /** Number of permits remaining until can release */
45     long permitsNeeded;
46     /** Next node of queue */
47     Node next;
48    
49     Node(long n) { permitsNeeded = n; }
50     }
51    
52     /** Head (oldest node) of the wait queue; null while the queue is empty */
53 dl 1.8 private transient Node head;
54 dl 1.6
55     /** Most recently enqueued node of the wait queue; null while the queue is empty */
56 dl 1.8 private transient Node last;
57 dl 1.6
58     /** Add a new node to wait queue with given number of permits needed */
59     private Node enq(long n) {
60     Node p = new Node(n);
61     if (last == null)
62     last = head = p;
63     else
64     last = last.next = p;
65     return p;
66     }
67    
68     /** Remove first node from wait queue */
69     private void removeFirst() {
70     Node p = head;
71     assert p != null;
72     if (p != null && (head = p.next) == null)
73     last = null;
74     }
75    
76     /**
77     * Construct a <tt>FairSemaphore</tt> with the given number of
78 dl 1.1 * permits.
79     * @param permits the initial number of permits available
80     */
81     public FairSemaphore(long permits) {
       // Hands the superclass a ReentrantLock created with fairness enabled (true).
       // NOTE(review): presumably this becomes the inherited 'lock' field used by
       // the acquire/release methods of this class -- confirm against Semaphore.
82 dl 1.4 super(permits, new ReentrantLock(true));
83 dl 1.1 }
84    
85     /**
86 dl 1.6 * Acquires a permit from this semaphore, blocking until one is
87     * available, or the thread is {@link Thread#interrupt interrupted}.
88     *
89     * <p>Acquires a permit, if one is available and returns immediately,
90     * reducing the number of available permits by one.
91     * <p>If no permit is available then the current thread becomes
92     * disabled for thread scheduling purposes and lies dormant until
93     * one of two things happens:
94     * <ul>
95     * <li>Some other thread invokes the {@link #release} method for this
96 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
97 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
98     * thread.
99     * </ul>
100     *
101     * <p>If the current thread:
102     * <ul>
103     * <li>has its interrupted status set on entry to this method; or
104     * <li>is {@link Thread#interrupt interrupted} while waiting
105     * for a permit,
106     * </ul>
107     * then {@link InterruptedException} is thrown and the current thread's
108     * interrupted status is cleared.
109     *
110     * @throws InterruptedException if the current thread is interrupted
111     *
112     * @see Thread#interrupt
113     */
114     public void acquire() throws InterruptedException {
115     acquire(1L);
116     }
117    
118     /**
119     * Acquires a permit from this semaphore, blocking until one is
120     * available.
121     *
122     * <p>Acquires a permit, if one is available and returns immediately,
123     * reducing the number of available permits by one.
124     * <p>If no permit is available then the current thread becomes
125     * disabled for thread scheduling purposes and lies dormant until
126     * some other thread invokes the {@link #release} method for this
127 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit.
128 dl 1.6 *
129     * <p>If the current thread
130     * is {@link Thread#interrupt interrupted} while waiting
131     * for a permit then it will continue to wait and its position in
132     * the queue is not affected, exactly as for
133     * {@link #acquireUninterruptibly(long)}. When the
134     * thread does return from this method its interrupt status will be set.
135     *
136     */
137     public void acquireUninterruptibly() {
138     acquireUninterruptibly(1L);
139     }
140    
141     /**
142     * Acquires a permit from this semaphore, only if one is available at the
143     * time of invocation.
144     * <p>Acquires a permit, if one is available and returns immediately,
145     * with the value <tt>true</tt>,
146     * reducing the number of available permits by one.
147     *
148     * <p>If no permit is available then this method will return
149     * immediately with the value <tt>false</tt>.
150     *
151     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
152     * otherwise.
153     */
154     public boolean tryAcquire() {
155     return tryAcquire(1L);
156     }
157    
158     /**
159     * Acquires a permit from this semaphore, if one becomes available
160     * within the given waiting time and the
161     * current thread has not been {@link Thread#interrupt interrupted}.
162     * <p>Acquires a permit, if one is available and returns immediately,
163     * with the value <tt>true</tt>,
164     * reducing the number of available permits by one.
165     * <p>If no permit is available then
166     * the current thread becomes disabled for thread scheduling
167     * purposes and lies dormant until one of three things happens:
168     * <ul>
169     * <li>Some other thread invokes the {@link #release} method for this
170 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
171 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
172     * thread; or
173     * <li>The specified waiting time elapses.
174     * </ul>
175     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
176     * <p>If the current thread:
177     * <ul>
178     * <li>has its interrupted status set on entry to this method; or
179     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
180     * a permit,
181     * </ul>
182     * then {@link InterruptedException} is thrown and the current thread's
183     * interrupted status is cleared.
184     * <p>If the specified waiting time elapses then the value <tt>false</tt>
185     * is returned.
186 dholmes 1.10 * If the time is less than or equal to zero, the method will not wait
187     * at all.
188 dl 1.6 *
189     * @param timeout the maximum time to wait for a permit
190     * @param unit the time unit of the <tt>timeout</tt> argument.
191     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
192     * if the waiting time elapsed before a permit was acquired.
193     *
194     * @throws InterruptedException if the current thread is interrupted
195     *
196     * @see Thread#interrupt
197     *
198     */
199     public boolean tryAcquire(long timeout, TimeUnit unit)
200     throws InterruptedException {
201     return tryAcquire(1L, timeout, unit);
202     }
203    
204     /**
205     * Releases a permit, returning it to the semaphore.
206     * <p>Releases a permit, increasing the number of available permits
207     * by one.
208     * If any threads are blocking trying to acquire a permit, then one
209     * is selected and given the permit that was just released.
210     * That thread is re-enabled for thread scheduling purposes.
211     * <p>There is no requirement that a thread that releases a permit must
212     * have acquired that permit by calling {@link #acquire}.
213     * Correct usage of a semaphore is established by programming convention
214     * in the application.
215     */
216     public void release() {
217     release(1L);
218     }
219    
220    
221     /**
222 dl 1.1 * Acquires the given number of permits from this semaphore,
223     * blocking until all are available,
224     * or the thread is {@link Thread#interrupt interrupted}.
225     *
226     * <p>Acquires the given number of permits, if they are available,
227     * and returns immediately,
228     * reducing the number of available permits by the given amount.
229     *
230     * <p>If insufficient permits are available then the current thread becomes
231     * disabled for thread scheduling purposes and lies dormant until
232     * one of two things happens:
233     * <ul>
234     * <li>Some other thread invokes one of the {@link #release() release}
235     * methods for this semaphore, the current thread is next to be assigned
236     * permits and the number of available permits satisfies this request; or
237     * <li>Some other thread {@link Thread#interrupt interrupts} the current
238     * thread.
239     * </ul>
240     *
241     * <p>If the current thread:
242     * <ul>
243     * <li>has its interrupted status set on entry to this method; or
244     * <li>is {@link Thread#interrupt interrupted} while waiting
245     * for a permit,
246     * </ul>
247     * then {@link InterruptedException} is thrown and the current thread's
248     * interrupted status is cleared.
249 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
250     * assigned to the next waiting thread(s), as if
251 dl 1.1 * they had been made available by a call to {@link #release()}.
252     *
253     * @param permits the number of permits to acquire
254     *
255     * @throws InterruptedException if the current thread is interrupted
256     * @throws IllegalArgumentException if <tt>permits</tt> is less than zero.
257     *
258     * @see Thread#interrupt
259     */
260     public void acquire(long permits) throws InterruptedException {
261     if (permits < 0) throw new IllegalArgumentException();
262 dl 1.6 Node node = null;
263 dl 1.1 lock.lockInterruptibly();
264     try {
265 dl 1.6 long remaining = count - permits;
266     if (remaining >= 0) {
267     count = remaining;
268     return;
269     }
270     count = 0;
271     node = enq(-remaining);
272 tim 1.7 } finally {
273 dl 1.1 lock.unlock();
274     }
275 dl 1.6
276     node.lock();
277     try {
278     while (node.permitsNeeded > 0)
279     node.done.await();
280 tim 1.7 } catch(InterruptedException ie) {
281 dl 1.6 if (node.permitsNeeded > 0) {
282     node.cancelled = true;
283     release(permits - node.permitsNeeded);
284     throw ie;
285 tim 1.7 } else { // ignore interrupt
286 dl 1.6 Thread.currentThread().interrupt();
287     }
288 tim 1.7 } finally {
289 dl 1.6 node.unlock();
290     }
291 dl 1.1 }
292    
293     /**
294     * Acquires the given number of permits from this semaphore,
295     * blocking until all are available.
296     *
297     * <p>Acquires the given number of permits, if they are available,
298     * and returns immediately,
299     * reducing the number of available permits by the given amount.
300     *
301     * <p>If insufficient permits are available then the current thread becomes
302     * disabled for thread scheduling purposes and lies dormant until
303     * some other thread invokes one of the {@link #release() release}
304     * methods for this semaphore, the current thread is next to be assigned
305     * permits and the number of available permits satisfies this request.
306     *
307     * <p>If the current thread
308     * is {@link Thread#interrupt interrupted} while waiting
309     * for permits then it will continue to wait and its position in the
310     * queue is not affected. When the
311     * thread does return from this method its interrupt status will be set.
312     *
313     * @param permits the number of permits to acquire
314     * @throws IllegalArgumentException if <tt>permits</tt> is less than zero.
315     *
316     */
317     public void acquireUninterruptibly(long permits) {
318     if (permits < 0) throw new IllegalArgumentException();
319 dl 1.6 Node node = null;
320 dl 1.1 lock.lock();
321     try {
322 dl 1.6 long remaining = count - permits;
323     if (remaining >= 0) {
324     count = remaining;
325     return;
326     }
327     count = 0;
328     node = enq(-remaining);
329 tim 1.7 } finally {
330 dl 1.1 lock.unlock();
331     }
332 dl 1.6
333     node.lock();
334     try {
335     while (node.permitsNeeded > 0)
336     node.done.awaitUninterruptibly();
337 tim 1.7 } finally {
338 dl 1.6 node.unlock();
339     }
340 dl 1.1 }
341    
342     /**
343     * Acquires the given number of permits from this semaphore, only if
344     * all are available at the time of invocation.
345     * <p>Acquires the given number of permits, if they are available, and
346     * returns immediately, with the value <tt>true</tt>,
347     * reducing the number of available permits by the given amount.
348     *
349     * <p>If insufficient permits are available then this method will return
350     * immediately with the value <tt>false</tt> and the number of available
351     * permits is unchanged.
352     *
353     * @param permits the number of permits to acquire
354     *
355     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
356     * otherwise.
357     * @throws IllegalArgumentException if <tt>permits</tt> is less than zero.
358     */
359     public boolean tryAcquire(long permits) {
360     if (permits < 0) throw new IllegalArgumentException();
361     lock.lock();
362     try {
363 dl 1.6 long remaining = count - permits;
364     if (remaining >= 0) {
365     count = remaining;
366 dl 1.1 return true;
367     }
368     return false;
369 tim 1.7 } finally {
370 dl 1.1 lock.unlock();
371     }
372     }
373    
374     /**
375     * Acquires the given number of permits from this semaphore, if all
376     * become available within the given waiting time and the
377     * current thread has not been {@link Thread#interrupt interrupted}.
378     * <p>Acquires the given number of permits, if they are available and
379     * returns immediately, with the value <tt>true</tt>,
380     * reducing the number of available permits by the given amount.
381     * <p>If insufficient permits are available then
382     * the current thread becomes disabled for thread scheduling
383     * purposes and lies dormant until one of three things happens:
384     * <ul>
385     * <li>Some other thread invokes one of the {@link #release() release}
386     * methods for this semaphore, the current thread is next to be assigned
387     * permits and the number of available permits satisfies this request; or
388     * <li>Some other thread {@link Thread#interrupt interrupts} the current
389     * thread; or
390     * <li>The specified waiting time elapses.
391     * </ul>
392     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
393     * <p>If the current thread:
394     * <ul>
395     * <li>has its interrupted status set on entry to this method; or
396     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
397     * the permits,
398     * </ul>
399     * then {@link InterruptedException} is thrown and the current thread's
400     * interrupted status is cleared.
401 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
402     * assigned to the next waiting thread(s), as if
403 dl 1.1 * they had been made available by a call to {@link #release()}.
404     *
405     * <p>If the specified waiting time elapses then the value <tt>false</tt>
406     * is returned.
407 dholmes 1.10 * If the time is
408 dl 1.1 * less than or equal to zero, the method will not wait at all.
409 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
410     * assigned to the next waiting thread(s), as if
411 dl 1.1 * they had been made available by a call to {@link #release()}.
412     *
413     * @param permits the number of permits to acquire
414     * @param timeout the maximum time to wait for the permits
415 dl 1.6 * @param unit the time unit of the <tt>timeout</tt> argument.
416 dl 1.1 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
417     * if the waiting time elapsed before all permits were acquired.
418     *
419     * @throws InterruptedException if the current thread is interrupted
420     * @throws IllegalArgumentException if <tt>permits</tt> is less than zero.
421     *
422     * @see Thread#interrupt
423     *
424     */
425 dl 1.6 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
426 dl 1.1 throws InterruptedException {
427     if (permits < 0) throw new IllegalArgumentException();
428 dl 1.6 Node node = null;
429     long nanos = unit.toNanos(timeout);
430 dl 1.1 lock.lockInterruptibly();
431     try {
432 dl 1.6 long remaining = count - permits;
433     if (remaining >= 0) {
434     count = remaining;
435     return true;
436     }
437     if (nanos <= 0)
438     return false;
439     count = 0;
440     node = enq(-remaining);
441 tim 1.7 } finally {
442 dl 1.6 lock.unlock();
443     }
444    
445     node.lock();
446     try {
447     while (node.permitsNeeded > 0) {
448     nanos = node.done.awaitNanos(nanos);
449     if (nanos <= 0) {
450     release(permits - node.permitsNeeded);
451     return false;
452 dl 1.1 }
453     }
454 dl 1.6 return true;
455 tim 1.7 } catch(InterruptedException ie) {
456 dl 1.6 if (node.permitsNeeded > 0) {
457     node.cancelled = true;
458     release(permits - node.permitsNeeded);
459     throw ie;
460 tim 1.7 } else { // ignore interrupt
461 dl 1.6 Thread.currentThread().interrupt();
462     return true;
463     }
464 tim 1.7 } finally {
465 dl 1.6 node.unlock();
466 dl 1.1 }
467     }
468    
469    
470     /**
471     * Releases the given number of permits, returning them to the semaphore.
472     * <p>Releases the given number of permits, increasing the number of
473     * available permits by that amount.
474     * If any threads are blocking trying to acquire permits, then the
475     * one that has been waiting the longest
476     * is selected and given the permits that were just released.
477     * If the number of available permits satisfies that thread's request
478     * then that thread is re-enabled for thread scheduling purposes; otherwise
479     * the thread continues to wait. If there are still permits available
480     * after the first thread's request has been satisfied, then those permits
481     * are assigned to the next waiting thread. If it is satisfied then it is
482     * re-enabled for thread scheduling purposes. This continues until there
483     * are insufficient permits to satisfy the next waiting thread, or there
484     * are no more waiting threads.
485     *
486     * <p>There is no requirement that a thread that releases a permit must
487     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
488     * Correct usage of a semaphore is established by programming convention
489     * in the application.
490     *
491     * @param permits the number of permits to release
492     * @throws IllegalArgumentException if <tt>permits</tt> is less than zero.
493     */
494     public void release(long permits) {
495     if (permits < 0) throw new IllegalArgumentException();
496 dl 1.6 if (permits == 0)
497     return;
498     // Even when using fair locks, releases should try to barge in.
499 dl 1.5 if (!lock.tryLock())
500     lock.lock();
501 dl 1.1 try {
502 dl 1.6 long p = permits;
503     while (p > 0) {
504     Node node = head;
505     if (node == null) {
506     count += p;
507     p = 0;
508 tim 1.7 } else {
509 dl 1.6 node.lock();
510     try {
511     if (node.cancelled)
512     removeFirst();
513     else if (node.permitsNeeded > p) {
514     node.permitsNeeded -= p;
515     p = 0;
516 tim 1.7 } else {
517 dl 1.6 p -= node.permitsNeeded;
518     node.permitsNeeded = 0;
519     node.done.signal();
520     removeFirst();
521     }
522 tim 1.7 } finally {
523 dl 1.6 node.unlock();
524     }
525     }
526     }
527 tim 1.7 } finally {
528 dl 1.1 lock.unlock();
529     }
530     }
531     }