1 |
/* |
2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
* Expert Group and released to the public domain, as explained at |
4 |
* http://creativecommons.org/licenses/publicdomain |
5 |
*/ |
6 |
|
7 |
package jsr166y.forkjoin; |
8 |
import java.io.Serializable; |
9 |
import java.util.*; |
10 |
import java.util.concurrent.*; |
11 |
import java.util.concurrent.atomic.*; |
12 |
import sun.misc.Unsafe; |
13 |
import java.lang.reflect.*; |
14 |
|
15 |
/** |
16 |
* Abstract base class for tasks that run within a ForkJoinPool. A |
17 |
* ForkJoinTask is a thread-like entity that is much lighter weight |
18 |
* than a normal thread. Huge numbers of tasks and subtasks may be |
19 |
* hosted by a small number of actual threads in a ForkJoinPool, |
20 |
* at the price of some usage limitations. |
21 |
* |
22 |
* <p> The <tt>ForkJoinTask</tt> class is not directly subclassable |
23 |
* outside of this package. Instead, you can subclass one of the |
24 |
* supplied abstract classes that support various styles of fork/join |
25 |
* processing. Normally, a concrete ForkJoinTask subclass declares |
26 |
* fields comprising its parameters, established in a constructor, and |
27 |
* then defines a <tt>compute</tt> method that somehow uses the |
28 |
* control methods supplied by this base class. While these methods |
29 |
* have <tt>public</tt> access, most may only be called from within |
30 |
* other ForkJoinTasks. Attempts to invoke them in other contexts |
31 |
* result in exceptions or errors including ClassCastException. The |
32 |
* only generally accessible methods are those for cancellation and |
33 |
* status checking. The only way to invoke a "main" driver task is to |
34 |
* submit it to a ForkJoinPool. Normally, once started, this will in |
35 |
* turn start other subtasks. Nearly all of these base support |
36 |
* methods are <tt>final</tt> because their implementations are |
37 |
* intrinsically tied to the underlying lightweight task scheduling |
38 |
* framework, and so cannot be overridden. |
39 |
* |
40 |
* <p> ForkJoinTasks play similar roles as <tt>Futures</tt> but |
41 |
* support a more limited range of use. The "lightness" of |
42 |
* ForkJoinTasks is due to a set of restrictions (that are only |
43 |
* partially statically enforceable) reflecting their intended use as |
44 |
* purely computational tasks -- calculating pure functions or |
45 |
* operating on purely isolated objects. The only coordination |
46 |
* mechanisms supported for ForkJoinTasks are <tt>fork</tt>, that |
47 |
* arranges asynchronous execution, and <tt>join</tt>, that doesn't |
48 |
* proceed until the task's result has been computed. (A simple form |
49 |
* of cancellation is also supported). The computation defined in the |
50 |
* <tt>compute</tt> method should not in general perform any other |
51 |
* form of blocking synchronization, should not perform IO, and should |
52 |
* be independent of other tasks. Minor breaches of these |
53 |
* restrictions, for example using shared output streams, may be |
54 |
* tolerable in practice, but frequent use will result in poor |
55 |
* performance, and the potential to indefinitely stall if the number |
56 |
* of threads not waiting for external synchronization becomes |
57 |
* exhausted. This usage restriction is in part enforced by not |
58 |
* permitting checked exceptions such as IOExceptions to be |
59 |
* thrown. However, computations may still encounter unchecked |
60 |
* exceptions, that are rethrown to callers attempting join |
61 |
* them. These exceptions may additionally include |
62 |
* RejectedExecutionExceptions stemming from internal resource |
63 |
* exhaustion such as failure to allocate internal task queues. |
64 |
* |
65 |
* <p>ForkJoinTasks should perform relatively small amounts of |
66 |
* computations, otherwise splitting into smaller tasks. As a very |
67 |
* rough rule of thumb, a task should perform more than 100 and less |
68 |
* than 10000 basic computational steps. If tasks are too big, then |
69 |
* parallelism cannot improve throughput. If too small, then memory |
70 |
* and internal task maintenance overhead may overwhelm processing. |
71 |
* The {@link ForkJoinWorkerThread} class supports a number of |
72 |
* inspection and tuning methods that can be useful when developing |
73 |
* fork/join programs. |
74 |
* |
75 |
* <p>ForkJoinTasks are <tt>Serializable</tt>, which enables them to |
76 |
* be used in extensions such as remote execution frameworks. However, |
77 |
* it is in general safe to serialize tasks only before or after, but |
78 |
* not during execution. Serialization is not relied on during |
79 |
* execution itself. |
80 |
*/ |
81 |
public abstract class ForkJoinTask<V> implements Serializable { |
82 |
/* |
83 |
* The main implementations of execution methods are provided by |
84 |
* ForkJoinWorkerThread, so internals are package protected. This |
85 |
* class is mainly responsible for maintaining task status field |
86 |
* and exception mechanics. |
87 |
*/ |
88 |
|
89 |
/** |
90 |
* Table of exceptions thrown by tasks, to enable reporting by |
91 |
* callers. Because exceptions are rare, we don't directly keep |
92 |
* them with task objects, but instead us a weak ref table. Note |
93 |
* that cancellation exceptions don't appear in the table, but are |
94 |
* instead recorded as status values. |
95 |
* |
96 |
* Todo: Use ConcurrentReferenceMap |
97 |
*/ |
98 |
static final WeakHashMap<ForkJoinTask<?>, Throwable> exceptionTable = |
99 |
new WeakHashMap<ForkJoinTask<?>, Throwable>(); |
100 |
|
101 |
static synchronized void setException(ForkJoinTask<?> t, Throwable ex) { |
102 |
exceptionTable.put(t, ex); |
103 |
} |
104 |
|
105 |
static synchronized Throwable getException(ForkJoinTask<?> t) { |
106 |
return exceptionTable.get(t); |
107 |
} |
108 |
|
109 |
static synchronized void clearException(ForkJoinTask<?> t) { |
110 |
exceptionTable.remove(t); |
111 |
} |
112 |
|
113 |
/** |
114 |
* Disallow direct construction outside this package. |
115 |
*/ |
116 |
ForkJoinTask() { |
117 |
} |
118 |
|
119 |
/** |
120 |
* Status, taking values: |
121 |
* zero: initial |
122 |
* negative: COMPLETED. CANCELLED, or HAS_EXCEPTION |
123 |
* positive: ignored wrt completion, may be used by subclasses |
124 |
* |
125 |
* Status is set negative when a task completes. For normal |
126 |
* completion, the status field is set with a cheaper ordered |
127 |
* write (as opposed to volatile write) because ownership |
128 |
* maintenance in Worker queues ensures that it is never subject |
129 |
* to read-after-write anomalies. (Implementing this requires |
130 |
* direct use of Unsafe to avoid overhead.) |
131 |
*/ |
132 |
volatile int status; |
133 |
static final int COMPLETED = -1; // order matters |
134 |
static final int CANCELLED = -2; |
135 |
static final int HAS_EXCEPTION = -3; |
136 |
|
137 |
// within-package utilities |
138 |
|
139 |
/** |
140 |
* Immediately executes this task unless already complete, |
141 |
* trapping all possible exceptions. Returns true if executed and |
142 |
* completed normally, else false. This method cannot be |
143 |
* implemented outside this package because we must guarantee that |
144 |
* implementations trap all exceptions. |
145 |
* @return true if executed and completed normally, else false |
146 |
*/ |
147 |
abstract boolean exec(); |
148 |
|
149 |
/** |
150 |
* Sets status to indicate this task is done. |
151 |
*/ |
152 |
final void setDone() { |
153 |
_unsafe.putOrderedInt(this, statusOffset, COMPLETED); |
154 |
} |
155 |
|
156 |
final boolean casStatus(int cmp, int val) { |
157 |
return cmp == status && |
158 |
_unsafe.compareAndSwapInt(this, statusOffset, cmp, val); |
159 |
} |
160 |
|
161 |
final void setStatus(int s) { |
162 |
_unsafe.putOrderedInt(this, statusOffset, s); |
163 |
} |
164 |
|
165 |
final void incrementStatus() { |
166 |
for (;;) { |
167 |
int s = status; // force fail if already negative |
168 |
if (s < 0 || _unsafe.compareAndSwapInt(this, statusOffset, s, s+1)) |
169 |
break; |
170 |
} |
171 |
} |
172 |
|
173 |
/** |
174 |
* Sets status to indicate exceptional completion. Note that we |
175 |
* allow races across normal and exceptional completion. |
176 |
*/ |
177 |
final void setDoneExceptionally(Throwable rex) { |
178 |
setException(this, rex); |
179 |
status = HAS_EXCEPTION; |
180 |
} |
181 |
|
182 |
/** |
183 |
* Sets status to cancelled only if in initial state. Uses same |
184 |
* implementation as cancel() but used internally to safely set |
185 |
* status on pool shutdown etc even if cancel is overridden. |
186 |
*/ |
187 |
final void setCancelled() { |
188 |
_unsafe.compareAndSwapInt(this, statusOffset, 0, CANCELLED); |
189 |
} |
190 |
|
191 |
/** |
192 |
* Workaround for not being able to rethrow unchecked exceptions. |
193 |
*/ |
194 |
static final void rethrowException(Throwable ex) { |
195 |
if (ex != null) |
196 |
_unsafe.throwException(ex); |
197 |
} |
198 |
|
199 |
/** |
200 |
* Version of setDoneExceptionally that screens argument |
201 |
*/ |
202 |
final void checkedSetDoneExceptionally(Throwable rex) { |
203 |
if (!(rex instanceof RuntimeException) && !(rex instanceof Error)) |
204 |
throw new IllegalArgumentException(rex); |
205 |
setDoneExceptionally(rex); |
206 |
} |
207 |
|
208 |
/** |
209 |
* Returns result or throws exception. |
210 |
* Only call when isDone known to be true. |
211 |
*/ |
212 |
final V reportAsForkJoinResult() { |
213 |
int s = status; |
214 |
if (s == CANCELLED) |
215 |
throw new CancellationException(); |
216 |
if (s == HAS_EXCEPTION) |
217 |
rethrowException(getException(this)); |
218 |
return rawResult(); |
219 |
} |
220 |
|
221 |
/** |
222 |
* Returns result or throws exception using j.u.c.Future conventions |
223 |
* Only call when isDone known to be true. |
224 |
*/ |
225 |
final V reportAsFutureResult() throws ExecutionException { |
226 |
Throwable ex; |
227 |
int s = status; |
228 |
if (s == CANCELLED) |
229 |
throw new CancellationException(); |
230 |
if (s == HAS_EXCEPTION && (ex = getException(this)) != null) |
231 |
throw new ExecutionException(ex); |
232 |
return rawResult(); |
233 |
} |
234 |
|
235 |
// public methods |
236 |
|
237 |
/** |
238 |
* Arranges to asynchronously execute this task, which will later |
239 |
* be directly or indirectly joined by the caller of this method. |
240 |
* While it is not necessarily enforced, it is a usage error to |
241 |
* fork a task more than once unless it has completed and been |
242 |
* reinitialized. This method may be invoked only from within |
243 |
* other ForkJoinTask computations. Attempts to invoke in other |
244 |
* contexts result in exceptions or errors including |
245 |
* ClassCastException. |
246 |
*/ |
247 |
public final void fork() { |
248 |
((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this); |
249 |
} |
250 |
|
251 |
/** |
252 |
* Returns the result of the computation when it is ready. |
253 |
* Monitoring note: Callers of this method need not block, but may |
254 |
* instead assist in performing computations that may directly or |
255 |
* indirectly cause the result to be ready. |
256 |
* This method may be invoked only from within other ForkJoinTask |
257 |
* computations. Attempts to invoke in other contexts result |
258 |
* in exceptions or errors including ClassCastException. |
259 |
* |
260 |
* @return the computed result |
261 |
* @throws Throwable (a RuntimeException, Error, or unchecked |
262 |
* exception) if the underlying computation did so. |
263 |
*/ |
264 |
public final V join() { |
265 |
int s = status; |
266 |
if (s >= 0) { |
267 |
((ForkJoinWorkerThread)(Thread.currentThread())).helpJoinTask(this); |
268 |
s = status; |
269 |
} |
270 |
return s < COMPLETED? reportAsForkJoinResult() : rawResult(); |
271 |
} |
272 |
|
273 |
/** |
274 |
* Equivalent in effect to the sequence <tt>fork(); join();</tt> |
275 |
* but likely to be more efficient. |
276 |
* @throws Throwable (a RuntimeException, Error, or unchecked |
277 |
* exception) if the underlying computation did so. |
278 |
* @return the computed result |
279 |
*/ |
280 |
public V forkJoin() { |
281 |
exec(); |
282 |
return join(); |
283 |
} |
284 |
|
285 |
/** |
286 |
* Returns true if the computation performed by this task has |
287 |
* completed (or has been cancelled). |
288 |
* @return true if this computation has completed |
289 |
*/ |
290 |
public final boolean isDone() { |
291 |
return status < 0; |
292 |
} |
293 |
|
294 |
/** |
295 |
* Returns true if this task was cancelled. |
296 |
* @return true if this task was cancelled |
297 |
*/ |
298 |
public final boolean isCancelled() { |
299 |
return status == CANCELLED; |
300 |
} |
301 |
|
302 |
/** |
303 |
* Returns true if task currently has COMPLETED status. This |
304 |
* method is not public because this fact may asynchronously |
305 |
* change, which we can handle internally but not externally. |
306 |
*/ |
307 |
final boolean completedNormally() { |
308 |
return status == COMPLETED; |
309 |
} |
310 |
|
311 |
/** |
312 |
* Returns true if task threw and exception or was cancelled |
313 |
*/ |
314 |
final boolean completedAbnormally() { |
315 |
return status < COMPLETED; |
316 |
} |
317 |
|
318 |
/** |
319 |
* Asserts that the results of this task's computation will not be |
320 |
* used. If a cancellation occurs before this task is processed, |
321 |
* then its <tt>compute</tt> method will not be executed, |
322 |
* <tt>isCancelled</tt> will report true, and <tt>join</tt> will |
323 |
* result in a CancellationException being thrown. Otherwise, |
324 |
* there are no guarantees about whether <tt>isCancelled</tt> will |
325 |
* report true, whether <tt>join</tt> will return normally or via |
326 |
* an exception, or whether these behaviors will remain consistent |
327 |
* upon repeated invocation. This method may be overridden in |
328 |
* subclasses, but if so, must still ensure that these minimal |
329 |
* properties hold. |
330 |
* |
331 |
* <p> This method is designed to be invoked by <em>other</em> |
332 |
* tasks. To abruptly terminate the current task, you should just |
333 |
* return from its computation method, or <tt>throw new |
334 |
* CancellationException()</tt>, or in AsyncActions, invoke |
335 |
* <tt>finishExceptionally()</tt>. |
336 |
*/ |
337 |
public void cancel() { |
338 |
// Using 0 here succeeds if task never touched, and maybe otherwise |
339 |
_unsafe.compareAndSwapInt(this, statusOffset, 0, CANCELLED); |
340 |
} |
341 |
|
342 |
/** |
343 |
* Returns the exception thrown by method <tt>compute</tt>, or a |
344 |
* CancellationException if cancelled, or null if none or if the |
345 |
* method has not yet completed. |
346 |
* @return the exception, or null if none |
347 |
*/ |
348 |
public final Throwable getException() { |
349 |
int s = status; |
350 |
if (s >= COMPLETED) |
351 |
return null; |
352 |
if (s == CANCELLED) |
353 |
return new CancellationException(); |
354 |
return getException(this); |
355 |
} |
356 |
|
357 |
/** |
358 |
* Returns the result that would be returned by <tt>join</tt>, or |
359 |
* null if this task is not known to have been completed. This |
360 |
* method is designed to aid debugging, as well as to support |
361 |
* extensions. Its use in any other context is strongly |
362 |
* discouraged. |
363 |
* @return the result, or null if not completed. |
364 |
*/ |
365 |
public abstract V rawResult(); |
366 |
|
367 |
/** |
368 |
* Resets the internal bookkeeping state of this task, allowing a |
369 |
* subsequent <tt>fork</tt>. This method allows repeated reuse of |
370 |
* this task, but only if reuse occurs when this task has either |
371 |
* never been forked, or has been forked, then completed and all |
372 |
* outstanding joins of this task have also completed. Effects |
373 |
* under any other usage conditions are not guaranteed, and are |
374 |
* almost surely wrong. This method may be useful when executing |
375 |
* pre-constructed trees of subtasks in loops. |
376 |
*/ |
377 |
public void reinitialize() { |
378 |
if (status == HAS_EXCEPTION) |
379 |
clearException(this); |
380 |
status = 0; |
381 |
} |
382 |
|
383 |
/** |
384 |
* Joins this task, without returning its result or throwing an |
385 |
* exception. This method may be useful when processing |
386 |
* collections of tasks when some have been cancelled or otherwise |
387 |
* known to have aborted. This method may be invoked only from |
388 |
* within other ForkJoinTask computations. Attempts to invoke in |
389 |
* other contexts result in exceptions or errors including |
390 |
* ClassCastException. |
391 |
*/ |
392 |
public final void quietlyJoin() { |
393 |
((ForkJoinWorkerThread)(Thread.currentThread())).helpJoinTask(this); |
394 |
} |
395 |
|
396 |
// Serialization support |
397 |
|
398 |
private static final long serialVersionUID = -7721805057305804111L; |
399 |
|
400 |
/** |
401 |
* Save the state to a stream. |
402 |
* |
403 |
* @serialData the current run status and the exception thrown |
404 |
* during execution, or null if none. |
405 |
* @param s the stream |
406 |
*/ |
407 |
private void writeObject(java.io.ObjectOutputStream s) |
408 |
throws java.io.IOException { |
409 |
s.defaultWriteObject(); |
410 |
s.writeObject(getException(this)); |
411 |
} |
412 |
|
413 |
/** |
414 |
* Reconstitute the instance from a stream. |
415 |
* @param s the stream |
416 |
*/ |
417 |
private void readObject(java.io.ObjectInputStream s) |
418 |
throws java.io.IOException, ClassNotFoundException { |
419 |
s.defaultReadObject(); |
420 |
Object ex = s.readObject(); |
421 |
if (ex != null) |
422 |
setException(this, (Throwable)ex); |
423 |
} |
424 |
|
425 |
// Temporary Unsafe mechanics for preliminary release |
426 |
|
427 |
static final Unsafe _unsafe; |
428 |
static final long statusOffset; |
429 |
|
430 |
static { |
431 |
try { |
432 |
if (ForkJoinWorkerThread.class.getClassLoader() != null) { |
433 |
Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
434 |
f.setAccessible(true); |
435 |
_unsafe = (Unsafe)f.get(null); |
436 |
} |
437 |
else |
438 |
_unsafe = Unsafe.getUnsafe(); |
439 |
statusOffset = _unsafe.objectFieldOffset |
440 |
(ForkJoinTask.class.getDeclaredField("status")); |
441 |
} catch (Exception ex) { throw new Error(ex); } |
442 |
} |
443 |
|
444 |
} |