root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.5
Committed: Fri Jul 31 20:41:13 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.4: +71 -14 lines
Log Message:
sync with jsr166 package

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.io.Serializable;
10     import java.util.Collection;
11     import java.util.Collections;
12     import java.util.List;
13     import java.util.Map;
14     import java.util.WeakHashMap;
15    
16     /**
17     * Abstract base class for tasks that run within a {@link
18     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
19     * lighter weight than a normal thread. Huge numbers of tasks and
20     * subtasks may be hosted by a small number of actual threads in a
21     * ForkJoinPool, at the price of some usage limitations.
22     *
23     * <p> A "main" ForkJoinTask begins execution when submitted to a
24     * {@link ForkJoinPool}. Once started, it will usually in turn start
25     * other subtasks. As indicated by the name of this class, many
26 jsr166 1.4 * programs using ForkJoinTasks employ only methods {@code fork} and
27     * {@code join}, or derivatives such as {@code invokeAll}. However,
28     * this class also provides a number of other methods that can come
29     * into play in advanced usages, as well as extension mechanics that
30     * allow support of new forms of fork/join processing.
31 jsr166 1.1 *
32     * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, which arranges asynchronous execution,
38     * and {@link #join}, which doesn't proceed until the task's result has
39     * been computed. Computations should avoid {@code synchronized}
40     * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42     * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49     * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51     * not permitting checked exceptions such as {@code IOExceptions}
52     * to be thrown. However, computations may still encounter unchecked
53     * exceptions, that are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58     * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61     * waits for completion and report results using {@code Future}
62     * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66     * equivalent to {@code fork(); join()} but always attempts to
67     * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks is being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71     * Method {@code invokeAll} (available in multiple versions)
72     * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79     * established in a constructor, and then defines a {@code compute}
80     * method that somehow uses the control methods supplied by this base
81     * class. While these methods have {@code public} access (to allow
82     * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84     * ForkJoinTasks (as may be determined using method {@link
85     * #inForkJoinPool}). Attempts to invoke them in other contexts
86     * result in exceptions or errors, possibly including
87     * ClassCastException.
88     *
89     * <p>Most base support methods are {@code final} because their
90     * implementations are intrinsically tied to the underlying
91     * lightweight task scheduling framework, and so cannot be overridden.
92     * Developers creating new basic styles of fork/join processing should
93     * minimally implement {@code protected} methods
94 jsr166 1.4 * {@link #exec}, {@link #setRawResult}, and
95     * {@link #getRawResult}, while also introducing an abstract
96 jsr166 1.1 * computational method that can be implemented in its subclasses,
97     * possibly relying on other {@code protected} methods provided
98     * by this class.
99     *
100     * <p>ForkJoinTasks should perform relatively small amounts of
101     * computation; larger tasks should be split into smaller ones. As a very
102     * rough rule of thumb, a task should perform more than 100 and less
103     * than 10000 basic computational steps. If tasks are too big, then
104     * parallelism cannot improve throughput. If too small, then memory
105     * and internal task maintenance overhead may overwhelm processing.
106     *
107     * <p>ForkJoinTasks are {@code Serializable}, which enables them
108     * to be used in extensions such as remote execution frameworks. It is
109     * in general sensible to serialize tasks only before or after, but
110     * not during execution. Serialization is not relied on during
111     * execution itself.
112     *
113     * @since 1.7
114     * @author Doug Lea
115     */
116     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
117    
118     /**
119     * Run control status bits packed into a single int to minimize
120     * footprint and to ensure atomicity (via CAS). Status is
121     * initially zero, and takes on nonnegative values until
122     * completed, upon which status holds NORMAL, CANCELLED, or
123     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
124     * blocking waits by other threads have SIGNAL_MASK bits set --
125     * bit 15 for external (nonFJ) waits, and the rest a count of
126     * waiting FJ threads. (This representation relies on
127     * ForkJoinPool max thread limits). Completion of a stolen task
128     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
129     * though suboptimal for some purposes, we use basic builtin
130     * wait/notify to take advantage of "monitor inflation" in JVMs
131     * that we would otherwise need to emulate to avoid adding further
132     * per-task bookkeeping overhead. Note that bits 16-28 are
133     * currently unused. Also value 0x80000000 is available as spare
134     * completion value.
135     */
136     volatile int status; // accessed directly by pool and workers
137    
138     static final int COMPLETION_MASK = 0xe0000000;
139     static final int NORMAL = 0xe0000000; // == mask
140     static final int CANCELLED = 0xc0000000;
141     static final int EXCEPTIONAL = 0xa0000000;
142     static final int SIGNAL_MASK = 0x0000ffff;
143     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
144     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
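To make the bit packing described above concrete, the following is a minimal sketch that copies the constant values from the listing into a standalone class and decodes a status word. The class `StatusBits` and its helper methods are hypothetical names introduced only for illustration; they are not part of ForkJoinTask.

```java
// Hypothetical helper, not part of ForkJoinTask: decodes a packed status word
// using the constant values shown in the listing above.
final class StatusBits {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000;
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int SIGNAL_MASK          = 0x0000ffff;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    private StatusBits() {}

    // Every completion value sets the sign bit, so a completed status is negative.
    static boolean isDone(int status) {
        return status < 0;
    }

    // CANCELLED and EXCEPTIONAL both compare below NORMAL as signed ints.
    static boolean isCompletedAbnormally(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    // Count of fork/join worker threads recorded as waiting (low 15 bits).
    static int internalWaiters(int status) {
        return status & INTERNAL_SIGNAL_MASK;
    }

    // Whether a non-fork/join thread is recorded as waiting (bit 15).
    static boolean hasExternalWaiter(int status) {
        return (status & EXTERNAL_SIGNAL) != 0;
    }
}
```

Because the completion values occupy the top bits, a single signed comparison (`status < 0`) is enough to test for completion, which is the check used throughout the listing.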
145    
146     /**
147     * Table of exceptions thrown by tasks, to enable reporting by
148     * callers. Because exceptions are rare, we don't directly keep
149     * them with task objects, but instead use a weak ref table. Note
150     * that cancellation exceptions don't appear in the table, but are
151     * instead recorded as status values.
152     * TODO: Use ConcurrentReferenceHashMap
153     */
154     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
155     Collections.synchronizedMap
156     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
157    
158     // within-package utilities
159    
160     /**
161     * Gets current worker thread, or null if not a worker thread.
162     */
163     static ForkJoinWorkerThread getWorker() {
164     Thread t = Thread.currentThread();
165     return ((t instanceof ForkJoinWorkerThread) ?
166     (ForkJoinWorkerThread) t : null);
167     }
168    
169     final boolean casStatus(int cmp, int val) {
170     return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
171     }
172    
173     /**
174     * Workaround for not being able to rethrow unchecked exceptions.
175     */
176     static void rethrowException(Throwable ex) {
177     if (ex != null)
178     UNSAFE.throwException(ex);
179     }
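The method above relies on `UNSAFE.throwException`, which ordinary application code usually cannot reach. As an illustration only, a different and widely used technique, the generic "sneaky throw" idiom, achieves a similar effect in plain Java. The class and method names below are hypothetical and this is not what the listing itself does.

```java
// Hypothetical helper illustrating the generic "sneaky throw" idiom.
// This is a different technique from the UNSAFE.throwException call above.
final class SneakyThrow {
    private SneakyThrow() {}

    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void throwAs(Throwable ex) throws T {
        // The cast is erased at run time, so ex is thrown unchanged.
        throw (T) ex;
    }

    // Rethrows ex (checked or unchecked) without declaring it; no-op for null.
    static void rethrow(Throwable ex) {
        if (ex != null)
            SneakyThrow.<RuntimeException>throwAs(ex);
    }
}
```

A caller cannot name a checked exception in a `catch` clause unless the compiler believes it can be thrown, so code receiving such exceptions typically catches `Throwable` or `Exception`.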
180    
181     // Setting completion status
182    
183     /**
184     * Marks completion and wakes up threads waiting to join this task.
185     *
186     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
187     */
188     final void setCompletion(int completion) {
189     ForkJoinPool pool = getPool();
190     if (pool != null) {
191     int s; // Clear signal bits while setting completion status
192     do {} while ((s = status) >= 0 && !casStatus(s, completion));
193    
194     if ((s & SIGNAL_MASK) != 0) {
195     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
196     pool.updateRunningCount(s);
197     synchronized (this) { notifyAll(); }
198     }
199     }
200     else
201     externallySetCompletion(completion);
202     }
203    
204     /**
205     * Version of setCompletion for non-FJ threads. Leaves signal
206     * bits for unblocked threads to adjust, and always notifies.
207     */
208     private void externallySetCompletion(int completion) {
209     int s;
210     do {} while ((s = status) >= 0 &&
211     !casStatus(s, (s & SIGNAL_MASK) | completion));
212     synchronized (this) { notifyAll(); }
213     }
214    
215     /**
216     * Sets status to indicate normal completion.
217     */
218     final void setNormalCompletion() {
219     // Try typical fast case -- single CAS, no signal, not already done.
220     // Manually expand casStatus to improve chances of inlining it
221     if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
222     setCompletion(NORMAL);
223     }
224    
225     // internal waiting and notification
226    
227     /**
228     * Performs the actual monitor wait for awaitDone.
229     */
230     private void doAwaitDone() {
231     // Minimize lock bias and in/de-flation effects by maximizing
232     // chances of waiting inside sync
233     try {
234     while (status >= 0)
235     synchronized (this) { if (status >= 0) wait(); }
236     } catch (InterruptedException ie) {
237     onInterruptedWait();
238     }
239     }
240    
241     /**
242     * Performs the actual timed monitor wait for awaitDone.
243     */
244     private void doAwaitDone(long startTime, long nanos) {
245     synchronized (this) {
246     try {
247     while (status >= 0) {
248 jsr166 1.5 long nt = nanos - (System.nanoTime() - startTime);
249 jsr166 1.1 if (nt <= 0)
250     break;
251     wait(nt / 1000000, (int) (nt % 1000000));
252     }
253     } catch (InterruptedException ie) {
254     onInterruptedWait();
255     }
256     }
257     }
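The timed wait above recomputes the remaining time on each loop iteration and splits it into the millisecond and nanosecond arguments of `Object.wait(long, int)`. A small sketch of that arithmetic, using the hypothetical helper class `WaitTime` (not part of the listing), is:

```java
// Hypothetical helper, not part of ForkJoinTask: the arithmetic used by the
// timed wait above, pulled out so it can be checked in isolation.
final class WaitTime {
    private WaitTime() {}

    // Nanoseconds still left of a budget of `nanos` that began at `startTime`.
    static long remaining(long startTime, long now, long nanos) {
        return nanos - (now - startTime);
    }

    // Whole-millisecond part, as passed to Object.wait(long, int).
    static long millisPart(long nt) {
        return nt / 1000000;
    }

    // Sub-millisecond remainder in nanoseconds.
    static int nanosPart(long nt) {
        return (int) (nt % 1000000);
    }
}
```

Subtracting elapsed time (`now - startTime`) rather than comparing absolute timestamps is the usual way to work with `System.nanoTime()` values.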
258    
259     // Awaiting completion
260    
261     /**
262     * Sets status to indicate there is joiner, then waits for join,
263     * surrounded with pool notifications.
264     *
265     * @return status upon exit
266     */
267     private int awaitDone(ForkJoinWorkerThread w,
268     boolean maintainParallelism) {
269     ForkJoinPool pool = (w == null) ? null : w.pool;
270     int s;
271     while ((s = status) >= 0) {
272     if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
273     if (pool == null || !pool.preJoin(this, maintainParallelism))
274     doAwaitDone();
275     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
276     adjustPoolCountsOnUnblock(pool);
277     break;
278     }
279     }
280     return s;
281     }
282    
283     /**
284     * Timed version of awaitDone
285     *
286     * @return status upon exit
287     */
288     private int awaitDone(ForkJoinWorkerThread w, long nanos) {
289     ForkJoinPool pool = (w == null) ? null : w.pool;
290     int s;
291     while ((s = status) >= 0) {
292     if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
293     long startTime = System.nanoTime();
294     if (pool == null || !pool.preJoin(this, false))
295     doAwaitDone(startTime, nanos);
296     if ((s = status) >= 0) {
297     adjustPoolCountsOnCancelledWait(pool);
298     s = status;
299     }
300     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
301     adjustPoolCountsOnUnblock(pool);
302     break;
303     }
304     }
305     return s;
306     }
307    
308     /**
309     * Notifies pool that thread is unblocked. Called by signalled
310     * threads when woken by non-FJ threads (which is atypical).
311     */
312     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
313     int s;
314     do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
315     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
316     pool.updateRunningCount(s);
317     }
318    
319     /**
320     * Notifies pool to adjust counts on cancelled or timed out wait.
321     */
322     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
323     if (pool != null) {
324     int s;
325     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
326     if (casStatus(s, s - 1)) {
327     pool.updateRunningCount(1);
328     break;
329     }
330     }
331     }
332     }
333    
334     /**
335     * Handles interruptions during waits.
336     */
337     private void onInterruptedWait() {
338     ForkJoinWorkerThread w = getWorker();
339     if (w == null)
340     Thread.currentThread().interrupt(); // re-interrupt
341     else if (w.isTerminating())
342     cancelIgnoringExceptions();
343     // else if FJworker, ignore interrupt
344     }
345    
346     // Recording and reporting exceptions
347    
348     private void setDoneExceptionally(Throwable rex) {
349     exceptionMap.put(this, rex);
350     setCompletion(EXCEPTIONAL);
351     }
352    
353     /**
354     * Throws the exception associated with status s.
355     *
356     * @throws the exception
357     */
358     private void reportException(int s) {
359     if ((s &= COMPLETION_MASK) < NORMAL) {
360     if (s == CANCELLED)
361     throw new CancellationException();
362     else
363     rethrowException(exceptionMap.get(this));
364     }
365     }
366    
367     /**
368     * Returns result or throws exception using j.u.c.Future conventions.
369     * Only call when {@code isDone} known to be true.
370     */
371     private V reportFutureResult()
372     throws ExecutionException, InterruptedException {
373     int s = status & COMPLETION_MASK;
374     if (s < NORMAL) {
375     Throwable ex;
376     if (s == CANCELLED)
377     throw new CancellationException();
378     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
379     throw new ExecutionException(ex);
380     if (Thread.interrupted())
381     throw new InterruptedException();
382     }
383     return getRawResult();
384     }
385    
386     /**
387     * Returns result or throws exception using j.u.c.Future conventions
388     * with timeouts.
389     */
390     private V reportTimedFutureResult()
391     throws InterruptedException, ExecutionException, TimeoutException {
392     Throwable ex;
393     int s = status & COMPLETION_MASK;
394     if (s == NORMAL)
395     return getRawResult();
396     if (s == CANCELLED)
397     throw new CancellationException();
398     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
399     throw new ExecutionException(ex);
400     if (Thread.interrupted())
401     throw new InterruptedException();
402     throw new TimeoutException();
403     }
404    
405     // internal execution methods
406    
407     /**
408     * Calls exec, recording completion, and rethrowing exception if
409     * encountered. Caller should normally check status before calling.
410     *
411     * @return true if completed normally
412     */
413     private boolean tryExec() {
414     try { // try block must contain only call to exec
415     if (!exec())
416     return false;
417     } catch (Throwable rex) {
418     setDoneExceptionally(rex);
419     rethrowException(rex);
420     return false; // not reached
421     }
422     setNormalCompletion();
423     return true;
424     }
425    
426     /**
427     * Main execution method used by worker threads. Invokes
428     * base computation unless already complete.
429     */
430     final void quietlyExec() {
431     if (status >= 0) {
432     try {
433     if (!exec())
434     return;
435     } catch (Throwable rex) {
436     setDoneExceptionally(rex);
437     return;
438     }
439     setNormalCompletion();
440     }
441     }
442    
443     /**
444     * Calls exec(), recording but not rethrowing exception.
445     * Caller should normally check status before calling.
446     *
447     * @return true if completed normally
448     */
449     private boolean tryQuietlyInvoke() {
450     try {
451     if (!exec())
452     return false;
453     } catch (Throwable rex) {
454     setDoneExceptionally(rex);
455     return false;
456     }
457     setNormalCompletion();
458     return true;
459     }
460    
461     /**
462     * Cancels, ignoring any exceptions it throws.
463     */
464     final void cancelIgnoringExceptions() {
465     try {
466     cancel(false);
467     } catch (Throwable ignore) {
468     }
469     }
470    
471     /**
472     * Main implementation of helpJoin
473     */
474     private int busyJoin(ForkJoinWorkerThread w) {
475     int s;
476     ForkJoinTask<?> t;
477     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
478     t.quietlyExec();
479     return (s >= 0) ? awaitDone(w, false) : s; // block if no work
480     }
481    
482     // public methods
483    
484     /**
485     * Arranges to asynchronously execute this task. While it is not
486     * necessarily enforced, it is a usage error to fork a task more
487     * than once unless it has completed and been reinitialized. This
488     * method may be invoked only from within ForkJoinTask
489     * computations (as may be determined using method {@link
490     * #inForkJoinPool}). Attempts to invoke in other contexts result
491     * in exceptions or errors, possibly including ClassCastException.
492 jsr166 1.2 *
493     * @return {@code this}, to simplify usage.
494 jsr166 1.1 */
495 jsr166 1.2 public final ForkJoinTask<V> fork() {
496 jsr166 1.1 ((ForkJoinWorkerThread) Thread.currentThread())
497     .pushTask(this);
498 jsr166 1.2 return this;
499 jsr166 1.1 }
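As a usage illustration of `fork` and `join`, here is a minimal sketch of a divide-and-conquer sum. It assumes the released `java.util.concurrent` API (`RecursiveTask`, `ForkJoinPool`) rather than this early revision; `SumTask`, its `THRESHOLD` value, and the `sum` helper are hypothetical names chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of an int array.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000; // assumed cutoff, for illustration only
    private final int[] data;
    private final int lo, hi;

    SumTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask right = new SumTask(data, mid, hi);
        right.fork();                                     // schedule the right half asynchronously
        long left = new SumTask(data, lo, mid).compute(); // run the left half in this thread
        return left + right.join();                       // wait for the right half's result
    }

    // Convenience entry point: runs the whole computation in a fresh pool.
    static long sum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Forking one half and computing the other directly keeps the current thread busy instead of parking it while both halves run elsewhere.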
500    
501     /**
502     * Returns the result of the computation when it is ready.
503     * This method differs from {@code get} in that abnormal
504     * completion results in RuntimeExceptions or Errors, not
505     * ExecutionExceptions.
506     *
507     * @return the computed result
508     */
509     public final V join() {
510     ForkJoinWorkerThread w = getWorker();
511     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
512     reportException(awaitDone(w, true));
513     return getRawResult();
514     }
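The difference between `join` and `get` described above can be sketched as follows, assuming the released `java.util.concurrent` API. `JoinVersusGet`, `Failing`, and the two `describe` methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class JoinVersusGet {
    private JoinVersusGet() {}

    // Hypothetical task that always fails with an unchecked exception.
    static final class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // join() surfaces the failure as an unchecked exception of the task's own type.
    static String describeJoin() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            ForkJoinTask<Integer> task = pool.submit(new Failing());
            task.join();
            return "completed normally";
        } catch (IllegalStateException e) {
            return "join threw IllegalStateException";
        } finally {
            pool.shutdown();
        }
    }

    // get() follows Future conventions and wraps the failure in ExecutionException.
    static String describeGet() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            ForkJoinTask<Integer> task = pool.submit(new Failing());
            task.get();
            return "completed normally";
        } catch (ExecutionException e) {
            return "get threw ExecutionException wrapping "
                + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }
}
```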
515    
516     /**
517     * Commences performing this task, awaits its completion if
518     * necessary, and returns its result.
519     *
520     * @throws Throwable (a RuntimeException, Error, or unchecked
521     * exception) if the underlying computation did so
522     * @return the computed result
523     */
524     public final V invoke() {
525     if (status >= 0 && tryExec())
526     return getRawResult();
527     else
528     return join();
529     }
530    
531     /**
532     * Forks both tasks, returning when {@code isDone} holds for
533     * both of them or an exception is encountered. This method may be
534     * invoked only from within ForkJoinTask computations (as may be
535     * determined using method {@link #inForkJoinPool}). Attempts to
536     * invoke in other contexts result in exceptions or errors,
537     * possibly including ClassCastException.
538     *
539     * @param t1 one task
540     * @param t2 the other task
541     * @throws NullPointerException if t1 or t2 are null
542     * @throws RuntimeException or Error if either task did so
543     */
544     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
545     t2.fork();
546     t1.invoke();
547     t2.join();
548     }
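A typical use of the two-argument `invokeAll` is to split a range in half inside `compute`. The sketch below assumes the released `java.util.concurrent` API; `SquareAction`, its `THRESHOLD`, and `squareAll` are hypothetical names for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example action: squares each element of a slice in place.
final class SquareAction extends RecursiveAction {
    private static final int THRESHOLD = 1000; // assumed cutoff, for illustration only
    private final long[] data;
    private final int lo, hi;

    SquareAction(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++)
                data[i] *= data[i];
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Runs both halves and returns once both are done (or one has failed).
        invokeAll(new SquareAction(data, lo, mid), new SquareAction(data, mid, hi));
    }

    // Convenience entry point: squares the whole array using a fresh pool.
    static void squareAll(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new SquareAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

The two subtasks touch disjoint index ranges, which matches the recommendation that tasks access variables independent of those used by other running tasks.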
549    
550     /**
551     * Forks the given tasks, returning when {@code isDone} holds
552     * for all of them. If any task encounters an exception, others
553     * may be cancelled. This method may be invoked only from within
554     * ForkJoinTask computations (as may be determined using method
555     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
556     * result in exceptions or errors, possibly including
557     * ClassCastException.
558     *
559     * @param tasks the array of tasks
560     * @throws NullPointerException if tasks or any element are null
561     * @throws RuntimeException or Error if any task did so
562     */
563     public static void invokeAll(ForkJoinTask<?>... tasks) {
564     Throwable ex = null;
565     int last = tasks.length - 1;
566     for (int i = last; i >= 0; --i) {
567     ForkJoinTask<?> t = tasks[i];
568     if (t == null) {
569     if (ex == null)
570     ex = new NullPointerException();
571     }
572     else if (i != 0)
573     t.fork();
574     else {
575     t.quietlyInvoke();
576     if (ex == null)
577     ex = t.getException();
578     }
579     }
580     for (int i = 1; i <= last; ++i) {
581     ForkJoinTask<?> t = tasks[i];
582     if (t != null) {
583     if (ex != null)
584     t.cancel(false);
585     else {
586     t.quietlyJoin();
587     if (ex == null)
588     ex = t.getException();
589     }
590     }
591     }
592     if (ex != null)
593     rethrowException(ex);
594     }
595    
596     /**
597     * Forks all tasks in the collection, returning when
598     * {@code isDone} holds for all of them. If any task
599     * encounters an exception, others may be cancelled. This method
600     * may be invoked only from within ForkJoinTask computations (as
601     * may be determined using method {@link
602     * #inForkJoinPool}). Attempts to invoke in other contexts result
603     * in exceptions or errors, possibly including ClassCastException.
604     *
605     * @param tasks the collection of tasks
606 jsr166 1.2 * @return the tasks argument, to simplify usage
607 jsr166 1.1 * @throws NullPointerException if tasks or any element are null
608     * @throws RuntimeException or Error if any task did so
609     */
610 jsr166 1.2 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
611 jsr166 1.1 if (!(tasks instanceof List<?>)) {
612     invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
613 jsr166 1.2 return tasks;
614 jsr166 1.1 }
615     @SuppressWarnings("unchecked")
616     List<? extends ForkJoinTask<?>> ts =
617     (List<? extends ForkJoinTask<?>>) tasks;
618     Throwable ex = null;
619     int last = ts.size() - 1;
620     for (int i = last; i >= 0; --i) {
621     ForkJoinTask<?> t = ts.get(i);
622     if (t == null) {
623     if (ex == null)
624     ex = new NullPointerException();
625     }
626     else if (i != 0)
627     t.fork();
628     else {
629     t.quietlyInvoke();
630     if (ex == null)
631     ex = t.getException();
632     }
633     }
634     for (int i = 1; i <= last; ++i) {
635     ForkJoinTask<?> t = ts.get(i);
636     if (t != null) {
637     if (ex != null)
638     t.cancel(false);
639     else {
640     t.quietlyJoin();
641     if (ex == null)
642     ex = t.getException();
643     }
644     }
645     }
646     if (ex != null)
647     rethrowException(ex);
648 jsr166 1.2 return tasks;
649 jsr166 1.1 }
650    
651     /**
652 jsr166 1.4 * Returns {@code true} if the computation performed by this task
653     * has completed (or has been cancelled).
654 jsr166 1.1 *
655 jsr166 1.4 * @return {@code true} if this computation has completed
656 jsr166 1.1 */
657     public final boolean isDone() {
658     return status < 0;
659     }
660    
661     /**
662 jsr166 1.4 * Returns {@code true} if this task was cancelled.
663 jsr166 1.1 *
664 jsr166 1.4 * @return {@code true} if this task was cancelled
665 jsr166 1.1 */
666     public final boolean isCancelled() {
667     return (status & COMPLETION_MASK) == CANCELLED;
668     }
669    
670     /**
671     * Asserts that the results of this task's computation will not be
672     * used. If a cancellation occurs before attempting to execute this
673 jsr166 1.4 * task, execution will be suppressed, {@link #isCancelled}
674     * will report true, and {@link #join} will result in a
675 jsr166 1.1 * {@code CancellationException} being thrown. Otherwise, when
676     * cancellation races with completion, there are no guarantees
677 jsr166 1.4 * about whether {@code isCancelled} will report {@code true},
678     * whether {@code join} will return normally or via an exception,
679     * or whether these behaviors will remain consistent upon repeated
680 jsr166 1.1 * invocation.
681     *
682     * <p>This method may be overridden in subclasses, but if so, must
683     * still ensure that these minimal properties hold. In particular,
684     * the cancel method itself must not throw exceptions.
685     *
686     * <p> This method is designed to be invoked by <em>other</em>
687     * tasks. To terminate the current task, you can just return or
688     * throw an unchecked exception from its computation method, or
689 jsr166 1.4 * invoke {@link #completeExceptionally}.
690 jsr166 1.1 *
691     * @param mayInterruptIfRunning this value is ignored in the
692     * default implementation because tasks are not in general
693     * cancelled via interruption
694     *
695 jsr166 1.4 * @return {@code true} if this task is now cancelled
696 jsr166 1.1 */
697     public boolean cancel(boolean mayInterruptIfRunning) {
698     setCompletion(CANCELLED);
699     return (status & COMPLETION_MASK) == CANCELLED;
700     }
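The guarantee for cancellation before execution can be sketched as below, assuming the released `java.util.concurrent` API. `CancelBeforeRun` and `Answer` are hypothetical names; the task is never submitted to a pool, so the cancellation cannot race with completion.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

final class CancelBeforeRun {
    private CancelBeforeRun() {}

    // Hypothetical task whose computation is never run in this sketch.
    static final class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // Returns true if a task cancelled before execution reports itself as
    // cancelled and join() throws CancellationException.
    static boolean joinThrowsAfterCancel() {
        Answer task = new Answer();
        boolean cancelled = task.cancel(false); // the argument is ignored by default
        try {
            task.join();
            return false;
        } catch (CancellationException expected) {
            return cancelled && task.isCancelled() && task.isDone();
        }
    }
}
```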
701    
702     /**
703 jsr166 1.4 * Returns {@code true} if this task threw an exception or was cancelled.
704 jsr166 1.1 *
705 jsr166 1.4 * @return {@code true} if this task threw an exception or was cancelled
706 jsr166 1.1 */
707     public final boolean isCompletedAbnormally() {
708     return (status & COMPLETION_MASK) < NORMAL;
709     }
710    
711     /**
712     * Returns the exception thrown by the base computation, or a
713     * CancellationException if cancelled, or null if none or if the
714     * method has not yet completed.
715     *
716 jsr166 1.4 * @return the exception, or {@code null} if none
717 jsr166 1.1 */
718     public final Throwable getException() {
719     int s = status & COMPLETION_MASK;
720     if (s >= NORMAL)
721     return null;
722     if (s == CANCELLED)
723     return new CancellationException();
724     return exceptionMap.get(this);
725     }
726    
727     /**
728     * Completes this task abnormally, and if not already aborted or
729     * cancelled, causes it to throw the given exception upon
730     * {@code join} and related operations. This method may be used
731     * to induce exceptions in asynchronous tasks, or to force
732     * completion of tasks that would not otherwise complete. Its use
733     * in other situations is likely to be wrong. This method is
734     * overridable, but overridden versions must invoke {@code super}
735     * implementation to maintain guarantees.
736     *
737     * @param ex the exception to throw. If this exception is
738     * not a RuntimeException or Error, the actual exception thrown
739     * will be a RuntimeException with cause ex.
740     */
741     public void completeExceptionally(Throwable ex) {
742     setDoneExceptionally((ex instanceof RuntimeException) ||
743     (ex instanceof Error) ? ex :
744     new RuntimeException(ex));
745     }
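The wrapping rule for `completeExceptionally` can be illustrated with a short sketch, assuming the released `java.util.concurrent` API. `CompleteExceptionallyDemo`, `Pending`, and `joinFailure` are hypothetical names used only here.

```java
import java.util.concurrent.RecursiveTask;

final class CompleteExceptionallyDemo {
    private CompleteExceptionallyDemo() {}

    // Hypothetical task whose computation is never run in this sketch.
    static final class Pending extends RecursiveTask<String> {
        @Override
        protected String compute() {
            return "unused";
        }
    }

    // Completes a fresh task with ex and returns whatever join() then throws,
    // or null if join() returns normally.
    static Throwable joinFailure(Throwable ex) {
        Pending task = new Pending();
        task.completeExceptionally(ex);
        try {
            task.join();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```

Passing a `RuntimeException` should surface an exception of that same type from `join`, while a checked exception such as `IOException` should arrive wrapped in a `RuntimeException` with the original in its cause chain.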
746    
747     /**
748     * Completes this task and, if not already aborted or cancelled,
749     * returns the given value as the result of {@code join} and related
750     * operations. This method may be used to provide results for
751     * asynchronous tasks, or to provide alternative handling for
752     * tasks that would not otherwise complete normally. Its use in
753     * other situations is likely to be wrong. This method is
754     * overridable, but overridden versions must invoke {@code super}
755     * implementation to maintain guarantees.
756     *
757     * @param value the result value for this task
758     */
759     public void complete(V value) {
760     try {
761     setRawResult(value);
762     } catch (Throwable rex) {
763     setDoneExceptionally(rex);
764     return;
765     }
766     setNormalCompletion();
767     }
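A minimal sketch of supplying a result through `complete`, assuming the released `java.util.concurrent` API; `CompleteDemo`, `Pending`, and `joinAfterComplete` are hypothetical names for this illustration.

```java
import java.util.concurrent.RecursiveTask;

final class CompleteDemo {
    private CompleteDemo() {}

    // Hypothetical task; compute() is never run in this sketch.
    static final class Pending extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return -1;
        }
    }

    // Completes a fresh task with the given value and reads it back via join().
    static int joinAfterComplete(int value) {
        Pending task = new Pending();
        task.complete(value); // stores the raw result and marks normal completion
        return task.join();   // task is already done, so compute() is not invoked
    }
}
```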
768    
769     public final V get() throws InterruptedException, ExecutionException {
770     ForkJoinWorkerThread w = getWorker();
771     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
772     awaitDone(w, true);
773     return reportFutureResult();
774     }
775    
776     public final V get(long timeout, TimeUnit unit)
777     throws InterruptedException, ExecutionException, TimeoutException {
778 jsr166 1.5 long nanos = unit.toNanos(timeout);
779 jsr166 1.1 ForkJoinWorkerThread w = getWorker();
780     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
781 jsr166 1.5 awaitDone(w, nanos);
782 jsr166 1.1 return reportTimedFutureResult();
783     }
784    
785     /**
786     * Possibly executes other tasks until this task is ready, then
787     * returns the result of the computation. This method may be more
788     * efficient than {@code join}, but is only applicable when
789     * there are no potential dependencies between continuation of the
790     * current task and that of any other task that might be executed
791     * while helping. (This usually holds for pure divide-and-conquer
792     * tasks). This method may be invoked only from within
793     * ForkJoinTask computations (as may be determined using method
794     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
795     * result in exceptions or errors, possibly including
796     * ClassCastException.
797     *
798     * @return the computed result
799     */
800     public final V helpJoin() {
801     ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
802     if (status < 0 || !w.unpushTask(this) || !tryExec())
803     reportException(busyJoin(w));
804     return getRawResult();
805     }
806    
807     /**
808     * Possibly executes other tasks until this task is ready. This
809     * method may be invoked only from within ForkJoinTask
810     * computations (as may be determined using method {@link
811     * #inForkJoinPool}). Attempts to invoke in other contexts result
812     * in exceptions or errors, possibly including ClassCastException.
813     */
814     public final void quietlyHelpJoin() {
815     if (status >= 0) {
816     ForkJoinWorkerThread w =
817     (ForkJoinWorkerThread) Thread.currentThread();
818     if (!w.unpushTask(this) || !tryQuietlyInvoke())
819     busyJoin(w);
820     }
821     }
822    
823     /**
824     * Joins this task, without returning its result or throwing an
825     * exception. This method may be useful when processing
826     * collections of tasks when some have been cancelled or are
827     * otherwise known to have aborted.
828     */
829     public final void quietlyJoin() {
830     if (status >= 0) {
831     ForkJoinWorkerThread w = getWorker();
832     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
833     awaitDone(w, true);
834     }
835     }
836    
837     /**
838     * Commences performing this task and awaits its completion if
839     * necessary, without returning its result or throwing an
840     * exception. This method may be useful when processing
841     * collections of tasks when some have been cancelled or are
842     * otherwise known to have aborted.
843     */
844     public final void quietlyInvoke() {
845     if (status >= 0 && !tryQuietlyInvoke())
846     quietlyJoin();
847     }
848    
849     /**
850     * Possibly executes tasks until the pool hosting the current task
851     * {@link ForkJoinPool#isQuiescent is quiescent}. This method may be of use in
852     * designs in which many tasks are forked, but none are explicitly
853     * joined, instead executing them until all are processed.
854     */
855     public static void helpQuiesce() {
856     ((ForkJoinWorkerThread) Thread.currentThread())
857     .helpQuiescePool();
858     }
859    
860     /**
861     * Resets the internal bookkeeping state of this task, allowing a
862     * subsequent {@code fork}. This method allows repeated reuse of
863     * this task, but only if reuse occurs when this task has either
864     * never been forked, or has been forked, then completed and all
865     * outstanding joins of this task have also completed. Effects
866     * under any other usage conditions are not guaranteed, and are
867     * almost surely wrong. This method may be useful when executing
868     * pre-constructed trees of subtasks in loops.
869     */
870     public void reinitialize() {
871     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
872     exceptionMap.remove(this);
873     status = 0;
874     }
875    
876     /**
877     * Returns the pool hosting the current task execution, or {@code null}
878     * if this task is executing outside of any ForkJoinPool.
879     *
880 jsr166 1.4 * @return the pool, or {@code null} if none
881 jsr166 1.1 */
882     public static ForkJoinPool getPool() {
883     Thread t = Thread.currentThread();
884     return (t instanceof ForkJoinWorkerThread) ?
885     ((ForkJoinWorkerThread) t).pool : null;
886     }
887    
888     /**
889     * Returns {@code true} if the current thread is executing as a
890     * ForkJoinPool computation.
891     *
892     * @return {@code true} if the current thread is executing as a
893     * ForkJoinPool computation, or {@code false} otherwise
894     */
895     public static boolean inForkJoinPool() {
896     return Thread.currentThread() instanceof ForkJoinWorkerThread;
897     }
898    
899     /**
900     * Tries to unschedule this task for execution. This method will
901     * typically succeed if this task is the most recently forked task
902     * by the current thread, and has not commenced executing in
903     * another thread. This method may be useful when arranging
904     * alternative local processing of tasks that could have been, but
905     * were not, stolen. This method may be invoked only from within
906     * ForkJoinTask computations (as may be determined using method
907     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
908     * result in exceptions or errors, possibly including
909     * ClassCastException.
910     *
911 jsr166 1.4 * @return {@code true} if unforked
912 jsr166 1.1 */
913     public boolean tryUnfork() {
914     return ((ForkJoinWorkerThread) Thread.currentThread())
915     .unpushTask(this);
916     }
917    
918     /**
919     * Returns an estimate of the number of tasks that have been
920     * forked by the current worker thread but not yet executed. This
921     * value may be useful for heuristic decisions about whether to
922     * fork other tasks.
923     *
924     * @return the number of tasks
925     */
926     public static int getQueuedTaskCount() {
927     return ((ForkJoinWorkerThread) Thread.currentThread())
928     .getQueueSize();
929     }
930    
931     /**
932     * Returns an estimate of how many more locally queued tasks are
933     * held by the current worker thread than there are other worker
934     * threads that might steal them. This value may be useful for
935     * heuristic decisions about whether to fork other tasks. In many
936     * usages of ForkJoinTasks, at steady state, each worker should
937     * aim to maintain a small constant surplus (for example, 3) of
938     * tasks, and to process computations locally if this threshold is
939     * exceeded.
940     *
941     * @return the surplus number of tasks, which may be negative
942     */
943     public static int getSurplusQueuedTaskCount() {
944     return ((ForkJoinWorkerThread) Thread.currentThread())
945     .getEstimatedSurplusTaskCount();
946     }
947    
948     // Extension methods
949    
950     /**
951 jsr166 1.4 * Returns the result that would be returned by {@link #join}, even
952     * if this task completed abnormally, or {@code null} if this task
953     * is not known to have been completed. This method is designed
954     * to aid debugging, as well as to support extensions. Its use in
955     * any other context is discouraged.
956 jsr166 1.1 *
957 jsr166 1.4 * @return the result, or {@code null} if not completed
958 jsr166 1.1 */
959     public abstract V getRawResult();
960    
961     /**
962     * Forces the given value to be returned as a result. This method
963     * is designed to support extensions, and should not in general be
964     * called otherwise.
965     *
966     * @param value the value
967     */
968     protected abstract void setRawResult(V value);
969    
970     /**
971     * Immediately performs the base action of this task. This method
972     * is designed to support extensions, and should not in general be
973     * called otherwise. The return value controls whether this task
974     * is considered to be done normally. It may return {@code false} in
975     * asynchronous actions that require explicit invocations of
976 jsr166 1.4 * {@link #complete} to become joinable. It may throw exceptions
977 jsr166 1.1 * to indicate abnormal exit.
978     *
979 jsr166 1.4 * @return {@code true} if completed normally
980 jsr166 1.1 * @throws Error or RuntimeException if encountered during computation
981     */
982     protected abstract boolean exec();
983    
984     /**
985 jsr166 1.5 * Returns, but does not unschedule or execute, a task queued by
986     * the current thread but not yet executed, if one is immediately
987 jsr166 1.1 * available. There is no guarantee that this task will actually
988 jsr166 1.5 * be polled or executed next. Conversely, this method may return
989     * {@code null} even if a task exists but cannot be accessed without
990     * contention with other threads. This method is designed
991     * primarily to support extensions, and is unlikely to be useful
992     * otherwise. This method may be invoked only from within
993     * ForkJoinTask computations (as may be determined using method
994     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
995     * result in exceptions or errors, possibly including
996     * ClassCastException.
997 jsr166 1.1 *
998 jsr166 1.4 * @return the next task, or {@code null} if none are available
999 jsr166 1.1 */
1000     protected static ForkJoinTask<?> peekNextLocalTask() {
1001     return ((ForkJoinWorkerThread) Thread.currentThread())
1002     .peekTask();
1003     }
1004    
1005     /**
1006     * Unschedules and returns, without executing, the next task
1007     * queued by the current thread but not yet executed. This method
1008     * is designed primarily to support extensions, and is unlikely to
1009     * be useful otherwise. This method may be invoked only from
1010     * within ForkJoinTask computations (as may be determined using
1011     * method {@link #inForkJoinPool}). Attempts to invoke in other
1012     * contexts result in exceptions or errors, possibly including
1013     * ClassCastException.
1014     *
1015 jsr166 1.4 * @return the next task, or {@code null} if none are available
1016 jsr166 1.1 */
1017     protected static ForkJoinTask<?> pollNextLocalTask() {
1018     return ((ForkJoinWorkerThread) Thread.currentThread())
1019     .pollLocalTask();
1020     }
1021    
1022     /**
1023     * Unschedules and returns, without executing, the next task
1024     * queued by the current thread but not yet executed, if one is
1025     * available, or if not available, a task that was forked by some
1026     * other thread, if available. Availability may be transient, so a
1027     * {@code null} result does not necessarily imply quiescence
1028     * of the pool this task is operating in. This method is designed
1029     * primarily to support extensions, and is unlikely to be useful
1030     * otherwise. This method may be invoked only from within
1031     * ForkJoinTask computations (as may be determined using method
1032     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1033     * result in exceptions or errors, possibly including
1034     * ClassCastException.
1035     *
1036 jsr166 1.4 * @return a task, or {@code null} if none are available
1037 jsr166 1.1 */
1038     protected static ForkJoinTask<?> pollTask() {
1039     return ((ForkJoinWorkerThread) Thread.currentThread())
1040     .pollTask();
1041     }
1042    
1043 jsr166 1.5 /**
1044     * Adaptor for Runnables. This implements RunnableFuture
1045     * to be compliant with AbstractExecutorService constraints
1046     * when used in ForkJoinPool.
1047     */
1048     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1049     implements RunnableFuture<T> {
1050     final Runnable runnable;
1051     final T resultOnCompletion;
1052     T result;
1053     AdaptedRunnable(Runnable runnable, T result) {
1054     if (runnable == null) throw new NullPointerException();
1055     this.runnable = runnable;
1056     this.resultOnCompletion = result;
1057     }
1058     public T getRawResult() { return result; }
1059     public void setRawResult(T v) { result = v; }
1060     public boolean exec() {
1061     runnable.run();
1062     result = resultOnCompletion;
1063     return true;
1064     }
1065     public void run() { invoke(); }
1066     private static final long serialVersionUID = 5232453952276885070L;
1067     }
1068    
1069     /**
1070     * Adaptor for Callables.
1071     */
1072     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1073     implements RunnableFuture<T> {
1074     final Callable<T> callable;
1075     T result;
1076     AdaptedCallable(Callable<T> callable) {
1077     if (callable == null) throw new NullPointerException();
1078     this.callable = callable;
1079     }
1080     public T getRawResult() { return result; }
1081     public void setRawResult(T v) { result = v; }
1082     public boolean exec() {
1083     try {
1084     result = callable.call();
1085     return true;
1086     } catch (Error err) {
1087     throw err;
1088     } catch (RuntimeException rex) {
1089     throw rex;
1090     } catch (Exception ex) {
1091     throw new RuntimeException(ex);
1092     }
1093     }
1094     public void run() { invoke(); }
1095     private static final long serialVersionUID = 2838392045355241008L;
1096     }
1097 jsr166 1.2
1098     /**
1099     * Returns a new ForkJoinTask that performs the {@code run}
1100     * method of the given Runnable as its action, and returns a null
1101     * result upon {@code join}.
1102     *
1103     * @param runnable the runnable action
1104     * @return the task
1105     */
1106     public static ForkJoinTask<Void> adapt(Runnable runnable) {
1107 jsr166 1.5 return new AdaptedRunnable<Void>(runnable, null);
1108 jsr166 1.2 }
1109    
1110     /**
1111     * Returns a new ForkJoinTask that performs the {@code run}
1112     * method of the given Runnable as its action, and returns the
1113     * given result upon {@code join}.
1114     *
1115     * @param runnable the runnable action
1116     * @param result the result upon completion
1117     * @return the task
1118     */
1119     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1120 jsr166 1.5 return new AdaptedRunnable<T>(runnable, result);
1121 jsr166 1.2 }
1122    
1123     /**
1124     * Returns a new ForkJoinTask that performs the {@code call}
1125     * method of the given Callable as its action, and returns its
1126     * result upon {@code join}, translating any checked
1127     * exceptions encountered into {@code RuntimeException}.
1128     *
1129     * @param callable the callable action
1130     * @return the task
1131     */
1132     public static <T> ForkJoinTask<T> adapt(Callable<T> callable) {
1133 jsr166 1.5 return new AdaptedCallable<T>(callable);
1134 jsr166 1.2 }
1135    
1136 jsr166 1.1 // Serialization support
1137    
1138     private static final long serialVersionUID = -7721805057305804111L;
1139    
1140     /**
1141     * Saves the state to a stream.
1142     *
1143     * @serialData the current run status and the exception thrown
1144 jsr166 1.4 * during execution, or {@code null} if none
1145 jsr166 1.1 * @param s the stream
1146     */
1147     private void writeObject(java.io.ObjectOutputStream s)
1148     throws java.io.IOException {
1149     s.defaultWriteObject();
1150     s.writeObject(getException());
1151     }
1152    
1153     /**
1154     * Reconstitutes the instance from a stream.
1155     *
1156     * @param s the stream
1157     */
1158     private void readObject(java.io.ObjectInputStream s)
1159     throws java.io.IOException, ClassNotFoundException {
1160     s.defaultReadObject();
1161     status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1162     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1163     Object ex = s.readObject();
1164     if (ex != null)
1165     setDoneExceptionally((Throwable) ex);
1166     }
1167    
1168 jsr166 1.3 // Unsafe mechanics
1169 jsr166 1.1
1170 jsr166 1.3 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1171     private static final long statusOffset =
1172     objectFieldOffset("status", ForkJoinTask.class);
1173    
1174     private static long objectFieldOffset(String field, Class<?> klazz) {
1175 jsr166 1.1 try {
1176 jsr166 1.3 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1177 jsr166 1.1 } catch (NoSuchFieldException e) {
1178 jsr166 1.3 // Convert Exception to corresponding Error
1179     NoSuchFieldError error = new NoSuchFieldError(field);
1180 jsr166 1.1 error.initCause(e);
1181     throw error;
1182     }
1183     }
1184     }
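
The following is a minimal usage sketch, not part of the file above. It shows how the documented methods fit together from a caller's point of view: `fork` and `join` in a divide-and-conquer task, the `getSurplusQueuedTaskCount` heuristic with the "small constant surplus" of 3 suggested in its Javadoc, and the three `adapt` factories. It assumes the class as later shipped in `java.util.concurrent` and a Java 8 or newer compiler for the lambdas. The names `ForkJoinSketch`, `SumTask`, `SEQUENTIAL_CUTOFF` and `SURPLUS_LIMIT` are hypothetical and chosen only for illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSketch {

    /** Hypothetical task: sums data[lo, hi) by divide and conquer. */
    static final class SumTask extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        static final int SEQUENTIAL_CUTOFF = 1000; // illustrative, not tuned
        static final int SURPLUS_LIMIT = 3;        // "small constant surplus"
        final int[] data;
        final int lo, hi;

        SumTask(int[] data, int lo, int hi) {
            this.data = data; this.lo = lo; this.hi = hi;
        }

        @Override
        protected Long compute() {
            // Work locally if the range is small, or if this worker already
            // holds more queued tasks than other workers are likely to steal.
            if (hi - lo <= SEQUENTIAL_CUTOFF
                    || getSurplusQueuedTaskCount() > SURPLUS_LIMIT) {
                long sum = 0;
                for (int i = lo; i < hi; i++) sum += data[i];
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            SumTask right = new SumTask(data, mid, hi);
            right.fork();                                     // queue the right half
            long left = new SumTask(data, lo, mid).compute(); // run the left half here
            return left + right.join();                       // wait for the right half
        }
    }

    /** Submits a "main" task to a pool and returns its result. */
    public static long parallelSum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    /** adapt(Runnable, T): runs the Runnable, then yields the given result. */
    public static String adaptRunnable() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            StringBuilder log = new StringBuilder();
            Runnable r = () -> log.append("ran");
            String result = pool.invoke(ForkJoinTask.adapt(r, "done"));
            return log + ":" + result;
        } finally {
            pool.shutdown();
        }
    }

    /** adapt(Callable): a checked exception surfaces as a RuntimeException. */
    public static boolean checkedExceptionIsWrapped() {
        ForkJoinPool pool = new ForkJoinPool();
        Callable<Void> failing = () -> { throw new IOException("boom"); };
        try {
            pool.invoke(ForkJoinTask.adapt(failing));
            return false; // not expected: the task should have failed
        } catch (RuntimeException e) {
            // Walk the cause chain; its exact depth may differ between JDKs.
            for (Throwable t = e; t != null; t = t.getCause())
                if (t instanceof IOException) return true;
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(parallelSum(data));
        System.out.println(adaptRunnable());
        System.out.println(checkedExceptionIsWrapped());
    }
}
```

The `Runnable` and `Callable` are assigned to typed variables before being passed to `adapt`. This keeps overload resolution unambiguous, since a lambda could otherwise match more than one `adapt` overload.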