ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.13
Committed: Mon Nov 3 13:49:56 2003 UTC (20 years, 6 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.12: +0 -0 lines
State: FILE REMOVED
Log Message:
Merged FairSemaphore into Semaphore

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
10 /**
11 * A semaphore guaranteeing that threads invoking any of the {@link
12 * #acquire() acquire} methods are allocated permits in the order in
13 * which their invocation of those methods was processed
14 * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15 * applies to specific internal points of execution within these
16 * methods. So, it is possible for one thread to invoke
17 * <tt>acquire</tt> before another, but reach the ordering point after
18 * the other, and similarly upon return from the method.
19 *
20 * <p>Because permits are allocated in-order, this class also provides
21 * convenience methods to {@link #acquire(long) acquire} and {@link
22 * #release(long) release} multiple permits at a time.
23 *
24 * @since 1.5
25 * @author Doug Lea
26 *
27 */
28 public class FairSemaphore extends Semaphore {
29 private static final long serialVersionUID = -3222578661600680210L;
30
31 /*
32 * Basic algorithm is a variant of the one described
33 * in CPJ book 2nd edition, section 3.7.3
34 */
35
36 /**
37 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
38 */
39 private static class Node extends ReentrantLock {
40 /** The condition to wait on */
41 ReentrantLock.ConditionObject done = newCondition();
42 /** True if interrupted before released */
43 boolean cancelled;
44 /** Number of permits remaining until can release */
45 long permitsNeeded;
46 /** Next node of queue */
47 Node next;
48
49 Node(long n) { permitsNeeded = n; }
50 }
51
52 /** Head of the wait queue */
53 private transient Node head;
54
55 /** Pointer to last node of the wait queue */
56 private transient Node last;
57
58 /** Add a new node to wait queue with given number of permits needed */
59 private Node enq(long n) {
60 Node p = new Node(n);
61 if (last == null)
62 last = head = p;
63 else
64 last = last.next = p;
65 return p;
66 }
67
68 /** Remove first node from wait queue */
69 private void removeFirst() {
70 Node p = head;
71 assert p != null;
72 if (p != null && (head = p.next) == null)
73 last = null;
74 }
75
76 /**
77 * Construct a <tt>FairSemaphore</tt> with the given number of
78 * permits.
79 * @param permits the initial number of permits available
80 */
81 public FairSemaphore(long permits) {
82 super(permits, new ReentrantLock(true));
83 }
84
85 /**
86 * Acquires a permit from this semaphore, blocking until one is
87 * available, or the thread is {@link Thread#interrupt interrupted}.
88 *
89 * <p>Acquires a permit, if one is available and returns immediately,
90 * reducing the number of available permits by one.
91 * <p>If no permit is available then the current thread becomes
92 * disabled for thread scheduling purposes and lies dormant until
93 * one of two things happens:
94 * <ul>
95 * <li>Some other thread invokes the {@link #release} method for this
96 * semaphore and the current thread is next to be assigned a permit; or
97 * <li>Some other thread {@link Thread#interrupt interrupts} the current
98 * thread.
99 * </ul>
100 *
101 * <p>If the current thread:
102 * <ul>
103 * <li>has its interrupted status set on entry to this method; or
104 * <li>is {@link Thread#interrupt interrupted} while waiting
105 * for a permit,
106 * </ul>
107 * then {@link InterruptedException} is thrown and the current thread's
108 * interrupted status is cleared.
109 *
110 * @throws InterruptedException if the current thread is interrupted
111 *
112 * @see Thread#interrupt
113 */
114 public void acquire() throws InterruptedException {
115 acquire(1L);
116 }
117
118 /**
119 * Acquires a permit from this semaphore, blocking until one is
120 * available.
121 *
122 * <p>Acquires a permit, if one is available and returns immediately,
123 * reducing the number of available permits by one.
124 * <p>If no permit is available then the current thread becomes
125 * disabled for thread scheduling purposes and lies dormant until
126 * some other thread invokes the {@link #release} method for this
127 * semaphore and the current thread is next to be assigned a permit.
128 *
129 * <p>If the current thread
130 * is {@link Thread#interrupt interrupted} while waiting
131 * for a permit then it will continue to wait, but the time at which
132 * the thread is assigned a permit may change compared to the time it
133 * would have received the permit had no interruption occurred. When the
134 * thread does return from this method its interrupt status will be set.
135 *
136 */
137 public void acquireUninterruptibly() {
138 acquireUninterruptibly(1L);
139 }
140
141 /**
142 * Acquires a permit from this semaphore, only if one is available at the
143 * time of invocation.
144 * <p>Acquires a permit, if one is available and returns immediately,
145 * with the value <tt>true</tt>,
146 * reducing the number of available permits by one.
147 *
148 * <p>If no permit is available then this method will return
149 * immediately with the value <tt>false</tt>.
150 *
151 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
152 * otherwise.
153 */
154 public boolean tryAcquire() {
155 return tryAcquire(1L);
156 }
157
158 /**
159 * Acquires a permit from this semaphore, if one becomes available
160 * within the given waiting time and the
161 * current thread has not been {@link Thread#interrupt interrupted}.
162 * <p>Acquires a permit, if one is available and returns immediately,
163 * with the value <tt>true</tt>,
164 * reducing the number of available permits by one.
165 * <p>If no permit is available then
166 * the current thread becomes disabled for thread scheduling
167 * purposes and lies dormant until one of three things happens:
168 * <ul>
169 * <li>Some other thread invokes the {@link #release} method for this
170 * semaphore and the current thread is next to be assigned a permit; or
171 * <li>Some other thread {@link Thread#interrupt interrupts} the current
172 * thread; or
173 * <li>The specified waiting time elapses.
174 * </ul>
175 * <p>If a permit is acquired then the value <tt>true</tt> is returned.
176 * <p>If the current thread:
177 * <ul>
178 * <li>has its interrupted status set on entry to this method; or
179 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
180 * a permit,
181 * </ul>
182 * then {@link InterruptedException} is thrown and the current thread's
183 * interrupted status is cleared.
184 * <p>If the specified waiting time elapses then the value <tt>false</tt>
185 * is returned.
186 * If the time is less than or equal to zero, the method will not wait
187 * at all.
188 *
189 * @param timeout the maximum time to wait for a permit
190 * @param unit the time unit of the <tt>timeout</tt> argument.
191 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
192 * if the waiting time elapsed before a permit was acquired.
193 *
194 * @throws InterruptedException if the current thread is interrupted
195 *
196 * @see Thread#interrupt
197 *
198 */
199 public boolean tryAcquire(long timeout, TimeUnit unit)
200 throws InterruptedException {
201 return tryAcquire(1L, timeout, unit);
202 }
203
204 /**
205 * Releases a permit, returning it to the semaphore.
206 * <p>Releases a permit, increasing the number of available permits
207 * by one.
208 * If any threads are blocking trying to acquire a permit, then one
209 * is selected and given the permit that was just released.
210 * That thread is re-enabled for thread scheduling purposes.
211 * <p>There is no requirement that a thread that releases a permit must
212 * have acquired that permit by calling {@link #acquire}.
213 * Correct usage of a semaphore is established by programming convention
214 * in the application.
215 */
216 public void release() {
217 release(1L);
218 }
219
220
221 /**
222 * Acquires the given number of permits from this semaphore,
223 * blocking until all are available,
224 * or the thread is {@link Thread#interrupt interrupted}.
225 *
226 * <p>Acquires the given number of permits, if they are available,
227 * and returns immediately,
228 * reducing the number of available permits by the given amount.
229 *
230 * <p>If insufficient permits are available then the current thread becomes
231 * disabled for thread scheduling purposes and lies dormant until
232 * one of two things happens:
233 * <ul>
234 * <li>Some other thread invokes one of the {@link #release() release}
235 * methods for this semaphore, the current thread is next to be assigned
236 * permits and the number of available permits satisfies this request; or
237 * <li>Some other thread {@link Thread#interrupt interrupts} the current
238 * thread.
239 * </ul>
240 *
241 * <p>If the current thread:
242 * <ul>
243 * <li>has its interrupted status set on entry to this method; or
244 * <li>is {@link Thread#interrupt interrupted} while waiting
245 * for a permit,
246 * </ul>
247 * then {@link InterruptedException} is thrown and the current thread's
248 * interrupted status is cleared.
249 * Any permits that were to be assigned to this thread, are instead
250 * assigned to the next waiting thread(s), as if
251 * they had been made available by a call to {@link #release()}.
252 *
253 * @param permits the number of permits to acquire
254 *
255 * @throws InterruptedException if the current thread is interrupted
256 * @throws IllegalArgumentException if permits less than zero.
257 *
258 * @see Thread#interrupt
259 */
260 public void acquire(long permits) throws InterruptedException {
261 if (permits < 0) throw new IllegalArgumentException();
262 Node node = null;
263 lock.lockInterruptibly();
264 try {
265 long remaining = count - permits;
266 if (remaining >= 0) {
267 count = remaining;
268 return;
269 }
270 count = 0;
271 node = enq(-remaining);
272 } finally {
273 lock.unlock();
274 }
275
276 node.lock();
277 try {
278 while (node.permitsNeeded > 0)
279 node.done.await();
280 } catch(InterruptedException ie) {
281 if (node.permitsNeeded > 0) {
282 node.cancelled = true;
283 release(permits - node.permitsNeeded);
284 throw ie;
285 } else { // ignore interrupt
286 Thread.currentThread().interrupt();
287 }
288 } finally {
289 node.unlock();
290 }
291 }
292
293 /**
294 * Acquires the given number of permits from this semaphore,
295 * blocking until all are available.
296 *
297 * <p>Acquires the given number of permits, if they are available,
298 * and returns immediately,
299 * reducing the number of available permits by the given amount.
300 *
301 * <p>If insufficient permits are available then the current thread becomes
302 * disabled for thread scheduling purposes and lies dormant until
303 * some other thread invokes one of the {@link #release() release}
304 * methods for this semaphore, the current thread is next to be assigned
305 * permits and the number of available permits satisfies this request.
306 *
307 * <p>If the current thread
308 * is {@link Thread#interrupt interrupted} while waiting
309 * for permits then it will continue to wait and its position in the
310 * queue is not affected. When the
311 * thread does return from this method its interrupt status will be set.
312 *
313 * @param permits the number of permits to acquire
314 * @throws IllegalArgumentException if permits less than zero.
315 *
316 */
317 public void acquireUninterruptibly(long permits) {
318 if (permits < 0) throw new IllegalArgumentException();
319 Node node = null;
320 lock.lock();
321 try {
322 long remaining = count - permits;
323 if (remaining >= 0) {
324 count = remaining;
325 return;
326 }
327 count = 0;
328 node = enq(-remaining);
329 } finally {
330 lock.unlock();
331 }
332
333 node.lock();
334 try {
335 while (node.permitsNeeded > 0)
336 node.done.awaitUninterruptibly();
337 } finally {
338 node.unlock();
339 }
340 }
341
342 /**
343 * Acquires the given number of permits from this semaphore, only if
344 * all are available at the time of invocation.
345 * <p>Acquires the given number of permits, if they are available, and
346 * returns immediately, with the value <tt>true</tt>,
347 * reducing the number of available permits by the given amount.
348 *
349 * <p>If insufficient permits are available then this method will return
350 * immediately with the value <tt>false</tt> and the number of available
351 * permits is unchanged.
352 *
353 * @param permits the number of permits to acquire
354 *
355 * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
356 * otherwise.
357 * @throws IllegalArgumentException if permits less than zero.
358 */
359 public boolean tryAcquire(long permits) {
360 if (permits < 0) throw new IllegalArgumentException();
361 lock.lock();
362 try {
363 long remaining = count - permits;
364 if (remaining >= 0) {
365 count = remaining;
366 return true;
367 }
368 return false;
369 } finally {
370 lock.unlock();
371 }
372 }
373
374 /**
375 * Acquires the given number of permits from this semaphore, if all
376 * become available within the given waiting time and the
377 * current thread has not been {@link Thread#interrupt interrupted}.
378 * <p>Acquires the given number of permits, if they are available and
379 * returns immediately, with the value <tt>true</tt>,
380 * reducing the number of available permits by the given amount.
381 * <p>If insufficient permits are available then
382 * the current thread becomes disabled for thread scheduling
383 * purposes and lies dormant until one of three things happens:
384 * <ul>
385 * <li>Some other thread invokes one of the {@link #release() release}
386 * methods for this semaphore, the current thread is next to be assigned
387 * permits and the number of available permits satisfies this request; or
388 * <li>Some other thread {@link Thread#interrupt interrupts} the current
389 * thread; or
390 * <li>The specified waiting time elapses.
391 * </ul>
392 * <p>If the permits are acquired then the value <tt>true</tt> is returned.
393 * <p>If the current thread:
394 * <ul>
395 * <li>has its interrupted status set on entry to this method; or
396 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
397 * the permits,
398 * </ul>
399 * then {@link InterruptedException} is thrown and the current thread's
400 * interrupted status is cleared.
401 * Any permits that were to be assigned to this thread, are instead
402 * assigned to the next waiting thread(s), as if
403 * they had been made available by a call to {@link #release()}.
404 *
405 * <p>If the specified waiting time elapses then the value <tt>false</tt>
406 * is returned.
407 * If the time is
408 * less than or equal to zero, the method will not wait at all.
409 * Any permits that were to be assigned to this thread, are instead
410 * assigned to the next waiting thread(s), as if
411 * they had been made available by a call to {@link #release()}.
412 *
413 * @param permits the number of permits to acquire
414 * @param timeout the maximum time to wait for the permits
415 * @param unit the time unit of the <tt>timeout</tt> argument.
416 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
417 * if the waiting time elapsed before all permits were acquired.
418 *
419 * @throws InterruptedException if the current thread is interrupted
420 * @throws IllegalArgumentException if permits less than zero.
421 *
422 * @see Thread#interrupt
423 *
424 */
425 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
426 throws InterruptedException {
427 if (permits < 0) throw new IllegalArgumentException();
428 Node node = null;
429 long nanos = unit.toNanos(timeout);
430 lock.lockInterruptibly();
431 try {
432 long remaining = count - permits;
433 if (remaining >= 0) {
434 count = remaining;
435 return true;
436 }
437 if (nanos <= 0)
438 return false;
439 count = 0;
440 node = enq(-remaining);
441 } finally {
442 lock.unlock();
443 }
444
445 node.lock();
446 try {
447 while (node.permitsNeeded > 0) {
448 nanos = node.done.awaitNanos(nanos);
449 if (nanos <= 0) {
450 release(permits - node.permitsNeeded);
451 return false;
452 }
453 }
454 return true;
455 } catch(InterruptedException ie) {
456 if (node.permitsNeeded > 0) {
457 node.cancelled = true;
458 release(permits - node.permitsNeeded);
459 throw ie;
460 } else { // ignore interrupt
461 Thread.currentThread().interrupt();
462 return true;
463 }
464 } finally {
465 node.unlock();
466 }
467 }
468
469
470 /**
471 * Releases the given number of permits, returning them to the semaphore.
472 * <p>Releases the given number of permits, increasing the number of
473 * available permits by that amount.
474 * If any threads are blocking trying to acquire permits, then the
475 * one that has been waiting the longest
476 * is selected and given the permits that were just released.
477 * If the number of available permits satisfies that thread's request
478 * then that thread is re-enabled for thread scheduling purposes; otherwise
479 * the thread continues to wait. If there are still permits available
480 * after the first thread's request has been satisfied, then those permits
481 * are assigned to the next waiting thread. If it is satisfied then it is
482 * re-enabled for thread scheduling purposes. This continues until there
483 * are insufficient permits to satisfy the next waiting thread, or there
484 * are no more waiting threads.
485 *
486 * <p>There is no requirement that a thread that releases a permit must
487 * have acquired that permit by calling {@link Semaphore#acquire acquire}.
488 * Correct usage of a semaphore is established by programming convention
489 * in the application.
490 *
491 * @param permits the number of permits to release
492 * @throws IllegalArgumentException if permits less than zero.
493 */
494 public void release(long permits) {
495 if (permits < 0) throw new IllegalArgumentException();
496 if (permits == 0)
497 return;
498 // Even when using fair locks, releases should try to barge in.
499 if (!lock.tryLock())
500 lock.lock();
501 try {
502 long p = permits;
503 while (p > 0) {
504 Node node = head;
505 if (node == null) {
506 count += p;
507 p = 0;
508 } else {
509 node.lock();
510 try {
511 if (node.cancelled)
512 removeFirst();
513 else if (node.permitsNeeded > p) {
514 node.permitsNeeded -= p;
515 p = 0;
516 } else {
517 p -= node.permitsNeeded;
518 node.permitsNeeded = 0;
519 node.done.signal();
520 removeFirst();
521 }
522 } finally {
523 node.unlock();
524 }
525 }
526 }
527 } finally {
528 lock.unlock();
529 }
530 }
531 }