ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/forkjoin/ForkJoinTask.java
Revision: 1.29
Committed: Tue Jan 6 14:34:58 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.28: +0 -0 lines
State: FILE REMOVED
Log Message:
Repackaging

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y.forkjoin;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * Abstract base class for tasks that run within a ForkJoinPool. A
17 * ForkJoinTask is a thread-like entity that is much lighter weight
18 * than a normal thread. Huge numbers of tasks and subtasks may be
19 * hosted by a small number of actual threads in a ForkJoinPool,
20 * at the price of some usage limitations.
21 *
22 * <p> The <tt>ForkJoinTask</tt> class is not directly subclassable
23 * outside of this package. Instead, you can subclass one of the
24 * supplied abstract classes that support various styles of fork/join
25 * processing. Normally, a concrete ForkJoinTask subclass declares
26 * fields comprising its parameters, established in a constructor, and
27 * then defines a <tt>compute</tt> method that somehow uses the
28 * control methods supplied by this base class. While these methods
29 * have <tt>public</tt> access, most may only be called from within
30 * other ForkJoinTasks. Attempts to invoke them in other contexts
31 * result in exceptions or errors including ClassCastException. The
32 * only generally accessible methods are those for cancellation and
33 * status checking. The only way to invoke a "main" driver task is to
34 * submit it to a ForkJoinPool. Normally, once started, this will in
35 * turn start other subtasks. Nearly all of these base support
36 * methods are <tt>final</tt> because their implementations are
37 * intrinsically tied to the underlying lightweight task scheduling
38 * framework, and so cannot be overridden.
39 *
40 * <p> ForkJoinTasks play similar roles as <tt>Futures</tt> but
41 * support a more limited range of use. The "lightness" of
42 * ForkJoinTasks is due to a set of restrictions (that are only
43 * partially statically enforceable) reflecting their intended use as
44 * purely computational tasks -- calculating pure functions or
45 * operating on purely isolated objects. The only coordination
46 * mechanisms supported for ForkJoinTasks are <tt>fork</tt>, that
47 * arranges asynchronous execution, and <tt>join</tt>, that doesn't
48 * proceed until the task's result has been computed. (A simple form
49 * of cancellation is also supported). The computation defined in the
50 * <tt>compute</tt> method should not in general perform any other
51 * form of blocking synchronization, should not perform IO, and should
52 * be independent of other tasks. Minor breaches of these
53 * restrictions, for example using shared output streams, may be
54 * tolerable in practice, but frequent use will result in poor
55 * performance, and the potential to indefinitely stall if the number
56 * of threads not waiting for external synchronization becomes
57 * exhausted. This usage restriction is in part enforced by not
58 * permitting checked exceptions such as IOExceptions to be
59 * thrown. However, computations may still encounter unchecked
60 * exceptions, that are rethrown to callers attempting to join
61 * them. These exceptions may additionally include
62 * RejectedExecutionExceptions stemming from internal resource
63 * exhaustion such as failure to allocate internal task queues.
64 *
65 * <p>ForkJoinTasks should perform relatively small amounts of
66 * computations, otherwise splitting into smaller tasks. As a very
67 * rough rule of thumb, a task should perform more than 100 and less
68 * than 10000 basic computational steps. If tasks are too big, then
69 * parallelism cannot improve throughput. If too small, then memory
70 * and internal task maintenance overhead may overwhelm processing.
71 * The {@link ForkJoinWorkerThread} class supports a number of
72 * inspection and tuning methods that can be useful when developing
73 * fork/join programs.
74 *
75 * <p>ForkJoinTasks are <tt>Serializable</tt>, which enables them to
76 * be used in extensions such as remote execution frameworks. However,
77 * it is in general safe to serialize tasks only before or after, but
78 * not during execution. Serialization is not relied on during
79 * execution itself.
80 */
81 public abstract class ForkJoinTask<V> implements Serializable {
82 /*
83 * The main implementations of execution methods are provided by
84 * ForkJoinWorkerThread, so internals are package protected. This
85 * class is mainly responsible for maintaining task status field
86 * and exception mechanics.
87 */
88
89 /**
90 * Table of exceptions thrown by tasks, to enable reporting by
91 * callers. Because exceptions are rare, we don't directly keep
92 * them with task objects, but instead us a weak ref table. Note
93 * that cancellation exceptions don't appear in the table, but are
94 * instead recorded as status values.
95 *
96 * Todo: Use ConcurrentReferenceMap
97 */
98 static final WeakHashMap<ForkJoinTask<?>, Throwable> exceptionTable =
99 new WeakHashMap<ForkJoinTask<?>, Throwable>();
100
101 static synchronized void setException(ForkJoinTask<?> t, Throwable ex) {
102 exceptionTable.put(t, ex);
103 }
104
105 static synchronized Throwable getException(ForkJoinTask<?> t) {
106 return exceptionTable.get(t);
107 }
108
109 static synchronized void clearException(ForkJoinTask<?> t) {
110 exceptionTable.remove(t);
111 }
112
/**
 * Package-private constructor: prevents direct construction (and
 * therefore direct subclassing) from outside this package.
 */
ForkJoinTask() { }
118
119 /**
120 * Status, taking values:
121 * zero: initial
122 * negative: COMPLETED. CANCELLED, or HAS_EXCEPTION
123 * positive: ignored wrt completion, may be used by subclasses
124 *
125 * Status is set negative when a task completes. For normal
126 * completion, the status field is set with a cheaper ordered
127 * write (as opposed to volatile write) because ownership
128 * maintenance in Worker queues ensures that it is never subject
129 * to read-after-write anomalies. (Implementing this requires
130 * direct use of Unsafe to avoid overhead.)
131 */
132 volatile int status;
133 static final int COMPLETED = -1; // order matters
134 static final int CANCELLED = -2;
135 static final int HAS_EXCEPTION = -3;
136
137 // within-package utilities
138
139 /**
140 * Immediately executes this task unless already complete,
141 * trapping all possible exceptions. Returns true if executed and
142 * completed normally, else false. This method cannot be
143 * implemented outside this package because we must guarantee that
144 * implementations trap all exceptions.
145 * @return true if executed and completed normally, else false
146 */
147 abstract boolean exec();
148
149 /**
150 * Sets status to indicate this task is done.
151 */
152 final void setDone() {
153 _unsafe.putOrderedInt(this, statusOffset, COMPLETED);
154 }
155
156 final boolean casStatus(int cmp, int val) {
157 return cmp == status &&
158 _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
159 }
160
161 final void setStatus(int s) {
162 _unsafe.putOrderedInt(this, statusOffset, s);
163 }
164
165 final void incrementStatus() {
166 for (;;) {
167 int s = status; // force fail if already negative
168 if (s < 0 || _unsafe.compareAndSwapInt(this, statusOffset, s, s+1))
169 break;
170 }
171 }
172
173 /**
174 * Sets status to indicate exceptional completion. Note that we
175 * allow races across normal and exceptional completion.
176 */
177 final void setDoneExceptionally(Throwable rex) {
178 setException(this, rex);
179 status = HAS_EXCEPTION;
180 }
181
182 /**
183 * Sets status to cancelled only if in initial state. Uses same
184 * implementation as cancel() but used internally to safely set
185 * status on pool shutdown etc even if cancel is overridden.
186 */
187 final void setCancelled() {
188 _unsafe.compareAndSwapInt(this, statusOffset, 0, CANCELLED);
189 }
190
191 /**
192 * Workaround for not being able to rethrow unchecked exceptions.
193 */
194 static final void rethrowException(Throwable ex) {
195 if (ex != null)
196 _unsafe.throwException(ex);
197 }
198
199 /**
200 * Version of setDoneExceptionally that screens argument
201 */
202 final void checkedSetDoneExceptionally(Throwable rex) {
203 if (!(rex instanceof RuntimeException) && !(rex instanceof Error))
204 throw new IllegalArgumentException(rex);
205 setDoneExceptionally(rex);
206 }
207
208 /**
209 * Returns result or throws exception.
210 * Only call when isDone known to be true.
211 */
212 final V reportAsForkJoinResult() {
213 int s = status;
214 if (s == CANCELLED)
215 throw new CancellationException();
216 if (s == HAS_EXCEPTION)
217 rethrowException(getException(this));
218 return rawResult();
219 }
220
221 /**
222 * Returns result or throws exception using j.u.c.Future conventions
223 * Only call when isDone known to be true.
224 */
225 final V reportAsFutureResult() throws ExecutionException {
226 Throwable ex;
227 int s = status;
228 if (s == CANCELLED)
229 throw new CancellationException();
230 if (s == HAS_EXCEPTION && (ex = getException(this)) != null)
231 throw new ExecutionException(ex);
232 return rawResult();
233 }
234
235 // public methods
236
237 /**
238 * Arranges to asynchronously execute this task, which will later
239 * be directly or indirectly joined by the caller of this method.
240 * While it is not necessarily enforced, it is a usage error to
241 * fork a task more than once unless it has completed and been
242 * reinitialized. This method may be invoked only from within
243 * other ForkJoinTask computations. Attempts to invoke in other
244 * contexts result in exceptions or errors including
245 * ClassCastException.
246 */
247 public final void fork() {
248 ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
249 }
250
251 /**
252 * Returns the result of the computation when it is ready.
253 * Monitoring note: Callers of this method need not block, but may
254 * instead assist in performing computations that may directly or
255 * indirectly cause the result to be ready.
256 * This method may be invoked only from within other ForkJoinTask
257 * computations. Attempts to invoke in other contexts result
258 * in exceptions or errors including ClassCastException.
259 *
260 * @return the computed result
261 * @throws Throwable (a RuntimeException, Error, or unchecked
262 * exception) if the underlying computation did so.
263 */
264 public final V join() {
265 int s = status;
266 if (s >= 0) {
267 ((ForkJoinWorkerThread)(Thread.currentThread())).helpJoinTask(this);
268 s = status;
269 }
270 return s < COMPLETED? reportAsForkJoinResult() : rawResult();
271 }
272
273 /**
274 * Equivalent in effect to the sequence <tt>fork(); join();</tt>
275 * but likely to be more efficient.
276 * @throws Throwable (a RuntimeException, Error, or unchecked
277 * exception) if the underlying computation did so.
278 * @return the computed result
279 */
280 public V forkJoin() {
281 exec();
282 return join();
283 }
284
285 /**
286 * Returns true if the computation performed by this task has
287 * completed (or has been cancelled).
288 * @return true if this computation has completed
289 */
290 public final boolean isDone() {
291 return status < 0;
292 }
293
294 /**
295 * Returns true if this task was cancelled.
296 * @return true if this task was cancelled
297 */
298 public final boolean isCancelled() {
299 return status == CANCELLED;
300 }
301
302 /**
303 * Returns true if task currently has COMPLETED status. This
304 * method is not public because this fact may asynchronously
305 * change, which we can handle internally but not externally.
306 */
307 final boolean completedNormally() {
308 return status == COMPLETED;
309 }
310
311 /**
312 * Returns true if task threw and exception or was cancelled
313 */
314 final boolean completedAbnormally() {
315 return status < COMPLETED;
316 }
317
318 /**
319 * Asserts that the results of this task's computation will not be
320 * used. If a cancellation occurs before this task is processed,
321 * then its <tt>compute</tt> method will not be executed,
322 * <tt>isCancelled</tt> will report true, and <tt>join</tt> will
323 * result in a CancellationException being thrown. Otherwise,
324 * there are no guarantees about whether <tt>isCancelled</tt> will
325 * report true, whether <tt>join</tt> will return normally or via
326 * an exception, or whether these behaviors will remain consistent
327 * upon repeated invocation. This method may be overridden in
328 * subclasses, but if so, must still ensure that these minimal
329 * properties hold.
330 *
331 * <p> This method is designed to be invoked by <em>other</em>
332 * tasks. To abruptly terminate the current task, you should just
333 * return from its computation method, or <tt>throw new
334 * CancellationException()</tt>, or in AsyncActions, invoke
335 * <tt>finishExceptionally()</tt>.
336 */
337 public void cancel() {
338 // Using 0 here succeeds if task never touched, and maybe otherwise
339 _unsafe.compareAndSwapInt(this, statusOffset, 0, CANCELLED);
340 }
341
342 /**
343 * Returns the exception thrown by method <tt>compute</tt>, or a
344 * CancellationException if cancelled, or null if none or if the
345 * method has not yet completed.
346 * @return the exception, or null if none
347 */
348 public final Throwable getException() {
349 int s = status;
350 if (s >= COMPLETED)
351 return null;
352 if (s == CANCELLED)
353 return new CancellationException();
354 return getException(this);
355 }
356
357 /**
358 * Returns the result that would be returned by <tt>join</tt>, or
359 * null if this task is not known to have been completed. This
360 * method is designed to aid debugging, as well as to support
361 * extensions. Its use in any other context is strongly
362 * discouraged.
363 * @return the result, or null if not completed.
364 */
365 public abstract V rawResult();
366
367 /**
368 * Resets the internal bookkeeping state of this task, allowing a
369 * subsequent <tt>fork</tt>. This method allows repeated reuse of
370 * this task, but only if reuse occurs when this task has either
371 * never been forked, or has been forked, then completed and all
372 * outstanding joins of this task have also completed. Effects
373 * under any other usage conditions are not guaranteed, and are
374 * almost surely wrong. This method may be useful when executing
375 * pre-constructed trees of subtasks in loops.
376 */
377 public void reinitialize() {
378 if (status == HAS_EXCEPTION)
379 clearException(this);
380 status = 0;
381 }
382
383 /**
384 * Joins this task, without returning its result or throwing an
385 * exception. This method may be useful when processing
386 * collections of tasks when some have been cancelled or otherwise
387 * known to have aborted. This method may be invoked only from
388 * within other ForkJoinTask computations. Attempts to invoke in
389 * other contexts result in exceptions or errors including
390 * ClassCastException.
391 */
392 public final void quietlyJoin() {
393 ((ForkJoinWorkerThread)(Thread.currentThread())).helpJoinTask(this);
394 }
395
396 // Serialization support
397
398 private static final long serialVersionUID = -7721805057305804111L;
399
400 /**
401 * Save the state to a stream.
402 *
403 * @serialData the current run status and the exception thrown
404 * during execution, or null if none.
405 * @param s the stream
406 */
407 private void writeObject(java.io.ObjectOutputStream s)
408 throws java.io.IOException {
409 s.defaultWriteObject();
410 s.writeObject(getException(this));
411 }
412
413 /**
414 * Reconstitute the instance from a stream.
415 * @param s the stream
416 */
417 private void readObject(java.io.ObjectInputStream s)
418 throws java.io.IOException, ClassNotFoundException {
419 s.defaultReadObject();
420 Object ex = s.readObject();
421 if (ex != null)
422 setException(this, (Throwable)ex);
423 }
424
// Temporary Unsafe mechanics for preliminary release

/** Unsafe instance used for ordered writes, CAS and throwException. */
static final Unsafe _unsafe;
/** Offset of the <tt>status</tt> field, as reported by Unsafe. */
static final long statusOffset;

static {
    Unsafe unsafe;
    long offset;
    try {
        if (ForkJoinWorkerThread.class.getClassLoader() == null) {
            // Null class loader: use the regular accessor.
            unsafe = Unsafe.getUnsafe();
        } else {
            // Otherwise read the "theUnsafe" field reflectively.
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        }
        offset = unsafe.objectFieldOffset(
            ForkJoinTask.class.getDeclaredField("status"));
    } catch (Exception ex) {
        // Any failure here makes the class unusable.
        throw new Error(ex);
    }
    _unsafe = unsafe;
    statusOffset = offset;
}
443
444 }