root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.15
Committed: Thu May 27 16:47:21 2010 UTC by dl
Branch: MAIN
Changes since 1.14: +215 -279 lines
Log Message:
Sync with jsr166y

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.io.Serializable;
10     import java.util.Collection;
11     import java.util.Collections;
12     import java.util.List;
13 jsr166 1.7 import java.util.RandomAccess;
14 jsr166 1.1 import java.util.Map;
15     import java.util.WeakHashMap;
16    
17     /**
18 jsr166 1.6 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
19     * A {@code ForkJoinTask} is a thread-like entity that is much
20 jsr166 1.1 * lighter weight than a normal thread. Huge numbers of tasks and
21     * subtasks may be hosted by a small number of actual threads in a
22     * ForkJoinPool, at the price of some usage limitations.
23     *
24 jsr166 1.6 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
25     * to a {@link ForkJoinPool}. Once started, it will usually in turn
26     * start other subtasks. As indicated by the name of this class,
27     * many programs using {@code ForkJoinTask} employ only methods
28     * {@link #fork} and {@link #join}, or derivatives such as {@link
29     * #invokeAll}. However, this class also provides a number of other
30     * methods that can come into play in advanced usages, as well as
31     * extension mechanics that allow support of new forms of fork/join
32     * processing.
33 jsr166 1.1 *
34 jsr166 1.6 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
35     * The efficiency of {@code ForkJoinTask}s stems from a set of
36     * restrictions (that are only partially statically enforceable)
37     * reflecting their intended use as computational tasks calculating
38     * pure functions or operating on purely isolated objects. The
39     * primary coordination mechanisms are {@link #fork}, that arranges
40     * asynchronous execution, and {@link #join}, that doesn't proceed
41     * until the task's result has been computed. Computations should
42     * avoid {@code synchronized} methods or blocks, and should minimize
43     * other blocking synchronization apart from joining other tasks or
44     * using synchronizers such as Phasers that are advertised to
45     * cooperate with fork/join scheduling. Tasks should also not perform
46     * blocking IO, and should ideally access variables that are
47     * completely independent of those accessed by other running
48     * tasks. Minor breaches of these restrictions, for example using
49     * shared output streams, may be tolerable in practice, but frequent
50     * use may result in poor performance, and the potential to
51     * indefinitely stall if the number of threads not waiting for IO or
52     * other external synchronization becomes exhausted. This usage
53     * restriction is in part enforced by not permitting checked
54     * exceptions such as {@code IOExceptions} to be thrown. However,
55     * computations may still encounter unchecked exceptions, that are
56 jsr166 1.7 * rethrown to callers attempting to join them. These exceptions may
57 jsr166 1.11 * additionally include {@link RejectedExecutionException} stemming
58     * from internal resource exhaustion, such as failure to allocate
59     * internal task queues.
60 jsr166 1.1 *
61     * <p>The primary method for awaiting completion and extracting
62     * results of a task is {@link #join}, but there are several variants:
63     * The {@link Future#get} methods support interruptible and/or timed
64     * waits for completion and report results using {@code Future}
65     * conventions. Method {@link #helpJoin} enables callers to actively
66     * execute other tasks while awaiting joins, which is sometimes more
67     * efficient but only applies when all subtasks are known to be
68     * strictly tree-structured. Method {@link #invoke} is semantically
69 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to begin
70     * execution in the current thread. The "<em>quiet</em>" forms of
71     * these methods do not extract results or report exceptions. These
72 jsr166 1.1 * may be useful when a set of tasks are being executed, and you need
73     * to delay processing of results or exceptions until all complete.
74     * Method {@code invokeAll} (available in multiple versions)
75     * performs the most common form of parallel invocation: forking a set
76     * of tasks and joining them all.
77     *
78 jsr166 1.8 * <p>The execution status of tasks may be queried at several levels
79     * of detail: {@link #isDone} is true if a task completed in any way
80     * (including the case where a task was cancelled without executing);
81     * {@link #isCompletedNormally} is true if a task completed without
82 jsr166 1.10 * cancellation or encountering an exception; {@link #isCancelled} is
83     * true if the task was cancelled (in which case {@link #getException}
84     * returns a {@link java.util.concurrent.CancellationException}); and
85     * {@link #isCompletedAbnormally} is true if a task was either
86     * cancelled or encountered an exception, in which case {@link
87     * #getException} will return either the encountered exception or
88     * {@link java.util.concurrent.CancellationException}.
89 jsr166 1.8 *
90 jsr166 1.6 * <p>The ForkJoinTask class is not usually directly subclassed.
91 jsr166 1.1 * Instead, you subclass one of the abstract classes that support a
92 jsr166 1.6 * particular style of fork/join processing, typically {@link
93     * RecursiveAction} for computations that do not return results, or
94     * {@link RecursiveTask} for those that do. Normally, a concrete
95 jsr166 1.1 * ForkJoinTask subclass declares fields comprising its parameters,
96     * established in a constructor, and then defines a {@code compute}
97     * method that somehow uses the control methods supplied by this base
98     * class. While these methods have {@code public} access (to allow
99 jsr166 1.7 * instances of different task subclasses to call each other's
100 jsr166 1.1 * methods), some of them may only be called from within other
101     * ForkJoinTasks (as may be determined using method {@link
102     * #inForkJoinPool}). Attempts to invoke them in other contexts
103     * result in exceptions or errors, possibly including
104     * ClassCastException.
105     *
106 jsr166 1.7 * <p>Most base support methods are {@code final}, to prevent
107     * overriding of implementations that are intrinsically tied to the
108     * underlying lightweight task scheduling framework. Developers
109     * creating new basic styles of fork/join processing should minimally
110     * implement {@code protected} methods {@link #exec}, {@link
111     * #setRawResult}, and {@link #getRawResult}, while also introducing
112     * an abstract computational method that can be implemented in its
113     * subclasses, possibly relying on other {@code protected} methods
114     * provided by this class.
115 jsr166 1.1 *
116     * <p>ForkJoinTasks should perform relatively small amounts of
117 jsr166 1.7 * computation. Large tasks should be split into smaller subtasks,
118     * usually via recursive decomposition. As a very rough rule of thumb,
119     * a task should perform more than 100 and less than 10000 basic
120     * computational steps. If tasks are too big, then parallelism cannot
121     * improve throughput. If too small, then memory and internal task
122     * maintenance overhead may overwhelm processing.
123 jsr166 1.1 *
124 jsr166 1.8 * <p>This class provides {@code adapt} methods for {@link Runnable}
125     * and {@link Callable}, that may be of use when mixing execution of
126     * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
127     * are of this form, consider using a pool in
128     * {@linkplain ForkJoinPool#setAsyncMode async mode}.
129 jsr166 1.6 *
130 jsr166 1.7 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
131     * used in extensions such as remote execution frameworks. It is
132     * sensible to serialize tasks only before or after, but not during,
133     * execution. Serialization is not relied on during execution itself.
134 jsr166 1.1 *
135     * @since 1.7
136     * @author Doug Lea
137     */
138     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
139    
140 dl 1.13 /*
141     * See the internal documentation of class ForkJoinPool for a
142     * general implementation overview. ForkJoinTasks are mainly
143     * responsible for maintaining their "status" field amidst relays
144     * to methods in ForkJoinWorkerThread and ForkJoinPool. The
145     * methods of this class are more-or-less layered into (1) basic
146     * status maintenance (2) execution and awaiting completion (3)
147     * user-level methods that additionally report results. This is
148     * sometimes hard to see because this file orders exported methods
149     * in a way that flows well in javadocs.
150     */
151    
152 jsr166 1.1 /**
153     * Run control status bits packed into a single int to minimize
154     * footprint and to ensure atomicity (via CAS). Status is
155     * initially zero, and takes on nonnegative values until
156     * completed, upon which status holds NORMAL, CANCELLED, or
157     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
158 dl 1.15 * blocking waits by other threads have the SIGNAL bit set.
159 dl 1.13 *
160 dl 1.15 * Completion of a stolen task with SIGNAL set awakens any waiters
161     * via notifyAll. Even though suboptimal for some purposes, we use
162     * basic builtin wait/notify to take advantage of "monitor
163     * inflation" in JVMs that we would otherwise need to emulate to
164     * avoid adding further per-task bookkeeping overhead. We want
165     * these monitors to be "fat", i.e., not use biasing or thin-lock
166     * techniques, so use some odd coding idioms that tend to avoid
167     * them.
168     *
169     * Note that bits 1-28 are currently unused. Also value
170 dl 1.13 * 0x80000000 is available as spare completion value.
171 jsr166 1.1 */
172     volatile int status; // accessed directly by pool and workers
173    
174 dl 1.13 private static final int COMPLETION_MASK = 0xe0000000;
175     private static final int NORMAL = 0xe0000000; // == mask
176     private static final int CANCELLED = 0xc0000000;
177     private static final int EXCEPTIONAL = 0xa0000000;
178 dl 1.15 private static final int SIGNAL = 0x00000001;
179 jsr166 1.1
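The status encoding above can be checked in isolation. The following is a minimal sketch, not part of this file: the class name `StatusBitsDemo` is hypothetical, and the constants are copied from the declarations above. It shows why the comparisons used later in this class work: all three completion values are negative ints, and both `CANCELLED` and `EXCEPTIONAL` are numerically smaller than `NORMAL`.

```java
// Illustrative sketch only: mirrors the constants above in a standalone class.
final class StatusBitsDemo {
    static final int COMPLETION_MASK = 0xe0000000;
    static final int NORMAL          = 0xe0000000; // == mask
    static final int CANCELLED       = 0xc0000000;
    static final int EXCEPTIONAL     = 0xa0000000;
    static final int SIGNAL          = 0x00000001;

    // Any completion value has the sign bit set, so "done" is a sign test.
    static boolean isDone(int status) {
        return status < 0;
    }

    static boolean isCancelled(int status) {
        return (status & COMPLETION_MASK) == CANCELLED;
    }

    // A pending status masks to 0, which is greater than NORMAL (a negative int).
    static boolean isCompletedAbnormally(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    static boolean isCompletedNormally(int status) {
        return (status & COMPLETION_MASK) == NORMAL;
    }

    public static void main(String[] args) {
        int[] samples = { 0, SIGNAL, NORMAL, CANCELLED, EXCEPTIONAL };
        for (int s : samples) {
            System.out.printf("0x%08x done=%b normal=%b cancelled=%b abnormal=%b%n",
                              s, isDone(s), isCompletedNormally(s),
                              isCancelled(s), isCompletedAbnormally(s));
        }
    }
}
```

A pending task with only the `SIGNAL` bit set is neither done nor abnormal, because the mask discards the low bit before the comparison.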
180     /**
181     * Table of exceptions thrown by tasks, to enable reporting by
182     * callers. Because exceptions are rare, we don't directly keep
183     * them with task objects, but instead use a weak ref table. Note
184     * that cancellation exceptions don't appear in the table, but are
185     * instead recorded as status values.
186     * TODO: Use ConcurrentReferenceHashMap
187     */
188     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
189     Collections.synchronizedMap
190     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
191    
192 dl 1.13 // Maintaining completion status
193 jsr166 1.1
194     /**
195 dl 1.13 * Marks completion and wakes up threads waiting to join this task,
196     * also clearing signal request bits.
197     *
198     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
199 dl 1.15 * @return status on exit
200 jsr166 1.1 */
201 dl 1.15 private int setCompletion(int completion) {
202 dl 1.13 int s;
203     while ((s = status) >= 0) {
204     if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
205 dl 1.15 if ((s & SIGNAL) != 0)
206 dl 1.13 synchronized (this) { notifyAll(); }
207 dl 1.15 return completion;
208 dl 1.13 }
209     }
210 dl 1.15 return s;
211 jsr166 1.1 }
212    
213     /**
214 dl 1.13 * Records exception and sets exceptional completion.
215 dl 1.15 * @return status on exit
216 jsr166 1.1 */
217 dl 1.15 private int setExceptionalCompletion(Throwable rex) {
218 dl 1.13 exceptionMap.put(this, rex);
219 dl 1.15 return setCompletion(EXCEPTIONAL);
220 jsr166 1.1 }
221    
222     /**
223 dl 1.15 * Blocks a worker thread until completion. Called only by pool.
224 jsr166 1.1 */
225 dl 1.15 final void internalAwaitDone() {
226 jsr166 1.1 int s;
227 dl 1.15 while ((s = status) >= 0) {
228     synchronized(this) {
229     if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
230     do {
231     try {
232 dl 1.14 wait();
233 dl 1.15 } catch (InterruptedException ie) {
234     cancelIfTerminating();
235 dl 1.13 }
236 dl 1.15 } while (status >= 0);
237     break;
238 jsr166 1.1 }
239     }
240     }
241     }
242    
243     /**
244 dl 1.15 * Blocks a non-worker-thread until completion.
245     * @return status on exit
246 jsr166 1.1 */
247 dl 1.15 private int externalAwaitDone() {
248     int s;
249     while ((s = status) >= 0) {
250     synchronized(this) {
251     if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
252     boolean interrupted = false;
253     do {
254     try {
255 dl 1.14 wait();
256 dl 1.15 } catch (InterruptedException ie) {
257     interrupted = true;
258 dl 1.13 }
259 dl 1.15 } while ((s = status) >= 0);
260     if (interrupted)
261     Thread.currentThread().interrupt();
262     break;
263 dl 1.13 }
264 jsr166 1.1 }
265     }
266 dl 1.15 return s;
267 jsr166 1.1 }
268    
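The handshake between `setCompletion` and the two await methods can be tried outside the framework. The sketch below is an approximation under stated assumptions: it substitutes `AtomicInteger.compareAndSet` for the `UNSAFE.compareAndSwapInt` call used in this file, and the class name `SignalledCompletion` is hypothetical. The waiter sets `SIGNAL` while holding the monitor, so a completer that observes `SIGNAL` cannot call `notifyAll` until the waiter has entered `wait()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: AtomicInteger stands in for the Unsafe-based CAS.
final class SignalledCompletion {
    static final int NORMAL = 0xe0000000;
    static final int SIGNAL = 0x00000001;

    private final AtomicInteger status = new AtomicInteger();

    // Marks completion; wakes waiters only if one of them requested a signal.
    int complete(int completion) {
        int s;
        while ((s = status.get()) >= 0) {
            if (status.compareAndSet(s, completion)) {
                if ((s & SIGNAL) != 0)
                    synchronized (this) { notifyAll(); }
                return completion;
            }
        }
        return s; // already completed by someone else
    }

    // Blocks until completion, preserving the caller's interrupt status.
    int awaitDone() {
        int s;
        boolean interrupted = false;
        while ((s = status.get()) >= 0) {
            synchronized (this) {
                // Request a wakeup; a failed CAS means status changed, so re-read.
                if (status.compareAndSet(s, s | SIGNAL)) {
                    do {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    } while ((s = status.get()) >= 0);
                    break;
                }
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        SignalledCompletion c = new SignalledCompletion();
        Thread completer = new Thread(() -> c.complete(NORMAL));
        completer.start();
        int s = c.awaitDone();
        completer.join();
        System.out.println("completed normally: " + (s == NORMAL));
    }
}
```

If the completer wins the race before the waiter sets `SIGNAL`, the waiter's CAS fails, the outer loop re-reads a negative status, and no wait occurs.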
269     /**
270 dl 1.15 * Unless done, calls exec and records status if completed, but
271     * doesn't wait for completion otherwise.
272 jsr166 1.1 */
273 dl 1.15 final void tryExec() {
274     try {
275     if (status < 0 || !exec())
276     return;
277     } catch (Throwable rex) {
278     setExceptionalCompletion(rex);
279     return;
280 jsr166 1.1 }
281 dl 1.15 setCompletion(NORMAL); // must be outside try block
282 jsr166 1.1 }
283    
284     /**
285 dl 1.15 * If not done and this task is next in worker queue, runs it,
286     * else waits for it.
287     * @return status on exit
288     */
289     private int waitingJoin() {
290     int s = status;
291     if (s < 0)
292     return s;
293     Thread t = Thread.currentThread();
294     if (t instanceof ForkJoinWorkerThread) {
295     ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
296     if (w.unpushTask(this)) {
297     boolean completed;
298 dl 1.13 try {
299 dl 1.15 completed = exec();
300     } catch (Throwable rex) {
301     return setExceptionalCompletion(rex);
302 jsr166 1.1 }
303 dl 1.15 if (completed)
304     return setCompletion(NORMAL);
305 jsr166 1.1 }
306 dl 1.15 return w.pool.awaitJoin(this);
307 jsr166 1.1 }
308 dl 1.15 else
309     return externalAwaitDone();
310 jsr166 1.1 }
311    
312     /**
313 dl 1.15 * Unless done, calls exec and records status if completed, or
314     * waits for completion otherwise.
315     * @return status on exit
316 jsr166 1.1 */
317 dl 1.15 private int waitingInvoke() {
318     int s = status;
319     if (s < 0)
320     return s;
321     boolean completed;
322     try {
323     completed = exec();
324     } catch (Throwable rex) {
325     return setExceptionalCompletion(rex);
326 jsr166 1.1 }
327 dl 1.15 if (completed)
328     return setCompletion(NORMAL);
329     return waitingJoin();
330 jsr166 1.1 }
331    
332     /**
333 dl 1.15 * If this task is next in worker queue, runs it, else processes other
334     * tasks until complete.
335     * @return status on exit
336 jsr166 1.1 */
337 dl 1.15 private int busyJoin() {
338     int s = status;
339     if (s < 0)
340     return s;
341     ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
342     if (w.unpushTask(this)) {
343     boolean completed;
344     try {
345     completed = exec();
346     } catch (Throwable rex) {
347     return setExceptionalCompletion(rex);
348     }
349     if (completed)
350     return setCompletion(NORMAL);
351 jsr166 1.1 }
352 dl 1.15 return w.execWhileJoining(this);
353 jsr166 1.1 }
354    
355     /**
356 dl 1.15 * Returns result or throws exception associated with given status.
357     * @param s the status
358 jsr166 1.1 */
359 dl 1.15 private V reportResult(int s) {
360 jsr166 1.1 Throwable ex;
361 dl 1.15 if (s < NORMAL && (ex = getException()) != null)
362     UNSAFE.throwException(ex);
363     return getRawResult();
364 jsr166 1.1 }
365    
366     // public methods
367    
368     /**
369     * Arranges to asynchronously execute this task. While it is not
370     * necessarily enforced, it is a usage error to fork a task more
371 jsr166 1.6 * than once unless it has completed and been reinitialized.
372 jsr166 1.11 * Subsequent modifications to the state of this task or any data
373     * it operates on are not necessarily consistently observable by
374     * any thread other than the one executing it unless preceded by a
375     * call to {@link #join} or related methods, or a call to {@link
376     * #isDone} returning {@code true}.
377 jsr166 1.6 *
378     * <p>This method may be invoked only from within {@code
379     * ForkJoinTask} computations (as may be determined using method
380     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
381     * result in exceptions or errors, possibly including {@code
382     * ClassCastException}.
383 jsr166 1.2 *
384 jsr166 1.6 * @return {@code this}, to simplify usage
385 jsr166 1.1 */
386 jsr166 1.2 public final ForkJoinTask<V> fork() {
387 jsr166 1.1 ((ForkJoinWorkerThread) Thread.currentThread())
388     .pushTask(this);
389 jsr166 1.2 return this;
390 jsr166 1.1 }
391    
392     /**
393 jsr166 1.10 * Returns the result of the computation when it {@link #isDone is done}.
394 jsr166 1.6 * This method differs from {@link #get()} in that
395     * abnormal completion results in {@code RuntimeException} or
396     * {@code Error}, not {@code ExecutionException}.
397 jsr166 1.1 *
398     * @return the computed result
399     */
400     public final V join() {
401 dl 1.15 return reportResult(waitingJoin());
402 jsr166 1.1 }
403    
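As a usage illustration of `fork` and `join`, here is a small sketch of a divide-and-conquer sum built on `RecursiveTask`. The class name `RangeSum`, the helper `sum`, and the `THRESHOLD` value are all hypothetical choices for this example, not part of this file.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch only: sums data[lo, hi) by recursive decomposition.
final class RangeSum extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000; // hypothetical sequential cutoff

    private final long[] data;
    private final int lo, hi;

    RangeSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        RangeSum left = new RangeSum(data, lo, mid);
        RangeSum right = new RangeSum(data, mid, hi);
        left.fork();                        // schedule the left half asynchronously
        long rightSum = right.compute();    // work on the right half in this thread
        return left.join() + rightSum;      // wait for (or run) the left half
    }

    // Entry point for non-worker threads: submit the root task to a pool.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data));
    }
}
```

The root task is handed to the pool from an ordinary thread, while `fork` and `join` are only called inside `compute`, which matches the restriction documented on `fork` above.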
404     /**
405     * Commences performing this task, awaits its completion if
406 jsr166 1.8 * necessary, and returns its result, or throws an (unchecked)
407     * exception if the underlying computation did so.
408 jsr166 1.1 *
409     * @return the computed result
410     */
411     public final V invoke() {
412 dl 1.15 return reportResult(waitingInvoke());
413 jsr166 1.1 }
414    
415     /**
416 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds for
417     * each task or an (unchecked) exception is encountered, in which
418 jsr166 1.9 * case the exception is rethrown. If either task encounters an
419     * exception, the other one may be, but is not guaranteed to be,
420     * cancelled. If both tasks throw an exception, then this method
421     * throws one of them. The individual status of each task may be
422 jsr166 1.8 * checked using {@link #getException()} and related methods.
423 jsr166 1.6 *
424     * <p>This method may be invoked only from within {@code
425     * ForkJoinTask} computations (as may be determined using method
426     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
427     * result in exceptions or errors, possibly including {@code
428     * ClassCastException}.
429     *
430     * @param t1 the first task
431     * @param t2 the second task
432     * @throws NullPointerException if any task is null
433 jsr166 1.1 */
434 jsr166 1.6 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
435 jsr166 1.1 t2.fork();
436     t1.invoke();
437     t2.join();
438     }
439    
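The two-argument `invokeAll` is typically called from within a `compute` method. The following sketch assumes a hypothetical `IncrementAll` action with an arbitrary `THRESHOLD`. Each subtask works on a disjoint slice of the array, in line with the advice in the class documentation to keep tasks independent.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative sketch only: adds 1 to every element of data[lo, hi).
final class IncrementAll extends RecursiveAction {
    static final int THRESHOLD = 1_000; // hypothetical sequential cutoff

    private final int[] data;
    private final int lo, hi;

    IncrementAll(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++)
                data[i]++;
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Runs both halves and returns once both are done.
        invokeAll(new IncrementAll(data, lo, mid),
                  new IncrementAll(data, mid, hi));
    }

    static void run(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new IncrementAll(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[5_000];
        run(data);
        System.out.println(data[0] + " " + data[data.length - 1]);
    }
}
```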
440     /**
441 jsr166 1.6 * Forks the given tasks, returning when {@code isDone} holds for
442 jsr166 1.8 * each task or an (unchecked) exception is encountered, in which
443     * case the exception is rethrown. If any task encounters an
444     * exception, others may be, but are not guaranteed to be,
445     * cancelled. If more than one task encounters an exception, then
446     * this method throws any one of these exceptions. The individual
447     * status of each task may be checked using {@link #getException()}
448     * and related methods.
449 jsr166 1.6 *
450     * <p>This method may be invoked only from within {@code
451     * ForkJoinTask} computations (as may be determined using method
452     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
453     * result in exceptions or errors, possibly including {@code
454     * ClassCastException}.
455     *
456     * @param tasks the tasks
457 jsr166 1.8 * @throws NullPointerException if any task is null
458 jsr166 1.1 */
459     public static void invokeAll(ForkJoinTask<?>... tasks) {
460     Throwable ex = null;
461     int last = tasks.length - 1;
462     for (int i = last; i >= 0; --i) {
463     ForkJoinTask<?> t = tasks[i];
464     if (t == null) {
465     if (ex == null)
466     ex = new NullPointerException();
467     }
468     else if (i != 0)
469     t.fork();
470 dl 1.15 else if (t.waitingInvoke() < NORMAL && ex == null)
471     ex = t.getException();
472 jsr166 1.1 }
473     for (int i = 1; i <= last; ++i) {
474     ForkJoinTask<?> t = tasks[i];
475     if (t != null) {
476     if (ex != null)
477     t.cancel(false);
478 dl 1.15 else if (t.waitingJoin() < NORMAL && ex == null)
479     ex = t.getException();
480 jsr166 1.1 }
481     }
482     if (ex != null)
483 dl 1.13 UNSAFE.throwException(ex);
484 jsr166 1.1 }
485    
486     /**
487 jsr166 1.7 * Forks all tasks in the specified collection, returning when
488 jsr166 1.8 * {@code isDone} holds for each task or an (unchecked) exception
489     * is encountered. If any task encounters an exception, others
490     * may be, but are not guaranteed to be, cancelled. If more than
491     * one task encounters an exception, then this method throws any
492     * one of these exceptions. The individual status of each task
493     * may be checked using {@link #getException()} and related
494     * methods. The behavior of this operation is undefined if the
495     * specified collection is modified while the operation is in
496     * progress.
497 jsr166 1.6 *
498     * <p>This method may be invoked only from within {@code
499     * ForkJoinTask} computations (as may be determined using method
500     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
501     * result in exceptions or errors, possibly including {@code
502     * ClassCastException}.
503 jsr166 1.1 *
504     * @param tasks the collection of tasks
505 jsr166 1.2 * @return the tasks argument, to simplify usage
506 jsr166 1.1 * @throws NullPointerException if tasks or any element are null
507     */
508 jsr166 1.2 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
509 jsr166 1.7 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
510 jsr166 1.1 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
511 jsr166 1.2 return tasks;
512 jsr166 1.1 }
513     @SuppressWarnings("unchecked")
514     List<? extends ForkJoinTask<?>> ts =
515     (List<? extends ForkJoinTask<?>>) tasks;
516     Throwable ex = null;
517     int last = ts.size() - 1;
518     for (int i = last; i >= 0; --i) {
519     ForkJoinTask<?> t = ts.get(i);
520     if (t == null) {
521     if (ex == null)
522     ex = new NullPointerException();
523     }
524     else if (i != 0)
525     t.fork();
526 dl 1.15 else if (t.waitingInvoke() < NORMAL && ex == null)
527     ex = t.getException();
528 jsr166 1.1 }
529     for (int i = 1; i <= last; ++i) {
530     ForkJoinTask<?> t = ts.get(i);
531     if (t != null) {
532     if (ex != null)
533     t.cancel(false);
534 dl 1.15 else if (t.waitingJoin() < NORMAL && ex == null)
535     ex = t.getException();
536 jsr166 1.1 }
537     }
538     if (ex != null)
539 dl 1.13 UNSAFE.throwException(ex);
540 jsr166 1.2 return tasks;
541 jsr166 1.1 }
542    
543     /**
544 jsr166 1.7 * Attempts to cancel execution of this task. This attempt will
545     * fail if the task has already completed, has already been
546     * cancelled, or could not be cancelled for some other reason. If
547     * successful, and this task has not started when cancel is
548     * called, execution of this task is suppressed, {@link
549     * #isCancelled} will report true, and {@link #join} will result
550     * in a {@code CancellationException} being thrown.
551 jsr166 1.1 *
552     * <p>This method may be overridden in subclasses, but if so, must
553     * still ensure that these minimal properties hold. In particular,
554 jsr166 1.6 * the {@code cancel} method itself must not throw exceptions.
555 jsr166 1.1 *
556 jsr166 1.6 * <p>This method is designed to be invoked by <em>other</em>
557 jsr166 1.1 * tasks. To terminate the current task, you can just return or
558     * throw an unchecked exception from its computation method, or
559 jsr166 1.4 * invoke {@link #completeExceptionally}.
560 jsr166 1.1 *
561     * @param mayInterruptIfRunning this value is ignored in the
562 jsr166 1.7 * default implementation because tasks are not
563 jsr166 1.1 * cancelled via interruption
564     *
565 jsr166 1.4 * @return {@code true} if this task is now cancelled
566 jsr166 1.1 */
567     public boolean cancel(boolean mayInterruptIfRunning) {
568     setCompletion(CANCELLED);
569     return (status & COMPLETION_MASK) == CANCELLED;
570     }
571    
572 dl 1.13 /**
573     * Cancels, ignoring any exceptions it throws. Used during worker
574     * and pool shutdown.
575     */
576     final void cancelIgnoringExceptions() {
577     try {
578     cancel(false);
579     } catch (Throwable ignore) {
580     }
581     }
582    
583     /**
584     * Cancels, ignoring exceptions, if worker is terminating.
585     */
586     private void cancelIfTerminating() {
587     Thread t = Thread.currentThread();
588     if ((t instanceof ForkJoinWorkerThread) &&
589 dl 1.14 ((ForkJoinWorkerThread) t).isTerminating()) {
590 dl 1.13 try {
591     cancel(false);
592     } catch (Throwable ignore) {
593     }
594     }
595     }
596    
597 jsr166 1.8 public final boolean isDone() {
598     return status < 0;
599     }
600    
601     public final boolean isCancelled() {
602     return (status & COMPLETION_MASK) == CANCELLED;
603     }
604    
605     /**
606 jsr166 1.4 * Returns {@code true} if this task threw an exception or was cancelled.
607 jsr166 1.1 *
608 jsr166 1.4 * @return {@code true} if this task threw an exception or was cancelled
609 jsr166 1.1 */
610     public final boolean isCompletedAbnormally() {
611     return (status & COMPLETION_MASK) < NORMAL;
612     }
613    
614     /**
615 jsr166 1.8 * Returns {@code true} if this task completed without throwing an
616     * exception and was not cancelled.
617     *
618     * @return {@code true} if this task completed without throwing an
619     * exception and was not cancelled
620     */
621     public final boolean isCompletedNormally() {
622     return (status & COMPLETION_MASK) == NORMAL;
623     }
624    
625     /**
626 jsr166 1.1 * Returns the exception thrown by the base computation, or a
627 jsr166 1.6 * {@code CancellationException} if cancelled, or {@code null} if
628     * none or if the method has not yet completed.
629 jsr166 1.1 *
630 jsr166 1.4 * @return the exception, or {@code null} if none
631 jsr166 1.1 */
632     public final Throwable getException() {
633     int s = status & COMPLETION_MASK;
634 jsr166 1.8 return ((s >= NORMAL) ? null :
635     (s == CANCELLED) ? new CancellationException() :
636     exceptionMap.get(this));
637 jsr166 1.1 }
638    
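The status query methods above can be exercised together. This is a sketch against the public `java.util.concurrent` API as shipped in Java 7 and later, not against this exact revision. The class `StatusQueryDemo` and its nested task classes are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

// Illustrative sketch only: produces one failed, one cancelled, one normal task.
final class StatusQueryDemo {
    static final class Failing extends RecursiveAction {
        @Override protected void compute() {
            throw new IllegalStateException("boom");
        }
    }

    static final class Noop extends RecursiveAction {
        @Override protected void compute() { }
    }

    // Runs a task and swallows the unchecked exception that invoke rethrows.
    static ForkJoinTask<?> run(ForkJoinTask<?> task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(task);
        } catch (RuntimeException expected) {
            // The task's status still records the failure.
        } finally {
            pool.shutdown();
        }
        return task;
    }

    static ForkJoinTask<?> failed()    { return run(new Failing()); }
    static ForkJoinTask<?> normal()    { return run(new Noop()); }

    // Cancelling before execution suppresses the task entirely.
    static ForkJoinTask<?> cancelled() {
        Noop t = new Noop();
        t.cancel(false);
        return t;
    }

    static String describe(ForkJoinTask<?> t) {
        return "done=" + t.isDone()
             + " normal=" + t.isCompletedNormally()
             + " cancelled=" + t.isCancelled()
             + " abnormal=" + t.isCompletedAbnormally()
             + " exception=" + t.getException();
    }

    public static void main(String[] args) {
        System.out.println(describe(failed()));
        System.out.println(describe(cancelled()));
        System.out.println(describe(normal()));
    }
}
```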
639     /**
640     * Completes this task abnormally, and if not already aborted or
641     * cancelled, causes it to throw the given exception upon
642     * {@code join} and related operations. This method may be used
643     * to induce exceptions in asynchronous tasks, or to force
644     * completion of tasks that would not otherwise complete. Its use
645 jsr166 1.6 * in other situations is discouraged. This method is
646 jsr166 1.1 * overridable, but overridden versions must invoke {@code super}
647     * implementation to maintain guarantees.
648     *
649 jsr166 1.11 * @param ex the exception to throw. If this exception is not a
650     * {@code RuntimeException} or {@code Error}, the actual exception
651     * thrown will be a {@code RuntimeException} with cause {@code ex}.
652 jsr166 1.1 */
653     public void completeExceptionally(Throwable ex) {
654 dl 1.15 setExceptionalCompletion((ex instanceof RuntimeException) ||
655     (ex instanceof Error) ? ex :
656     new RuntimeException(ex));
657 jsr166 1.1 }
658    
659     /**
660     * Completes this task and, if not already aborted or cancelled,
661     * returns the given value as the result of {@code join} and related
662     * operations. This method may be used to provide results for
663     * asynchronous tasks, or to provide alternative handling for
664     * tasks that would not otherwise complete normally. Its use in
665 jsr166 1.6 * other situations is discouraged. This method is
666 jsr166 1.1 * overridable, but overridden versions must invoke {@code super}
667     * implementation to maintain guarantees.
668     *
669     * @param value the result value for this task
670     */
671     public void complete(V value) {
672     try {
673     setRawResult(value);
674     } catch (Throwable rex) {
675 dl 1.15 setExceptionalCompletion(rex);
676 jsr166 1.1 return;
677     }
678 dl 1.13 setCompletion(NORMAL);
679 jsr166 1.1 }
680    
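A brief sketch of `complete` and `completeExceptionally` used from outside a computation follows. It relies only on the public API; the class `CompleteDemo` and its nested `Never` task are hypothetical, and the task's `compute` method is deliberately never run.

```java
import java.util.concurrent.RecursiveTask;

// Illustrative sketch only: completes tasks explicitly instead of running them.
final class CompleteDemo {
    static final class Never extends RecursiveTask<Integer> {
        @Override protected Integer compute() {
            throw new AssertionError("compute is not expected to run in this demo");
        }
    }

    // complete(value) records the value; join then returns it without executing.
    static int completedValue() {
        Never t = new Never();
        t.complete(42);
        return t.join();
    }

    // completeExceptionally records the exception for later reporting.
    static Throwable completedException() {
        Never t = new Never();
        t.completeExceptionally(new IllegalArgumentException("forced failure"));
        return t.getException();
    }

    public static void main(String[] args) {
        System.out.println(completedValue());
        System.out.println(completedException());
    }
}
```

Because both tasks are already done when `join` or `getException` is called, no pool is needed here and the calling thread never blocks.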
681     public final V get() throws InterruptedException, ExecutionException {
682 dl 1.15 int s = waitingJoin() & COMPLETION_MASK;
683     if (Thread.interrupted())
684     throw new InterruptedException();
685     if (s < NORMAL) {
686     Throwable ex;
687     if (s == CANCELLED)
688     throw new CancellationException();
689     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
690     throw new ExecutionException(ex);
691     }
692     return getRawResult();
693 jsr166 1.1 }
694 dl 1.14
695 jsr166 1.1 public final V get(long timeout, TimeUnit unit)
696     throws InterruptedException, ExecutionException, TimeoutException {
697 dl 1.13 Thread t = Thread.currentThread();
698 dl 1.15 ForkJoinPool pool;
699 dl 1.13 if (t instanceof ForkJoinWorkerThread) {
700     ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
701 dl 1.15 if (status >= 0 && w.unpushTask(this))
702     tryExec();
703     pool = w.pool;
704 dl 1.13 }
705     else
706 dl 1.15 pool = null;
707     /*
708     * Timed wait loop intermixes cases for FJ (pool != null) and
709     * non-FJ threads. For FJ, decrement pool count but don't try
710     * for replacement; increment count on completion. For non-FJ,
711     * deal with interrupts. This is messy, but a little less so
712     * than is splitting the FJ and non-FJ cases.
713     */
714     boolean interrupted = false;
715     boolean dec = false; // true if pool count decremented
716     for (;;) {
717     if (Thread.interrupted() && pool == null) {
718     interrupted = true;
719     break;
720     }
721     int s = status;
722     if (s < 0)
723     break;
724     if (UNSAFE.compareAndSwapInt(this, statusOffset,
725     s, s | SIGNAL)) {
726     long startTime = System.nanoTime();
727     long nanos = unit.toNanos(timeout);
728     long nt; // wait time
729     while (status >= 0 &&
730     (nt = nanos - (System.nanoTime() - startTime)) > 0) {
731     if (pool != null && !dec)
732     dec = pool.tryDecrementRunningCount();
733     else {
734     long ms = nt / 1000000;
735     int ns = (int) (nt % 1000000);
736     try {
737     synchronized(this) {
738     if (status >= 0)
739     wait(ms, ns);
740     }
741     } catch (InterruptedException ie) {
742     if (pool != null)
743     cancelIfTerminating();
744     else {
745     interrupted = true;
746     break;
747     }
748     }
749     }
750     }
751     break;
752     }
753     }
754     if (pool != null && dec)
755     pool.updateRunningCount(1);
756     if (interrupted)
757     throw new InterruptedException();
758     int es = status & COMPLETION_MASK;
759     if (es != NORMAL) {
760     Throwable ex;
761     if (es == CANCELLED)
762     throw new CancellationException();
763     if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
764     throw new ExecutionException(ex);
765     throw new TimeoutException();
766     }
767     return getRawResult();
768 jsr166 1.1 }
769    
770     /**
771 jsr166 1.10 * Possibly executes other tasks until this task {@link #isDone is
772     * done}, then returns the result of the computation. This method
773     * may be more efficient than {@code join}, but is only applicable
774     * when there are no potential dependencies between continuation
775     * of the current task and that of any other task that might be
776     * executed while helping. (This usually holds for pure
777     * divide-and-conquer tasks.)
778 jsr166 1.6 *
779     * <p>This method may be invoked only from within {@code
780     * ForkJoinTask} computations (as may be determined using method
781     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
782     * result in exceptions or errors, possibly including {@code
783     * ClassCastException}.
784 jsr166 1.1 *
785     * @return the computed result
786     */
787     public final V helpJoin() {
788 dl 1.15 return reportResult(busyJoin());
789 jsr166 1.1 }
790    
791     /**
792 jsr166 1.10 * Possibly executes other tasks until this task {@link #isDone is
793     * done}. This method may be useful when processing collections
794     * of tasks when some have been cancelled or are otherwise known
795     * to have aborted.
796 jsr166 1.6 *
797     * <p>This method may be invoked only from within {@code
798     * ForkJoinTask} computations (as may be determined using method
799     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
800     * result in exceptions or errors, possibly including {@code
801     * ClassCastException}.
802 jsr166 1.1 */
803     public final void quietlyHelpJoin() {
804 dl 1.15 busyJoin();
805 jsr166 1.1 }
806    
807     /**
808     * Joins this task, without returning its result or throwing an
809     * exception. This method may be useful when processing
810     * collections of tasks when some have been cancelled or are
811     * otherwise known to have aborted.
812     */
813     public final void quietlyJoin() {
814 dl 1.15 waitingJoin();
815 jsr166 1.1 }
816    
817     /**
818     * Commences performing this task and awaits its completion if
819     * necessary, without returning its result or throwing an
820     * exception. This method may be useful when processing
821     * collections of tasks when some have been cancelled or are
822     * otherwise known to have aborted.
823     */
824     public final void quietlyInvoke() {
825 dl 1.15 waitingInvoke();
826 jsr166 1.1 }
827    
828     /**
829     * Possibly executes tasks until the pool hosting the current task
830 jsr166 1.7 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
831     * be of use in designs in which many tasks are forked but none
832     * are explicitly joined; instead, tasks are executed until all
833     * are processed.
834 jsr166 1.6 *
835     * <p>This method may be invoked only from within {@code
836     * ForkJoinTask} computations (as may be determined using method
837     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
838     * result in exceptions or errors, possibly including {@code
839     * ClassCastException}.
840 jsr166 1.1 */
841     public static void helpQuiesce() {
842     ((ForkJoinWorkerThread) Thread.currentThread())
843     .helpQuiescePool();
844     }
845    
846     /**
847     * Resets the internal bookkeeping state of this task, allowing a
848     * subsequent {@code fork}. This method allows repeated reuse of
849     * this task, but only if reuse occurs when this task has either
850     * never been forked, or has been forked, then completed and all
851     * outstanding joins of this task have also completed. Effects
852 jsr166 1.6 * under any other usage conditions are not guaranteed.
853     * This method may be useful when executing
854 jsr166 1.1 * pre-constructed trees of subtasks in loops.
855     */
856     public void reinitialize() {
857     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
858     exceptionMap.remove(this);
859     status = 0;
860     }
861    
862     /**
863     * Returns the pool hosting the current task execution, or
864     * {@code null} if this task is executing outside of any ForkJoinPool.
865     *
866 jsr166 1.6 * @see #inForkJoinPool
867 jsr166 1.4 * @return the pool, or {@code null} if none
868 jsr166 1.1 */
869     public static ForkJoinPool getPool() {
870     Thread t = Thread.currentThread();
871     return (t instanceof ForkJoinWorkerThread) ?
872     ((ForkJoinWorkerThread) t).pool : null;
873     }
874    
875     /**
876     * Returns {@code true} if the current thread is executing as a
877     * ForkJoinPool computation.
878     *
879     * @return {@code true} if the current thread is executing as a
880     * ForkJoinPool computation, or {@code false} otherwise
881     */
882     public static boolean inForkJoinPool() {
883     return Thread.currentThread() instanceof ForkJoinWorkerThread;
884     }
885    
886     /**
887     * Tries to unschedule this task for execution. This method will
888     * typically succeed if this task is the most recently forked task
889     * by the current thread, and has not commenced executing in
890     * another thread. This method may be useful when arranging
891     * alternative local processing of tasks that could have been, but
892 jsr166 1.6 * were not, stolen.
893     *
894     * <p>This method may be invoked only from within {@code
895     * ForkJoinTask} computations (as may be determined using method
896     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
897     * result in exceptions or errors, possibly including {@code
898     * ClassCastException}.
899 jsr166 1.1 *
900 jsr166 1.4 * @return {@code true} if unforked
901 jsr166 1.1 */
902     public boolean tryUnfork() {
903     return ((ForkJoinWorkerThread) Thread.currentThread())
904     .unpushTask(this);
905     }
906    
907     /**
908     * Returns an estimate of the number of tasks that have been
909     * forked by the current worker thread but not yet executed. This
910     * value may be useful for heuristic decisions about whether to
911     * fork other tasks.
912     *
913 jsr166 1.6 * <p>This method may be invoked only from within {@code
914     * ForkJoinTask} computations (as may be determined using method
915     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
916     * result in exceptions or errors, possibly including {@code
917     * ClassCastException}.
918     *
919 jsr166 1.1 * @return the number of tasks
920     */
921     public static int getQueuedTaskCount() {
922     return ((ForkJoinWorkerThread) Thread.currentThread())
923     .getQueueSize();
924     }
925    
926     /**
927     * Returns an estimate of how many more locally queued tasks are
928     * held by the current worker thread than there are other worker
929     * threads that might steal them. This value may be useful for
930     * heuristic decisions about whether to fork other tasks. In many
931     * usages of ForkJoinTasks, at steady state, each worker should
932     * aim to maintain a small constant surplus (for example, 3) of
933     * tasks, and to process computations locally if this threshold is
934     * exceeded.
935     *
936 jsr166 1.6 * <p>This method may be invoked only from within {@code
937     * ForkJoinTask} computations (as may be determined using method
938     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
939     * result in exceptions or errors, possibly including {@code
940     * ClassCastException}.
941     *
942 jsr166 1.1 * @return the surplus number of tasks, which may be negative
943     */
944     public static int getSurplusQueuedTaskCount() {
945     return ((ForkJoinWorkerThread) Thread.currentThread())
946     .getEstimatedSurplusTaskCount();
947     }
948    
949     // Extension methods
950    
951     /**
952 jsr166 1.4 * Returns the result that would be returned by {@link #join}, even
953     * if this task completed abnormally, or {@code null} if this task
954     * is not known to have been completed. This method is designed
955     * to aid debugging, as well as to support extensions. Its use in
956     * any other context is discouraged.
957 jsr166 1.1 *
958 jsr166 1.4 * @return the result, or {@code null} if not completed
959 jsr166 1.1 */
960     public abstract V getRawResult();
961    
962     /**
963     * Forces the given value to be returned as a result. This method
964     * is designed to support extensions, and should not in general be
965     * called otherwise.
966     *
967     * @param value the value
968     */
969     protected abstract void setRawResult(V value);
970    
971     /**
972     * Immediately performs the base action of this task. This method
973     * is designed to support extensions, and should not in general be
974     * called otherwise. The return value controls whether this task
975     * is considered to be done normally. It may return false in
976     * asynchronous actions that require explicit invocations of
977 jsr166 1.8 * {@link #complete} to become joinable. It may also throw an
978     * (unchecked) exception to indicate abnormal exit.
979 jsr166 1.1 *
980 jsr166 1.4 * @return {@code true} if completed normally
981 jsr166 1.1 */
982     protected abstract boolean exec();
983    
984     /**
985 jsr166 1.5 * Returns, but does not unschedule or execute, a task queued by
986     * the current thread but not yet executed, if one is immediately
987 jsr166 1.1 * available. There is no guarantee that this task will actually
988 jsr166 1.5 * be polled or executed next. Conversely, this method may return
989     * {@code null} even if a task exists but cannot be accessed without
990     * contention with other threads. This method is designed
991     * primarily to support extensions, and is unlikely to be useful
992 jsr166 1.6 * otherwise.
993     *
994     * <p>This method may be invoked only from within {@code
995     * ForkJoinTask} computations (as may be determined using method
996     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
997     * result in exceptions or errors, possibly including {@code
998     * ClassCastException}.
999 jsr166 1.1 *
1000 jsr166 1.4 * @return the next task, or {@code null} if none are available
1001 jsr166 1.1 */
1002     protected static ForkJoinTask<?> peekNextLocalTask() {
1003     return ((ForkJoinWorkerThread) Thread.currentThread())
1004     .peekTask();
1005     }
1006    
1007     /**
1008     * Unschedules and returns, without executing, the next task
1009     * queued by the current thread but not yet executed. This method
1010     * is designed primarily to support extensions, and is unlikely to
1011 jsr166 1.6 * be useful otherwise.
1012     *
1013     * <p>This method may be invoked only from within {@code
1014     * ForkJoinTask} computations (as may be determined using method
1015     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1016     * result in exceptions or errors, possibly including {@code
1017     * ClassCastException}.
1018 jsr166 1.1 *
1019 jsr166 1.4 * @return the next task, or {@code null} if none are available
1020 jsr166 1.1 */
1021     protected static ForkJoinTask<?> pollNextLocalTask() {
1022     return ((ForkJoinWorkerThread) Thread.currentThread())
1023     .pollLocalTask();
1024     }
1025    
1026     /**
1027     * Unschedules and returns, without executing, the next task
1028     * queued by the current thread but not yet executed, if one is
1029     * available, or if not available, a task that was forked by some
1030     * other thread, if available. Availability may be transient, so a
1031     * {@code null} result does not necessarily imply quiescence
1032     * of the pool this task is operating in. This method is designed
1033     * primarily to support extensions, and is unlikely to be useful
1034 jsr166 1.6 * otherwise.
1035     *
1036     * <p>This method may be invoked only from within {@code
1037     * ForkJoinTask} computations (as may be determined using method
1038     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1039     * result in exceptions or errors, possibly including {@code
1040     * ClassCastException}.
1041 jsr166 1.1 *
1042 jsr166 1.4 * @return a task, or {@code null} if none are available
1043 jsr166 1.1 */
1044     protected static ForkJoinTask<?> pollTask() {
1045     return ((ForkJoinWorkerThread) Thread.currentThread())
1046     .pollTask();
1047     }
1048    
1049 jsr166 1.5 /**
1050     * Adaptor for Runnables. This implements RunnableFuture
1051     * to be compliant with AbstractExecutorService constraints
1052     * when used in ForkJoinPool.
1053     */
1054     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1055     implements RunnableFuture<T> {
1056     final Runnable runnable;
1057     final T resultOnCompletion;
1058     T result;
1059     AdaptedRunnable(Runnable runnable, T result) {
1060     if (runnable == null) throw new NullPointerException();
1061     this.runnable = runnable;
1062     this.resultOnCompletion = result;
1063     }
1064     public T getRawResult() { return result; }
1065     public void setRawResult(T v) { result = v; }
1066     public boolean exec() {
1067     runnable.run();
1068     result = resultOnCompletion;
1069     return true;
1070     }
1071     public void run() { invoke(); }
1072     private static final long serialVersionUID = 5232453952276885070L;
1073     }
1074    
1075     /**
1076     * Adaptor for Callables.
1077     */
1078     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1079     implements RunnableFuture<T> {
1080 jsr166 1.6 final Callable<? extends T> callable;
1081 jsr166 1.5 T result;
1082 jsr166 1.6 AdaptedCallable(Callable<? extends T> callable) {
1083 jsr166 1.5 if (callable == null) throw new NullPointerException();
1084     this.callable = callable;
1085     }
1086     public T getRawResult() { return result; }
1087     public void setRawResult(T v) { result = v; }
1088     public boolean exec() {
1089     try {
1090     result = callable.call();
1091     return true;
1092     } catch (Error err) {
1093     throw err;
1094     } catch (RuntimeException rex) {
1095     throw rex;
1096     } catch (Exception ex) {
1097     throw new RuntimeException(ex);
1098     }
1099     }
1100     public void run() { invoke(); }
1101     private static final long serialVersionUID = 2838392045355241008L;
1102     }
1103 jsr166 1.2
1104     /**
1105 jsr166 1.6 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1106     * method of the given {@code Runnable} as its action, and returns
1107     * a null result upon {@link #join}.
1108 jsr166 1.2 *
1109     * @param runnable the runnable action
1110     * @return the task
1111     */
1112 jsr166 1.6 public static ForkJoinTask<?> adapt(Runnable runnable) {
1113 jsr166 1.5 return new AdaptedRunnable<Void>(runnable, null);
1114 jsr166 1.2 }
1115    
1116     /**
1117 jsr166 1.6 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1118     * method of the given {@code Runnable} as its action, and returns
1119     * the given result upon {@link #join}.
1120 jsr166 1.2 *
1121     * @param runnable the runnable action
1122     * @param result the result upon completion
1123     * @return the task
1124     */
1125     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1126 jsr166 1.5 return new AdaptedRunnable<T>(runnable, result);
1127 jsr166 1.2 }
1128    
1129     /**
1130 jsr166 1.6 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1131     * method of the given {@code Callable} as its action, and returns
1132     * its result upon {@link #join}, translating any checked exceptions
1133     * encountered into {@code RuntimeException}.
1134 jsr166 1.2 *
1135     * @param callable the callable action
1136     * @return the task
1137     */
1138 jsr166 1.6 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1139 jsr166 1.5 return new AdaptedCallable<T>(callable);
1140 jsr166 1.2 }
1141    
1142 jsr166 1.1 // Serialization support
1143    
1144     private static final long serialVersionUID = -7721805057305804111L;
1145    
1146     /**
1147 jsr166 1.12 * Saves the state to a stream.
1148 jsr166 1.1 *
1149     * @serialData the current run status and the exception thrown
1150 jsr166 1.4 * during execution, or {@code null} if none
1151 jsr166 1.1 * @param s the stream
1152     */
1153     private void writeObject(java.io.ObjectOutputStream s)
1154     throws java.io.IOException {
1155     s.defaultWriteObject();
1156     s.writeObject(getException());
1157     }
1158    
1159     /**
1160 jsr166 1.12 * Reconstitutes the instance from a stream.
1161 jsr166 1.1 *
1162     * @param s the stream
1163     */
1164     private void readObject(java.io.ObjectInputStream s)
1165     throws java.io.IOException, ClassNotFoundException {
1166     s.defaultReadObject();
1167 dl 1.15 status |= SIGNAL; // conservatively set external signal
1168 jsr166 1.1 Object ex = s.readObject();
1169     if (ex != null)
1170 dl 1.15 setExceptionalCompletion((Throwable) ex);
1171 jsr166 1.1 }
1172    
1173 jsr166 1.3 // Unsafe mechanics
1174 jsr166 1.1
1175 jsr166 1.3 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1176     private static final long statusOffset =
1177     objectFieldOffset("status", ForkJoinTask.class);
1178    
1179     private static long objectFieldOffset(String field, Class<?> klazz) {
1180 jsr166 1.1 try {
1181 jsr166 1.3 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1182 jsr166 1.1 } catch (NoSuchFieldException e) {
1183 jsr166 1.3 // Convert Exception to corresponding Error
1184     NoSuchFieldError error = new NoSuchFieldError(field);
1185 jsr166 1.1 error.initCause(e);
1186     throw error;
1187     }
1188     }
1189     }
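
The adapt factories, the timed get, and quietlyJoin documented above can be exercised from an ordinary (non-worker) thread. The following is a minimal sketch, not part of the original source. It assumes a current JDK in which ForkJoinPool and ForkJoinTask ship in java.util.concurrent, and behavior at revision 1.15 may differ in detail. The class name AdaptSketch, its helper methods, the 5-second timeout, and the pool size are all illustrative choices.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from ForkJoinTask.java.
final class AdaptSketch {

    /** Wraps a Callable with adapt(), runs it in the pool, and uses the timed get(). */
    static int timedGet(ForkJoinPool pool) {
        Callable<Integer> answer = () -> 6 * 7;
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(answer);
        pool.execute(task);
        try {
            // The caller is not a worker thread, so this blocks until
            // completion or until the (illustrative) timeout elapses.
            return task.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if a checked exception in the Callable surfaces through get(). */
    static boolean checkedFailureSurfaces(ForkJoinPool pool) {
        Callable<Integer> failing = () -> { throw new IOException("simulated failure"); };
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(failing);
        pool.execute(task);
        task.quietlyJoin();               // waits; neither returns a result nor throws
        if (!task.isCompletedAbnormally())
            return false;
        try {
            task.get();                   // expected to throw for a failed task
            return false;
        } catch (ExecutionException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Wraps a Runnable with a fixed result, as in adapt(Runnable, T). */
    static String runnableWithResult(ForkJoinPool pool) {
        AtomicInteger sideEffect = new AtomicInteger();
        Runnable action = () -> sideEffect.incrementAndGet();
        ForkJoinTask<String> task = ForkJoinTask.adapt(action, "done");
        String result = pool.invoke(task); // runs the task and awaits its result
        return sideEffect.get() == 1 ? result : "action did not run";
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(timedGet(pool));               // prints 42
            System.out.println(checkedFailureSurfaces(pool)); // prints true
            System.out.println(runnableWithResult(pool));     // prints done
        } finally {
            pool.shutdown();
        }
    }
}
```

The second helper calls quietlyJoin first so that the failure can be checked through isCompletedAbnormally before any exception is thrown. It checks only that get() throws ExecutionException, because the exact cause chain wrapped around the original IOException can vary between JDK versions.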