11 |
|
/** |
12 |
|
* A {@linkplain BlockingQueue blocking queue} in which each |
13 |
|
* <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A |
14 |
< |
* synchronous queue does not have any internal capacity - in |
15 |
< |
* particular it does not have a capacity of one. You cannot |
16 |
< |
* <tt>peek</tt> at a synchronous queue because an element is only |
17 |
< |
* present when you try to take it; you cannot add an element (using |
18 |
< |
* any method) unless another thread is trying to remove it; you |
19 |
< |
* cannot iterate as there is nothing to iterate. The <em>head</em> |
20 |
< |
* of the queue is the element that the first queued thread is trying |
21 |
< |
* to add to the queue; if there are no queued threads then no element |
22 |
< |
* is being added and the head is <tt>null</tt>. For purposes of |
23 |
< |
* other <tt>Collection</tt> methods (for example <tt>contains</tt>), |
24 |
< |
* a <tt>SynchronousQueue</tt> acts as an empty collection. This |
25 |
< |
* queue does not permit <tt>null</tt> elements. |
14 |
> |
* synchronous queue does not have any internal capacity, not even a |
15 |
> |
* capacity of one. You cannot <tt>peek</tt> at a synchronous queue |
16 |
> |
* because an element is only present when you try to take it; you |
17 |
> |
* cannot add an element (using any method) unless another thread is |
18 |
> |
* trying to remove it; you cannot iterate as there is nothing to |
19 |
> |
* iterate. The <em>head</em> of the queue is the element that the |
20 |
> |
* first queued thread is trying to add to the queue; if there are no |
21 |
> |
* queued threads then no element is being added and the head is |
22 |
> |
* <tt>null</tt>. For purposes of other <tt>Collection</tt> methods |
23 |
> |
* (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts |
24 |
> |
* as an empty collection. This queue does not permit <tt>null</tt> |
25 |
> |
* elements. |
26 |
|
* |
27 |
|
* <p>Synchronous queues are similar to rendezvous channels used in |
28 |
|
* CSP and Ada. They are well suited for handoff designs, in which an |
88 |
|
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy. |
89 |
|
*/ |
90 |
|
public SynchronousQueue() { |
91 |
< |
qlock = new ReentrantLock(); |
92 |
< |
waitingProducers = new LifoWaitQueue(); |
93 |
< |
waitingConsumers = new LifoWaitQueue(); |
91 |
> |
this(false); |
92 |
|
} |
93 |
|
|
94 |
|
/** |
95 |
|
* Creates a <tt>SynchronousQueue</tt> with specified fairness policy. |
96 |
< |
* @param fair if true, threads contend in FIFO order for access. |
96 |
> |
* @param fair if true, threads contend in FIFO order for access; |
97 |
> |
* otherwise the order is unspecified. |
98 |
|
*/ |
99 |
|
public SynchronousQueue(boolean fair) { |
100 |
|
if (fair) { |
112 |
|
/** |
113 |
|
* Queue to hold waiting puts/takes; specialized to FiFo/Lifo below. |
114 |
|
* These queues have all transient fields, but are serializable |
115 |
< |
* in order to retain fairness settings when deserialized. |
115 |
> |
* in order to recover fairness settings when deserialized. |
116 |
|
*/ |
117 |
|
static abstract class WaitQueue implements java.io.Serializable { |
118 |
|
/** Create, add, and return node for x */ |
162 |
|
|
163 |
|
Node deq() { |
164 |
|
Node p = head; |
165 |
< |
if (p != null) |
165 |
> |
if (p != null) { |
166 |
|
head = p.next; |
167 |
+ |
p.next = null; |
168 |
+ |
} |
169 |
|
return p; |
170 |
|
} |
171 |
|
} |
187 |
|
/** Next node in wait queue */ |
188 |
|
Node next; |
189 |
|
|
190 |
< |
/** Create node with initial item */ |
190 |
> |
/** Creates a node with initial item */ |
191 |
|
Node(Object x) { item = x; } |
192 |
|
|
193 |
< |
/** Create node with initial item and next */ |
193 |
> |
/** Creates a node with initial item and next */ |
194 |
|
Node(Object x, Node n) { item = x; next = n; } |
195 |
|
|
196 |
|
/** |
208 |
|
} |
209 |
|
|
210 |
|
/** |
211 |
< |
* Take item and null out field (for sake of GC) |
211 |
> |
* Takes item and nulls out field (for sake of GC) |
212 |
|
*/ |
213 |
|
private Object extract() { |
214 |
|
Object x = item; |
217 |
|
} |
218 |
|
|
219 |
|
/** |
220 |
< |
* Try to cancel on interrupt; if so rethrowing, |
220 |
> |
* Tries to cancel on interrupt; if so rethrowing, |
221 |
|
* else setting interrupt state |
222 |
|
*/ |
223 |
|
private void checkCancellationOnInterrupt(InterruptedException ie) |
228 |
|
} |
229 |
|
|
230 |
|
/** |
231 |
< |
* Fill in the slot created by the consumer and signal consumer to |
231 |
> |
* Fills in the slot created by the consumer and signal consumer to |
232 |
|
* continue. |
233 |
|
*/ |
234 |
|
boolean setItem(Object x) { |
237 |
|
} |
238 |
|
|
239 |
|
/** |
240 |
< |
* Remove item from slot created by producer and signal producer |
240 |
> |
* Removes item from slot created by producer and signal producer |
241 |
|
* to continue. |
242 |
|
*/ |
243 |
|
Object getItem() { |
245 |
|
} |
246 |
|
|
247 |
|
/** |
248 |
< |
* Wait for a consumer to take item placed by producer. |
248 |
> |
* Waits for a consumer to take item placed by producer. |
249 |
|
*/ |
250 |
|
void waitForTake() throws InterruptedException { |
251 |
|
try { |
256 |
|
} |
257 |
|
|
258 |
|
/** |
259 |
< |
* Wait for a producer to put item placed by consumer. |
259 |
> |
* Waits for a producer to put item placed by consumer. |
260 |
|
*/ |
261 |
|
Object waitForPut() throws InterruptedException { |
262 |
|
try { |
268 |
|
} |
269 |
|
|
270 |
|
/** |
271 |
< |
* Wait for a consumer to take item placed by producer or time out. |
271 |
> |
* Waits for a consumer to take item placed by producer or time out. |
272 |
|
*/ |
273 |
|
boolean waitForTake(long nanos) throws InterruptedException { |
274 |
|
try { |
282 |
|
} |
283 |
|
|
284 |
|
/** |
285 |
< |
* Wait for a producer to put item placed by consumer, or time out. |
285 |
> |
* Waits for a producer to put item placed by consumer, or time out. |
286 |
|
*/ |
287 |
|
Object waitForPut(long nanos) throws InterruptedException { |
288 |
|
try { |
296 |
|
} |
297 |
|
} |
298 |
|
|
298 |
– |
|
299 |
– |
|
299 |
|
/** |
300 |
|
* Adds the specified element to this queue, waiting if necessary for |
301 |
|
* another thread to receive it. |