root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.2
Committed: Wed Jan 7 16:07:37 2009 UTC (15 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.1: +350 -264 lines
Log Message:
Improved documentation; moved methods to improve javadoc flow; regularized extension APIs

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21     *
22     * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25     * programs using ForkJoinTasks employ only methods <code>fork</code>
26     * and <code>join</code>, or derivatives such as
27     * <code>invokeAll</code>. However, this class also provides a number
28     * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31     *
32     * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, which arranges asynchronous execution,
38     * and {@link #join}, which doesn't proceed until the task's result has
39     * been computed. Computations should avoid <code>synchronized</code>
40     * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51     * not permitting checked exceptions such as <code>IOExceptions</code>
52     * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, which are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61     * waits for completion and report results using <code>Future</code>
62     * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66     * equivalent to <code>fork(); join()</code> but always attempts to
67     * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71     * Method <code>invokeAll</code> (available in multiple versions)
72     * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79     * established in a constructor, and then defines a <code>compute</code>
80     * method that somehow uses the control methods supplied by this base
81     * class. While these methods have <code>public</code> access (to allow
82     * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84     * ForkJoinTasks. Attempts to invoke them in other contexts result in
85     * exceptions or errors including ClassCastException.
86 dl 1.1 *
87 dl 1.2 * <p>Most base support methods are <code>final</code> because their
88 dl 1.1 * implementations are intrinsically tied to the underlying
89     * lightweight task scheduling framework, and so cannot be overridden.
90     * Developers creating new basic styles of fork/join processing should
91 dl 1.2 * minimally implement <code>protected</code> methods
92     * <code>exec</code>, <code>setRawResult</code>, and
93     * <code>getRawResult</code>, while also introducing an abstract
94     * computational method that can be implemented in its subclasses,
95     * possibly relying on other <code>protected</code> methods provided
96     * by this class.
97 dl 1.1 *
98     * <p>ForkJoinTasks should perform relatively small amounts of
99     * computations, otherwise splitting into smaller tasks. As a very
100     * rough rule of thumb, a task should perform more than 100 and less
101     * than 10000 basic computational steps. If tasks are too big, then
102     * parallelism cannot improve throughput. If too small, then memory
103     * and internal task maintenance overhead may overwhelm processing.
104     *
105 dl 1.2 * <p>ForkJoinTasks are <code>Serializable</code>, which enables them
106     * to be used in extensions such as remote execution frameworks. It is
107     * in general sensible to serialize tasks only before or after, but
108 dl 1.1 * not during execution. Serialization is not relied on during
109     * execution itself.
110     */
111     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
112 dl 1.2
113 dl 1.1 /**
114 dl 1.2 * Run control status bits packed into a single int to minimize
115     * footprint and to ensure atomicity (via CAS). Status is
116     * initially zero, and takes on nonnegative values until
117 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
118     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
119     * blocking waits by other threads have SIGNAL_MASK bits set --
120     * bit 15 for external (nonFJ) waits, and the rest a count of
121     * waiting FJ threads. (This representation relies on
122     * ForkJoinPool max thread limits). Completion of a stolen task
123     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
124     * though suboptimal for some purposes, we use basic builtin
125     * wait/notify to take advantage of "monitor inflation" in JVMs
126     * that we would otherwise need to emulate to avoid adding further
127     * per-task bookkeeping overhead. Note that bits 16-28 are
128     * currently unused. Also value 0x80000000 is available as spare
129     * completion value.
130     */
131     volatile int status; // accessed directly by pool and workers
132    
133     static final int COMPLETION_MASK = 0xe0000000;
134     static final int NORMAL = 0xe0000000; // == mask
135     static final int CANCELLED = 0xc0000000;
136     static final int EXCEPTIONAL = 0xa0000000;
137     static final int SIGNAL_MASK = 0x0000ffff;
138     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
139     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
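The status encoding described above can be illustrated with a standalone sketch. It copies the constant values from this listing into a separate class; the class name `StatusBitsSketch` and the helper `describe` are hypothetical and are not part of ForkJoinTask. Because the three completion values are negative ints ordered EXCEPTIONAL < CANCELLED < NORMAL, the source can test for abnormal completion with a single `< NORMAL` comparison.

```java
// Illustrative only: constants copied from the listing, helper names invented.
final class StatusBitsSketch {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000; // == mask
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    /** Renders a status word as text (hypothetical helper). */
    static String describe(int status) {
        if (status >= 0) {
            // Not yet completed: low bits count waiting FJ threads,
            // bit 15 flags a waiting non-FJ thread.
            return "running, fjWaiters=" + (status & INTERNAL_SIGNAL_MASK)
                + ", externalWaiter=" + ((status & EXTERNAL_SIGNAL) != 0);
        }
        int c = status & COMPLETION_MASK;
        if (c == NORMAL)      return "normal";
        if (c == CANCELLED)   return "cancelled";
        if (c == EXCEPTIONAL) return "exceptional";
        return "unknown";
    }

    /** True if the completion bits indicate cancellation or an exception. */
    static boolean isAbnormal(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    public static void main(String[] args) {
        System.out.println(describe(2 | EXTERNAL_SIGNAL));
        System.out.println(describe(NORMAL | 3));
        System.out.println(isAbnormal(CANCELLED));
    }
}
```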
140    
141     /**
142     * Table of exceptions thrown by tasks, to enable reporting by
143     * callers. Because exceptions are rare, we don't directly keep
144     * them with task objects, but instead use a weak ref table. Note
145     * that cancellation exceptions don't appear in the table, but are
146     * instead recorded as status values.
147     * Todo: Use ConcurrentReferenceHashMap
148     */
149     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
150     Collections.synchronizedMap
151     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
152    
153     // within-package utilities
154    
155     /**
156     * Get current worker thread, or null if not a worker thread
157     */
158     static ForkJoinWorkerThread getWorker() {
159     Thread t = Thread.currentThread();
160     return ((t instanceof ForkJoinWorkerThread)?
161     (ForkJoinWorkerThread)t : null);
162     }
163    
164     final boolean casStatus(int cmp, int val) {
165     return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
166     }
167    
168     /**
169     * Workaround for not being able to rethrow unchecked exceptions.
170     */
171     static void rethrowException(Throwable ex) {
172     if (ex != null)
173     _unsafe.throwException(ex);
174     }
175    
176     // Setting completion status
177    
178     /**
179     * Mark completion and wake up threads waiting to join this task.
180     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
181     */
182     final void setCompletion(int completion) {
183 dl 1.2 ForkJoinPool pool = getPool();
184 dl 1.1 if (pool != null) {
185     int s; // Clear signal bits while setting completion status
186     do;while ((s = status) >= 0 && !casStatus(s, completion));
187    
188     if ((s & SIGNAL_MASK) != 0) {
189     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
190     pool.updateRunningCount(s);
191     synchronized(this) { notifyAll(); }
192     }
193     }
194     else
195     externallySetCompletion(completion);
196     }
197    
198     /**
199     * Version of setCompletion for non-FJ threads. Leaves signal
200     * bits for unblocked threads to adjust, and always notifies.
201     */
202     private void externallySetCompletion(int completion) {
203     int s;
204     do;while ((s = status) >= 0 &&
205     !casStatus(s, (s & SIGNAL_MASK) | completion));
206     synchronized(this) { notifyAll(); }
207     }
208    
209     /**
210     * Sets status to indicate normal completion
211     */
212     final void setNormalCompletion() {
213     // Try typical fast case -- single CAS, no signal, not already done.
214     // Manually expand casStatus to improve chances of inlining it
215     if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
216     setCompletion(NORMAL);
217     }
218    
219     // internal waiting and notification
220    
221     /**
222     * Performs the actual monitor wait for awaitDone
223     */
224     private void doAwaitDone() {
225     // Minimize lock bias and in/de-flation effects by maximizing
226     // chances of waiting inside sync
227     try {
228     while (status >= 0)
229     synchronized(this) { if (status >= 0) wait(); }
230     } catch (InterruptedException ie) {
231     onInterruptedWait();
232     }
233     }
234    
235     /**
236     * Performs the actual monitor wait for the timed version of awaitDone
237     */
238     private void doAwaitDone(long startTime, long nanos) {
239     synchronized(this) {
240     try {
241     while (status >= 0) {
242     long nt = nanos - (System.nanoTime() - startTime); // budget minus elapsed time
243     if (nt <= 0)
244     break;
245     wait(nt / 1000000, (int)(nt % 1000000));
246     }
247     } catch (InterruptedException ie) {
248     onInterruptedWait();
249     }
250     }
251     }
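A timed wait of this kind needs two small pieces of arithmetic: the time still remaining is the total budget minus the time elapsed since the start, and `Object.wait(long, int)` takes that remainder split into whole milliseconds plus leftover nanoseconds. The following is a minimal sketch of just that arithmetic; the class and method names are hypothetical.

```java
// Illustrative arithmetic only; not part of ForkJoinTask.
final class TimedWaitSketch {
    /** Nanoseconds left to wait: budget minus (now - startTime). */
    static long remainingNanos(long budgetNanos, long startTime, long now) {
        return budgetNanos - (now - startTime);
    }

    /** Splits nanoseconds into the {millis, nanos} pair Object.wait(long, int) expects. */
    static long[] toWaitArgs(long nanos) {
        return new long[] { nanos / 1_000_000L, nanos % 1_000_000L };
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long left = remainingNanos(5_000_000L, start, System.nanoTime());
        long[] waitArgs = toWaitArgs(Math.max(left, 0L));
        System.out.println(waitArgs[0] + " ms + " + waitArgs[1] + " ns");
    }
}
```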
252    
253     // Awaiting completion
254    
255     /**
256     * Sets status to indicate there is a joiner, then waits for join,
257     * surrounded with pool notifications.
258     * @return status upon exit
259     */
260     final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261     ForkJoinPool pool = w == null? null : w.pool;
262     int s;
263     while ((s = status) >= 0) {
264     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
265     if (pool == null || !pool.preJoin(this, maintainParallelism))
266     doAwaitDone();
267     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
268     adjustPoolCountsOnUnblock(pool);
269     break;
270     }
271     }
272     return s;
273     }
274    
275     /**
276     * Timed version of awaitDone
277     * @return status upon exit
278     */
279     final int awaitDone(ForkJoinWorkerThread w, long nanos) {
280     ForkJoinPool pool = w == null? null : w.pool;
281     int s;
282     while ((s = status) >= 0) {
283     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
284     long startTime = System.nanoTime();
285     if (pool == null || !pool.preJoin(this, false))
286     doAwaitDone(startTime, nanos);
287     if ((s = status) >= 0) {
288     adjustPoolCountsOnCancelledWait(pool);
289     s = status;
290     }
291     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
292     adjustPoolCountsOnUnblock(pool);
293     break;
294     }
295     }
296     return s;
297     }
298    
299     /**
300     * Notify pool that thread is unblocked. Called by signalled
301     * threads when woken by non-FJ threads (which is atypical).
302     */
303     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
304     int s;
305     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
306     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
307     pool.updateRunningCount(s);
308     }
309    
310     /**
311     * Notify pool to adjust counts on cancelled or timed out wait
312     */
313     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
314     if (pool != null) {
315     int s;
316     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
317     if (casStatus(s, s - 1)) {
318     pool.updateRunningCount(1);
319     break;
320     }
321     }
322     }
323     }
324    
325 dl 1.2 /**
326     * Handle interruptions during waits.
327     */
328 dl 1.1 private void onInterruptedWait() {
329 dl 1.2 ForkJoinWorkerThread w = getWorker();
330     if (w == null)
331     Thread.currentThread().interrupt(); // re-interrupt
332     else if (w.isTerminating())
333     cancelIgnoreExceptions();
334     // else if FJworker, ignore interrupt
335 dl 1.1 }
336    
337     // Recording and reporting exceptions
338    
339     private void setDoneExceptionally(Throwable rex) {
340     exceptionMap.put(this, rex);
341     setCompletion(EXCEPTIONAL);
342     }
343    
344     /**
345     * Throws the exception associated with status s, if any.
346     * @throws the exception
347     */
348     private void reportException(int s) {
349     if ((s &= COMPLETION_MASK) < NORMAL) {
350     if (s == CANCELLED)
351     throw new CancellationException();
352     else
353     rethrowException(exceptionMap.get(this));
354     }
355     }
356    
357     /**
358     * Returns result or throws exception using j.u.c.Future conventions
359     * Only call when isDone known to be true.
360     */
361     private V reportFutureResult()
362     throws ExecutionException, InterruptedException {
363     int s = status & COMPLETION_MASK;
364     if (s < NORMAL) {
365     Throwable ex;
366     if (s == CANCELLED)
367     throw new CancellationException();
368     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
369     throw new ExecutionException(ex);
370     if (Thread.interrupted())
371     throw new InterruptedException();
372     }
373     return getRawResult();
374     }
375    
376     /**
377     * Returns result or throws exception using j.u.c.Future conventions
378     * with timeouts
379     */
380     private V reportTimedFutureResult()
381     throws InterruptedException, ExecutionException, TimeoutException {
382     Throwable ex;
383     int s = status & COMPLETION_MASK;
384     if (s == NORMAL)
385     return getRawResult();
386     if (s == CANCELLED)
387     throw new CancellationException();
388     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
389     throw new ExecutionException(ex);
390     if (Thread.interrupted())
391     throw new InterruptedException();
392     throw new TimeoutException();
393     }
394    
395     // internal execution methods
396    
397     /**
398     * Calls exec, recording completion, and rethrowing exception if
399     * encountered. Caller should normally check status before calling
400     * @return true if completed normally
401     */
402     private boolean tryExec() {
403     try { // try block must contain only call to exec
404     if (!exec())
405     return false;
406     } catch (Throwable rex) {
407     setDoneExceptionally(rex);
408     rethrowException(rex);
409     return false; // not reached
410     }
411     setNormalCompletion();
412     return true;
413     }
414    
415     /**
416     * Main execution method used by worker threads. Invokes
417     * base computation unless already complete
418     */
419     final void quietlyExec() {
420     if (status >= 0) {
421     try {
422     if (!exec())
423     return;
424     } catch(Throwable rex) {
425     setDoneExceptionally(rex);
426     return;
427     }
428     setNormalCompletion();
429     }
430     }
431    
432     /**
433     * Calls exec, recording but not rethrowing exception
434     * Caller should normally check status before calling
435     * @return true if completed normally
436     */
437     private boolean tryQuietlyInvoke() {
438     try {
439     if (!exec())
440     return false;
441     } catch (Throwable rex) {
442     setDoneExceptionally(rex);
443     return false;
444     }
445     setNormalCompletion();
446     return true;
447     }
448    
449     /**
450     * Cancel, ignoring any exceptions it throws
451     */
452     final void cancelIgnoreExceptions() {
453     try {
454     cancel(false);
455     } catch(Throwable ignore) {
456     }
457     }
458    
459     // public methods
460    
461     /**
462     * Arranges to asynchronously execute this task. While it is not
463     * necessarily enforced, it is a usage error to fork a task more
464     * than once unless it has completed and been reinitialized. This
465 dl 1.2 * method may be invoked only from within ForkJoinTask
466 dl 1.1 * computations. Attempts to invoke in other contexts result in
467     * exceptions or errors including ClassCastException.
468     */
469     public final void fork() {
470     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
471     }
472    
473     /**
474     * Returns the result of the computation when it is ready.
475 dl 1.2 * This method differs from <code>get</code> in that abnormal
476 dl 1.1 * completion results in RuntimeExceptions or Errors, not
477     * ExecutionExceptions.
478     *
479     * @return the computed result
480     */
481     public final V join() {
482     ForkJoinWorkerThread w = getWorker();
483     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
484     reportException(awaitDone(w, true));
485     return getRawResult();
486     }
487    
488     public final V get() throws InterruptedException, ExecutionException {
489     ForkJoinWorkerThread w = getWorker();
490     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
491     awaitDone(w, true);
492     return reportFutureResult();
493     }
494    
495     public final V get(long timeout, TimeUnit unit)
496     throws InterruptedException, ExecutionException, TimeoutException {
497     ForkJoinWorkerThread w = getWorker();
498     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
499     awaitDone(w, unit.toNanos(timeout));
500     return reportTimedFutureResult();
501     }
502    
503     /**
504 dl 1.2 * Commences performing this task, awaits its completion if
505     * necessary, and returns its result.
506 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
507     * exception) if the underlying computation did so.
508     * @return the computed result
509     */
510     public final V invoke() {
511     if (status >= 0 && tryExec())
512     return getRawResult();
513     else
514     return join();
515     }
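As a usage illustration of `fork`, `join`, and pool submission, here is a minimal divide-and-conquer sketch. It is written against `java.util.concurrent` (the form of this framework later shipped in the JDK) rather than the `jsr166y` package in this listing, so class locations and details are an assumption relative to this revision. `SumTask` and its `THRESHOLD` are hypothetical names and values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array by splitting it until slices are small.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cutoff
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: compute directly
            long s = 0;
            for (int i = lo; i < hi; i++) s += data[i];
            return s;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                          // schedule left half asynchronously
        long r = right.compute();             // work on right half in this thread
        return left.join() + r;               // wait for left, combine
    }

    /** Runs the "main" task by submitting it to a pool. */
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(sum(data));
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of blocking it immediately in `join`.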
516    
517     /**
518 dl 1.2 * Forks both tasks, returning when <code>isDone</code> holds for
519     * both of them or an exception is encountered. This method may be
520     * invoked only from within ForkJoinTask computations. Attempts to
521     * invoke in other contexts result in exceptions or errors
522     * including ClassCastException.
523     * @param t1 one task
524     * @param t2 the other task
525     * @throws NullPointerException if t1 or t2 are null
526     * @throws RuntimeException or Error if either task did so.
527 dl 1.1 */
528 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
529     t2.fork();
530     t1.invoke();
531     t2.join();
532 dl 1.1 }
533    
534     /**
535 dl 1.2 * Forks the given tasks, returning when <code>isDone</code> holds
536     * for all of them. If any task encounters an exception, others
537     * may be cancelled. This method may be invoked only from within
538     * ForkJoinTask computations. Attempts to invoke in other contexts
539     * result in exceptions or errors including ClassCastException.
540     * @param tasks the array of tasks
541     * @throws NullPointerException if tasks or any element are null.
542     * @throws RuntimeException or Error if any task did so.
543 dl 1.1 */
544 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
545     Throwable ex = null;
546     int last = tasks.length - 1;
547     for (int i = last; i >= 0; --i) {
548     ForkJoinTask<?> t = tasks[i];
549     if (t == null) {
550     if (ex == null)
551     ex = new NullPointerException();
552     }
553     else if (i != 0)
554     t.fork();
555     else {
556     t.quietlyInvoke();
557     if (ex == null)
558     ex = t.getException();
559     }
560     }
561     for (int i = 1; i <= last; ++i) {
562     ForkJoinTask<?> t = tasks[i];
563     if (t != null) {
564     if (ex != null)
565     t.cancel(false);
566     else {
567     t.quietlyJoin();
568     if (ex == null)
569     ex = t.getException();
570     }
571     }
572 dl 1.1 }
573 dl 1.2 if (ex != null)
574     rethrowException(ex);
575 dl 1.1 }
576    
577     /**
578 dl 1.2 * Forks all tasks in the collection, returning when
579     * <code>isDone</code> holds for all of them. If any task
580     * encounters an exception, others may be cancelled. This method
581     * may be invoked only from within ForkJoinTask
582     * computations. Attempts to invoke in other contexts result in
583     * exceptions or errors including ClassCastException.
584     * @param tasks the collection of tasks
585     * @throws NullPointerException if tasks or any element are null.
586     * @throws RuntimeException or Error if any task did so.
587 dl 1.1 */
588 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
589     if (!(tasks instanceof List)) {
590     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
591     return;
592     }
593     List<? extends ForkJoinTask<?>> ts =
594     (List<? extends ForkJoinTask<?>>)tasks;
595     Throwable ex = null;
596     int last = ts.size() - 1;
597     for (int i = last; i >= 0; --i) {
598     ForkJoinTask<?> t = ts.get(i);
599     if (t == null) {
600     if (ex == null)
601     ex = new NullPointerException();
602     }
603     else if (i != 0)
604     t.fork();
605     else {
606     t.quietlyInvoke();
607     if (ex == null)
608     ex = t.getException();
609     }
610     }
611     for (int i = 1; i <= last; ++i) {
612     ForkJoinTask<?> t = ts.get(i);
613     if (t != null) {
614     if (ex != null)
615     t.cancel(false);
616     else {
617     t.quietlyJoin();
618     if (ex == null)
619     ex = t.getException();
620     }
621     }
622     }
623     if (ex != null)
624     rethrowException(ex);
625 dl 1.1 }
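The two-task form of `invokeAll` is the usual shorthand for "fork one, run the other, join". A minimal sketch follows, again assuming the later `java.util.concurrent` API rather than `jsr166y`; `SquareAction` and its threshold are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Squares every element of an array in place, splitting the range recursively.
final class SquareAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // illustrative cutoff
    private final long[] a;
    private final int lo, hi;

    SquareAction(long[] a, int lo, int hi) {
        this.a = a; this.lo = lo; this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) a[i] = a[i] * a[i];
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Returns once both subtasks are done; rethrows if either failed.
        invokeAll(new SquareAction(a, lo, mid), new SquareAction(a, mid, hi));
    }

    static void squareAll(long[] a) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new SquareAction(a, 0, a.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] a = {1, 2, 3};
        squareAll(a);
        System.out.println(java.util.Arrays.toString(a));
    }
}
```

Each subtask writes only to its own index range, which matches the advice in the class documentation that tasks should access variables independent of those used by other running tasks.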
626    
627     /**
628     * Returns true if the computation performed by this task has
629     * completed (or has been cancelled).
630     * @return true if this computation has completed
631     */
632     public final boolean isDone() {
633     return status < 0;
634     }
635    
636     /**
637     * Returns true if this task was cancelled.
638     * @return true if this task was cancelled
639     */
640     public final boolean isCancelled() {
641     return (status & COMPLETION_MASK) == CANCELLED;
642     }
643    
644     /**
645     * Returns true if this task threw an exception or was cancelled
646     * @return true if this task threw an exception or was cancelled
647     */
648 dl 1.2 public final boolean isCompletedAbnormally() {
649 dl 1.1 return (status & COMPLETION_MASK) < NORMAL;
650     }
651    
652     /**
653     * Returns the exception thrown by the base computation, or a
654     * CancellationException if cancelled, or null if none or if the
655     * method has not yet completed.
656     * @return the exception, or null if none
657     */
658     public final Throwable getException() {
659     int s = status & COMPLETION_MASK;
660     if (s >= NORMAL)
661     return null;
662     if (s == CANCELLED)
663     return new CancellationException();
664     return exceptionMap.get(this);
665     }
666    
667     /**
668     * Asserts that the results of this task's computation will not be
669 dl 1.2 * used. If a cancellation occurs before attempting to execute this
670     * task, then execution will be suppressed, <code>isCancelled</code>
671     * will report true, and <code>join</code> will result in a
672     * <code>CancellationException</code> being thrown. Otherwise, when
673 dl 1.1 * cancellation races with completion, there are no guarantees
674 dl 1.2 * about whether <code>isCancelled</code> will report true, whether
675     * <code>join</code> will return normally or via an exception, or
676 dl 1.1 * whether these behaviors will remain consistent upon repeated
677     * invocation.
678     *
679     * <p>This method may be overridden in subclasses, but if so, must
680     * still ensure that these minimal properties hold. In particular,
681     * the cancel method itself must not throw exceptions.
682     *
683     * <p> This method is designed to be invoked by <em>other</em>
684     * tasks. To terminate the current task, you can just return or
685     * throw an unchecked exception from its computation method, or
686 dl 1.2 * invoke <code>completeExceptionally</code>.
687 dl 1.1 *
688     * @param mayInterruptIfRunning this value is ignored in the
689     * default implementation because tasks are not in general
690     * cancelled via interruption.
691     *
692     * @return true if this task is now cancelled
693     */
694     public boolean cancel(boolean mayInterruptIfRunning) {
695     setCompletion(CANCELLED);
696     return (status & COMPLETION_MASK) == CANCELLED;
697     }
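The documented behaviour for cancellation before execution can be checked with a short sketch. It assumes the later `java.util.concurrent` API, where `join` may also be called from an ordinary thread; `CancelSketch` is a hypothetical name.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

final class CancelSketch {
    /** Cancels a task that never started, then confirms join() reports it. */
    static boolean joinThrowsAfterCancel() {
        RecursiveTask<Integer> task = new RecursiveTask<Integer>() {
            @Override protected Integer compute() { return 42; }
        };
        boolean cancelled = task.cancel(false); // argument is ignored by default
        try {
            task.join();
            return false;                        // should not get here
        } catch (CancellationException expected) {
            return cancelled && task.isCancelled() && task.isDone();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinThrowsAfterCancel());
    }
}
```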
698    
699     /**
700     * Completes this task abnormally, and if not already aborted or
701     * cancelled, causes it to throw the given exception upon
702 dl 1.2 * <code>join</code> and related operations. This method may be used
703 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
704 dl 1.2 * completion of tasks that would not otherwise complete. Its use
705     * in other situations is likely to be wrong. This method is
706     * overridable, but overridden versions must invoke <code>super</code>
707     * implementation to maintain guarantees.
708     *
709 dl 1.1 * @param ex the exception to throw. If this exception is
710     * not a RuntimeException or Error, the actual exception thrown
711     * will be a RuntimeException with cause ex.
712     */
713     public void completeExceptionally(Throwable ex) {
714     setDoneExceptionally((ex instanceof RuntimeException) ||
715     (ex instanceof Error)? ex :
716     new RuntimeException(ex));
717     }
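The wrapping rule stated in the `@param ex` description can be shown in isolation. This sketch repeats only the conditional expression from the method body; the class and method names are hypothetical.

```java
// Illustrative only: isolates the wrapping rule used by completeExceptionally.
final class WrapSketch {
    /** Returns ex unchanged if unchecked, else wraps it in a RuntimeException. */
    static Throwable asUnchecked(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error)
            ? ex
            : new RuntimeException(ex);
    }

    public static void main(String[] args) {
        Throwable checked = new java.io.IOException("disk");
        Throwable wrapped = asUnchecked(checked);
        System.out.println(wrapped.getClass().getName()
            + " caused by " + wrapped.getCause().getClass().getName());
    }
}
```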
718    
719     /**
720     * Completes this task and, if not already aborted or cancelled,
721 dl 1.2 * returns the given value upon <code>join</code> and related
722 dl 1.1 * operations. This method may be used to provide results for
723     * asynchronous tasks, or to provide alternative handling for
724 dl 1.2 * tasks that would not otherwise complete normally. Its use in
725     * other situations is likely to be wrong. This method is
726     * overridable, but overridden versions must invoke <code>super</code>
727     * implementation to maintain guarantees.
728 dl 1.1 *
729     * @param value the result value for this task.
730     */
731     public void complete(V value) {
732     try {
733     setRawResult(value);
734     } catch(Throwable rex) {
735     setDoneExceptionally(rex);
736     return;
737     }
738     setNormalCompletion();
739     }
740    
741     /**
742 dl 1.2 * Possibly executes other tasks until this task is ready, then
743     * returns the result of the computation. This method may be more
744     * efficient than <code>join</code>, but is only applicable when
745     * there are no potential dependencies between continuation of the
746     * current task and that of any other task that might be executed
747     * while helping. (This usually holds for pure divide-and-conquer
748     * tasks). This method may be invoked only from within
749     * ForkJoinTask computations. Attempts to invoke in other contexts
750     * result in exceptions or errors including ClassCastException.
751     * @return the computed result
752     */
753     public final V helpJoin() {
754     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
755     if (status < 0 || !w.unpushTask(this) || !tryExec())
756     reportException(w.helpJoinTask(this));
757     return getRawResult();
758     }
759    
760     /**
761     * Possibly executes other tasks until this task is ready. This
762     * method may be invoked only from within ForkJoinTask
763     * computations. Attempts to invoke in other contexts result in
764     * exceptions or errors including ClassCastException.
765     */
766     public final void quietlyHelpJoin() {
767     if (status >= 0) {
768     ForkJoinWorkerThread w =
769     (ForkJoinWorkerThread)(Thread.currentThread());
770     if (!w.unpushTask(this) || !tryQuietlyInvoke())
771     w.helpJoinTask(this);
772     }
773     }
774    
775     /**
776     * Joins this task, without returning its result or throwing an
777     * exception. This method may be useful when processing
778     * collections of tasks when some have been cancelled or otherwise
779     * known to have aborted.
780     */
781     public final void quietlyJoin() {
782     if (status >= 0) {
783     ForkJoinWorkerThread w = getWorker();
784     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
785     awaitDone(w, true);
786     }
787     }
788    
789     /**
790     * Commences performing this task and awaits its completion if
791     * necessary, without returning its result or throwing an
792     * exception. This method may be useful when processing
793     * collections of tasks when some have been cancelled or are
794     * otherwise known to have aborted.
795     */
796     public final void quietlyInvoke() {
797     if (status >= 0 && !tryQuietlyInvoke())
798     quietlyJoin();
799     }
800    
801     /**
802 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
803 dl 1.2 * subsequent <code>fork</code>. This method allows repeated reuse of
804 dl 1.1 * this task, but only if reuse occurs when this task has either
805     * never been forked, or has been forked, then completed and all
806     * outstanding joins of this task have also completed. Effects
807     * under any other usage conditions are not guaranteed, and are
808     * almost surely wrong. This method may be useful when executing
809     * pre-constructed trees of subtasks in loops.
810     */
811     public void reinitialize() {
812     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
813     exceptionMap.remove(this);
814     status = 0;
815     }
816    
817     /**
818 dl 1.2 * Returns the pool hosting the current task execution, or null
819     * if this task is executing outside of any pool.
820     * @return the pool, or null if none.
821 dl 1.1 */
822 dl 1.2 public static ForkJoinPool getPool() {
823     Thread t = Thread.currentThread();
824     return ((t instanceof ForkJoinWorkerThread)?
825     ((ForkJoinWorkerThread)t).pool : null);
826 dl 1.1 }
827    
828     /**
829 dl 1.2 * Tries to unschedule this task for execution. This method will
830     * typically succeed if this task is the most recently forked task
831     * by the current thread, and has not commenced executing in
832     * another thread. This method may be useful when arranging
833     * alternative local processing of tasks that could have been, but
834     * were not, stolen. This method may be invoked only from within
835 dl 1.1 * ForkJoinTask computations. Attempts to invoke in other contexts
836     * result in exceptions or errors including ClassCastException.
837 dl 1.2 * @return true if unforked
838 dl 1.1 */
839 dl 1.2 public boolean tryUnfork() {
840     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
841 dl 1.1 }
842    
843     /**
844     * Possibly executes tasks until the pool hosting the current task
845     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
846     * designs in which many tasks are forked, but none are explicitly
847     * joined, instead executing them until all are processed.
848     */
849     public static void helpQuiesce() {
850     ((ForkJoinWorkerThread)(Thread.currentThread())).
851     helpQuiescePool();
852     }
853    
854     /**
855 dl 1.2 * Returns an estimate of the number of tasks that have been
856     * forked by the current worker thread but not yet executed. This
857     * value may be useful for heuristic decisions about whether to
858     * fork other tasks.
859     * @return the number of tasks
860     */
861     public static int getQueuedTaskCount() {
862     return ((ForkJoinWorkerThread)(Thread.currentThread())).
863     getQueueSize();
864     }
865    
866     /**
867 dl 1.1 * Returns an estimate of how many more locally queued tasks are
868     * held by the current worker thread than there are other worker
869 dl 1.2 * threads that might steal them. This value may be useful for
870     * heuristic decisions about whether to fork other tasks. In many
871     * usages of ForkJoinTasks, at steady state, each worker should
872     * aim to maintain a small constant surplus (for example, 3) of
873     * tasks, and to process computations locally if this threshold is
874     * exceeded.
875 dl 1.1 * @return the surplus number of tasks, which may be negative
876     */
877 dl 1.2 public static int getSurplusQueuedTaskCount() {
878 dl 1.1 return ((ForkJoinWorkerThread)(Thread.currentThread()))
879     .getEstimatedSurplusTaskCount();
880     }
881    
882 dl 1.2 // Extension methods
883 dl 1.1
884     /**
885 dl 1.2 * Returns the result that would be returned by <code>join</code>,
886     * even if this task completed abnormally, or null if this task is
887     * not known to have been completed. This method is designed to
888     * aid debugging, as well as to support extensions. Its use in any
889     * other context is discouraged.
890 dl 1.1 *
891     * @return the result, or null if not completed.
892     */
893     public abstract V getRawResult();
894    
895     /**
896     * Forces the given value to be returned as a result. This method
897     * is designed to support extensions, and should not in general be
898     * called otherwise.
899     *
900     * @param value the value
901     */
902     protected abstract void setRawResult(V value);
903    
904     /**
905     * Immediately performs the base action of this task. This method
906     * is designed to support extensions, and should not in general be
907     * called otherwise. The return value controls whether this task
908     * is considered to be done normally. It may return false in
909     * asynchronous actions that require explicit invocations of
910 dl 1.2 * <code>complete</code> to become joinable. It may throw exceptions
911 dl 1.1 * to indicate abnormal exit.
912     * @return true if completed normally
913     * @throws Error or RuntimeException if encountered during computation
914     */
915     protected abstract boolean exec();
916    
917 dl 1.2 /**
918     * Returns, but does not unschedule or execute, the task most
919     * recently forked by the current thread but not yet executed, if
920     * one is available. There is no guarantee that this task will
921     * actually be polled or executed next.
922     * This method is designed primarily to support extensions,
923     * and is unlikely to be useful otherwise.
924     *
925     * @return the next task, or null if none are available
926     */
927     protected static ForkJoinTask<?> peekNextLocalTask() {
928     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
929     }
930    
931     /**
932     * Unschedules and returns, without executing, the task most
933     * recently forked by the current thread but not yet executed.
934     * This method is designed primarily to support extensions,
935     * and is unlikely to be useful otherwise.
936     *
937     * @return the next task, or null if none are available
938     */
939     protected static ForkJoinTask<?> pollNextLocalTask() {
940     return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
941     }
942    
943     /**
944     * Unschedules and returns, without executing, the task most
945     * recently forked by the current thread but not yet executed, if
946     * one is available, or if not available, a task that was forked
947     * by some other thread, if available. Availability may be
948     * transient, so a <code>null</code> result does not necessarily
949     * imply quiescence of the pool this task is operating in.
950     * This method is designed primarily to support extensions,
951     * and is unlikely to be useful otherwise.
952     *
953     * @return a task, or null if none are available
954     */
955     protected static ForkJoinTask<?> pollTask() {
956     return ((ForkJoinWorkerThread)(Thread.currentThread())).
957     getLocalOrStolenTask();
958     }
959    
960 dl 1.1 // Serialization support
961    
962     private static final long serialVersionUID = -7721805057305804111L;
963    
964     /**
965     * Save the state to a stream.
966     *
967     * @serialData the current run status and the exception thrown
968     * during execution, or null if none.
969     * @param s the stream
970     */
971     private void writeObject(java.io.ObjectOutputStream s)
972     throws java.io.IOException {
973     s.defaultWriteObject();
974     s.writeObject(getException());
975     }
976    
977     /**
978     * Reconstitute the instance from a stream.
979     * @param s the stream
980     */
981     private void readObject(java.io.ObjectInputStream s)
982     throws java.io.IOException, ClassNotFoundException {
983     s.defaultReadObject();
984 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
985     status |= EXTERNAL_SIGNAL; // conservatively set external signal
986 dl 1.1 Object ex = s.readObject();
987     if (ex != null)
988     setDoneExceptionally((Throwable)ex);
989     }
990    
991     // Temporary Unsafe mechanics for preliminary release
992    
993     static final Unsafe _unsafe;
994     static final long statusOffset;
995    
996     static {
997     try {
998     if (ForkJoinTask.class.getClassLoader() != null) {
999     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1000     f.setAccessible(true);
1001     _unsafe = (Unsafe)f.get(null);
1002     }
1003     else
1004     _unsafe = Unsafe.getUnsafe();
1005     statusOffset = _unsafe.objectFieldOffset
1006     (ForkJoinTask.class.getDeclaredField("status"));
1007     } catch (Exception ex) { throw new Error(ex); }
1008     }
1009    
1010     }
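
Usage sketch (not part of the file above): the javadoc for getSurplusQueuedTaskCount suggests processing work locally once a worker holds more than a small constant surplus of queued tasks, and the javadoc for tryUnfork describes reclaiming a forked task that was not stolen. The example below is one possible way to combine the two. It is written against java.util.concurrent.ForkJoinTask and RecursiveTask from JDK 7 and later, not the jsr166y package in this revision, on the assumption that fork, join, invoke, tryUnfork and getSurplusQueuedTaskCount behave as documented here. The class name SurplusSum, the MIN_SPLIT cutoff and the sum helper are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative only: sums a long[] with fork/join, using the surplus
// estimate to decide when to stop forking further subtasks.
class SurplusSum extends RecursiveTask<Long> {
    // "Small constant surplus" taken from the javadoc's example value.
    static final int SURPLUS_THRESHOLD = 3;
    // Hypothetical cutoff below which splitting is not worthwhile.
    static final int MIN_SPLIT = 1_000;

    private final long[] data;
    private final int lo, hi;

    SurplusSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        // Compute sequentially if the range is small, or if this worker
        // already has more queued tasks than others are likely to steal.
        if (hi - lo <= MIN_SPLIT
                || getSurplusQueuedTaskCount() > SURPLUS_THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SurplusSum right = new SurplusSum(data, mid, hi);
        right.fork();                          // make the right half stealable
        long left = new SurplusSum(data, lo, mid).compute();
        // If the right half was not stolen, unschedule it and run it here;
        // otherwise wait for whichever thread took it.
        long rightSum = right.tryUnfork() ? right.invoke() : right.join();
        return left + rightSum;
    }

    // Hypothetical helper: runs the task in a fresh pool and shuts it down.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SurplusSum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data)); // sum of 1..100000 = 5000050000
    }
}
```

The result does not depend on which branch is taken at each level: the surplus check and tryUnfork only affect where the work runs, not what is computed. The threshold of 3 is the example value from the javadoc and would normally be tuned per workload.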