ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.1
Committed: Tue Jan 6 14:30:31 2009 UTC (15 years, 3 months ago) by dl
Branch: MAIN
Log Message:
Refactored and repackaged ForkJoin classes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16     * Abstract base class for tasks that run within a ForkJoinPool. A
17     * ForkJoinTask is a thread-like entity that is much lighter weight
18     * than a normal thread. Huge numbers of tasks and subtasks may be
19     * hosted by a small number of actual threads in a ForkJoinPool,
20     * at the price of some usage limitations.
21     *
22     * <p> ForkJoinTasks are forms of <tt>Futures</tt> supporting a
23     * limited range of use. The "lightness" of ForkJoinTasks is due to a
24     * set of restrictions (that are only partially statically
25     * enforceable) reflecting their intended use as computational tasks
26     * calculating pure functions or operating on purely isolated objects.
27     * The primary coordination mechanisms supported for ForkJoinTasks are
28     * <tt>fork</tt>, that arranges asynchronous execution, and
29     * <tt>join</tt>, that doesn't proceed until the task's result has
30     * been computed. (Cancellation is also supported). The computation
31     * defined in the <tt>compute</tt> method should avoid
32     * <tt>synchronized</tt> methods or blocks, and should minimize
33     * blocking synchronization apart from joining other tasks or using
34     * synchronizers such as Phasers that are advertised to cooperate with
35     * fork/join scheduling. Tasks should also not perform blocking IO,
36     * and should ideally access variables that are completely independent
37     * of those accessed by other running tasks. Minor breaches of these
38     * restrictions, for example using shared output streams, may be
39     * tolerable in practice, but frequent use may result in poor
40     * performance, and the potential to indefinitely stall if the number
41     * of threads not waiting for external synchronization becomes
42     * exhausted. This usage restriction is in part enforced by not
43     * permitting checked exceptions such as IOExceptions to be
44     * thrown. However, computations may still encounter unchecked
45     * exceptions, that are rethrown to callers attempting join
46     * them. These exceptions may additionally include
47     * RejectedExecutionExceptions stemming from internal resource
48     * exhaustion such as failure to allocate internal task queues.
49     *
50     * <p> The <tt>ForkJoinTask</tt> class is not usually directly
51     * subclassed. Instead, you subclass one of the abstract classes that
52     * support different styles of fork/join processing. Normally, a
53     * concrete ForkJoinTask subclass declares fields comprising its
54     * parameters, established in a constructor, and then defines a
55     * <tt>compute</tt> method that somehow uses the control methods
56     * supplied by this base class. While these methods have
57     * <tt>public</tt> access, some of them may only be called from within
58     * other ForkJoinTasks. Attempts to invoke them in other contexts
59     * result in exceptions or errors including ClassCastException. The
60     * only way to invoke a "main" driver task is to submit it to a
61     * ForkJoinPool. Once started, this will usually in turn start other
62     * subtasks.
63     *
64     * <p>Most base support methods are <tt>final</tt> because their
65     * implementations are intrinsically tied to the underlying
66     * lightweight task scheduling framework, and so cannot be overridden.
67     * Developers creating new basic styles of fork/join processing should
68     * minimally implement protected methods <tt>exec</tt>,
69     * <tt>setRawResult</tt>, and <tt>getRawResult</tt>, while also
70     * introducing an abstract computational method that can be
71     * implemented in its subclasses. To support such extensions,
72     * instances of ForkJoinTasks maintain an atomically updated
73     * <tt>short</tt> representing user-defined control state. Control
74     * state is guaranteed initially to be zero, and to be negative upon
75     * completion, but may otherwise be used for any other control
76     * purposes, such as maintaining join counts. The {@link
77     * ForkJoinWorkerThread} class supports additional inspection and
78     * tuning methods that can be useful when developing extensions.
79     *
80     * <p>ForkJoinTasks should perform relatively small amounts of
81     * computations, othewise splitting into smaller tasks. As a very
82     * rough rule of thumb, a task should perform more than 100 and less
83     * than 10000 basic computational steps. If tasks are too big, then
84     * parellelism cannot improve throughput. If too small, then memory
85     * and internal task maintenance overhead may overwhelm processing.
86     *
87     * <p>ForkJoinTasks are <tt>Serializable</tt>, which enables them to
88     * be used in extensions such as remote execution frameworks. However,
89     * it is in general safe to serialize tasks only before or after, but
90     * not during execution. Serialization is not relied on during
91     * execution itself.
92     */
93     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
94     /**
95     * Status field holding all run status. We pack this into a single
96     * int both to minimize footprint overhead and to ensure atomicity
97     * (updates are via CAS).
98     *
99     * Status is initially zero, and takes on nonnegative values until
100     * completed, upon which status holds COMPLETED. CANCELLED, or
101     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
102     * blocking waits by other threads have SIGNAL_MASK bits set --
103     * bit 15 for external (nonFJ) waits, and the rest a count of
104     * waiting FJ threads. (This representation relies on
105     * ForkJoinPool max thread limits). Completion of a stolen task
106     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
107     * though suboptimal for some purposes, we use basic builtin
108     * wait/notify to take advantage of "monitor inflation" in JVMs
109     * that we would otherwise need to emulate to avoid adding further
110     * per-task bookkeeping overhead. Note that bits 16-28 are
111     * currently unused. Also value 0x80000000 is available as spare
112     * completion value.
113     */
114     volatile int status; // accessed directy by pool and workers
115    
116     static final int COMPLETION_MASK = 0xe0000000;
117     static final int NORMAL = 0xe0000000; // == mask
118     static final int CANCELLED = 0xc0000000;
119     static final int EXCEPTIONAL = 0xa0000000;
120     static final int SIGNAL_MASK = 0x0000ffff;
121     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
122     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
123    
124     /**
125     * Table of exceptions thrown by tasks, to enable reporting by
126     * callers. Because exceptions are rare, we don't directly keep
127     * them with task objects, but instead us a weak ref table. Note
128     * that cancellation exceptions don't appear in the table, but are
129     * instead recorded as status values.
130     * Todo: Use ConcurrentReferenceHashMap
131     */
132     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
133     Collections.synchronizedMap
134     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
135    
136     // within-package utilities
137    
138     /**
139     * Get current worker thread, or null if not a worker thread
140     */
141     static ForkJoinWorkerThread getWorker() {
142     Thread t = Thread.currentThread();
143     return ((t instanceof ForkJoinWorkerThread)?
144     (ForkJoinWorkerThread)t : null);
145     }
146    
147     /**
148     * Get pool of current worker thread, or null if not a worker thread
149     */
150     static ForkJoinPool getWorkerPool() {
151     Thread t = Thread.currentThread();
152     return ((t instanceof ForkJoinWorkerThread)?
153     ((ForkJoinWorkerThread)t).pool : null);
154     }
155    
156     final boolean casStatus(int cmp, int val) {
157     return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
158     }
159    
160     /**
161     * Workaround for not being able to rethrow unchecked exceptions.
162     */
163     static void rethrowException(Throwable ex) {
164     if (ex != null)
165     _unsafe.throwException(ex);
166     }
167    
168     // Setting completion status
169    
170     /**
171     * Mark completion and wake up threads waiting to join this task.
172     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
173     */
174     final void setCompletion(int completion) {
175     ForkJoinPool pool = getWorkerPool();
176     if (pool != null) {
177     int s; // Clear signal bits while setting completion status
178     do;while ((s = status) >= 0 && !casStatus(s, completion));
179    
180     if ((s & SIGNAL_MASK) != 0) {
181     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
182     pool.updateRunningCount(s);
183     synchronized(this) { notifyAll(); }
184     }
185     }
186     else
187     externallySetCompletion(completion);
188     }
189    
190     /**
191     * Version of setCompletion for non-FJ threads. Leaves signal
192     * bits for unblocked threads to adjust, and always notifies.
193     */
194     private void externallySetCompletion(int completion) {
195     int s;
196     do;while ((s = status) >= 0 &&
197     !casStatus(s, (s & SIGNAL_MASK) | completion));
198     synchronized(this) { notifyAll(); }
199     }
200    
201     /**
202     * Sets status to indicate normal completion
203     */
204     final void setNormalCompletion() {
205     // Try typical fast case -- single CAS, no signal, not already done.
206     // Manually expand casStatus to improve chances of inlining it
207     if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
208     setCompletion(NORMAL);
209     }
210    
211     // internal waiting and notification
212    
213     /**
214     * Performs the actual monitor wait for awaitDone
215     */
216     private void doAwaitDone() {
217     // Minimize lock bias and in/de-flation effects by maximizing
218     // chances of waiting inside sync
219     try {
220     while (status >= 0)
221     synchronized(this) { if (status >= 0) wait(); }
222     } catch (InterruptedException ie) {
223     onInterruptedWait();
224     }
225     }
226    
227     /**
228     * Performs the actual monitor wait for awaitDone
229     */
230     private void doAwaitDone(long startTime, long nanos) {
231     synchronized(this) {
232     try {
233     while (status >= 0) {
234     long nt = nanos - System.nanoTime() - startTime;
235     if (nt <= 0)
236     break;
237     wait(nt / 1000000, (int)(nt % 1000000));
238     }
239     } catch (InterruptedException ie) {
240     onInterruptedWait();
241     }
242     }
243     }
244    
245     // Awaiting completion
246    
247     /**
248     * Sets status to indicate there is joiner, then waits for join,
249     * surrounded with pool notifications.
250     * @return status upon exit
251     */
252     final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
253     ForkJoinPool pool = w == null? null : w.pool;
254     int s;
255     while ((s = status) >= 0) {
256     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
257     if (pool == null || !pool.preJoin(this, maintainParallelism))
258     doAwaitDone();
259     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
260     adjustPoolCountsOnUnblock(pool);
261     break;
262     }
263     }
264     return s;
265     }
266    
267     /**
268     * Timed version of awaitDone
269     * @return status upon exit
270     */
271     final int awaitDone(ForkJoinWorkerThread w, long nanos) {
272     ForkJoinPool pool = w == null? null : w.pool;
273     int s;
274     while ((s = status) >= 0) {
275     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
276     long startTime = System.nanoTime();
277     if (pool == null || !pool.preJoin(this, false))
278     doAwaitDone(startTime, nanos);
279     if ((s = status) >= 0) {
280     adjustPoolCountsOnCancelledWait(pool);
281     s = status;
282     }
283     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
284     adjustPoolCountsOnUnblock(pool);
285     break;
286     }
287     }
288     return s;
289     }
290    
291     /**
292     * Notify pool that thread is unblocked. Called by signalled
293     * threads when woken by non-FJ threads (which is atypical).
294     */
295     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
296     int s;
297     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
298     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
299     pool.updateRunningCount(s);
300     }
301    
302     /**
303     * Notify pool to adjust counts on cancelled or timed out wait
304     */
305     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
306     if (pool != null) {
307     int s;
308     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
309     if (casStatus(s, s - 1)) {
310     pool.updateRunningCount(1);
311     break;
312     }
313     }
314     }
315     }
316    
317     private void onInterruptedWait() {
318     Thread t = Thread.currentThread();
319     if (t instanceof ForkJoinWorkerThread) {
320     ForkJoinWorkerThread w = (ForkJoinWorkerThread)t;
321     if (w.isTerminating())
322     cancelIgnoreExceptions();
323     }
324     else { // re-interrupt
325     try {
326     t.interrupt();
327     } catch (SecurityException ignore) {
328     }
329     }
330     }
331    
332     // Recording and reporting exceptions
333    
334     private void setDoneExceptionally(Throwable rex) {
335     exceptionMap.put(this, rex);
336     setCompletion(EXCEPTIONAL);
337     }
338    
339     /**
340     * Throws the exception associated with status s;
341     * @throws the exception
342     */
343     private void reportException(int s) {
344     if ((s &= COMPLETION_MASK) < NORMAL) {
345     if (s == CANCELLED)
346     throw new CancellationException();
347     else
348     rethrowException(exceptionMap.get(this));
349     }
350     }
351    
352     /**
353     * Returns result or throws exception using j.u.c.Future conventions
354     * Only call when isDone known to be true.
355     */
356     private V reportFutureResult()
357     throws ExecutionException, InterruptedException {
358     int s = status & COMPLETION_MASK;
359     if (s < NORMAL) {
360     Throwable ex;
361     if (s == CANCELLED)
362     throw new CancellationException();
363     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
364     throw new ExecutionException(ex);
365     if (Thread.interrupted())
366     throw new InterruptedException();
367     }
368     return getRawResult();
369     }
370    
371     /**
372     * Returns result or throws exception using j.u.c.Future conventions
373     * with timeouts
374     */
375     private V reportTimedFutureResult()
376     throws InterruptedException, ExecutionException, TimeoutException {
377     Throwable ex;
378     int s = status & COMPLETION_MASK;
379     if (s == NORMAL)
380     return getRawResult();
381     if (s == CANCELLED)
382     throw new CancellationException();
383     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
384     throw new ExecutionException(ex);
385     if (Thread.interrupted())
386     throw new InterruptedException();
387     throw new TimeoutException();
388     }
389    
390     // internal execution methods
391    
392     /**
393     * Calls exec, recording completion, and rethrowing exception if
394     * encountered. Caller should normally check status before calling
395     * @return true if completed normally
396     */
397     private boolean tryExec() {
398     try { // try block must contain only call to exec
399     if (!exec())
400     return false;
401     } catch (Throwable rex) {
402     setDoneExceptionally(rex);
403     rethrowException(rex);
404     return false; // not reached
405     }
406     setNormalCompletion();
407     return true;
408     }
409    
410     /**
411     * Main execution method used by worker threads. Invokes
412     * base computation unless already complete
413     */
414     final void quietlyExec() {
415     if (status >= 0) {
416     try {
417     if (!exec())
418     return;
419     } catch(Throwable rex) {
420     setDoneExceptionally(rex);
421     return;
422     }
423     setNormalCompletion();
424     }
425     }
426    
427     /**
428     * Calls exec, recording but not rethrowing exception
429     * Caller should normally check status before calling
430     * @return true if completed normally
431     */
432     private boolean tryQuietlyInvoke() {
433     try {
434     if (!exec())
435     return false;
436     } catch (Throwable rex) {
437     setDoneExceptionally(rex);
438     return false;
439     }
440     setNormalCompletion();
441     return true;
442     }
443    
444     /**
445     * Cancel, ignoring any exceptions it throws
446     */
447     final void cancelIgnoreExceptions() {
448     try {
449     cancel(false);
450     } catch(Throwable ignore) {
451     }
452     }
453    
454     // public methods
455    
456     /**
457     * Arranges to asynchronously execute this task. While it is not
458     * necessarily enforced, it is a usage error to fork a task more
459     * than once unless it has completed and been reinitialized. This
460     * method may be invoked only from within other ForkJoinTask
461     * computations. Attempts to invoke in other contexts result in
462     * exceptions or errors including ClassCastException.
463     */
464     public final void fork() {
465     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
466     }
467    
468     /**
469     * Returns the result of the computation when it is ready.
470     * This method differs from <tt>get</tt> in that abnormal
471     * completion results in RuntimeExceptions or Errors, not
472     * ExecutionExceptions.
473     *
474     * @return the computed result
475     */
476     public final V join() {
477     ForkJoinWorkerThread w = getWorker();
478     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
479     reportException(awaitDone(w, true));
480     return getRawResult();
481     }
482    
483     public final V get() throws InterruptedException, ExecutionException {
484     ForkJoinWorkerThread w = getWorker();
485     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
486     awaitDone(w, true);
487     return reportFutureResult();
488     }
489    
490     public final V get(long timeout, TimeUnit unit)
491     throws InterruptedException, ExecutionException, TimeoutException {
492     ForkJoinWorkerThread w = getWorker();
493     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
494     awaitDone(w, unit.toNanos(timeout));
495     return reportTimedFutureResult();
496     }
497    
498     /**
499     * Possibly executes other tasks until this task is ready, then
500     * returns the result of the computation. This method may be more
501     * efficient than <tt>join</tt>, but is only applicable when there
502     * are no potemtial dependencies between continuation of the
503     * current task and that of any other task that might be executed
504     * while helping. (This usually holds for pure divide-and-conquer
505     * tasks).
506     * @return the computed result
507     */
508     public final V helpJoin() {
509     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
510     if (status < 0 || !w.unpushTask(this) || !tryExec())
511     reportException(w.helpJoinTask(this));
512     return getRawResult();
513     }
514    
515     /**
516     * Performs this task, awaits its completion if necessary, and
517     * return its result.
518     * @throws Throwable (a RuntimeException, Error, or unchecked
519     * exception) if the underlying computation did so.
520     * @return the computed result
521     */
522     public final V invoke() {
523     if (status >= 0 && tryExec())
524     return getRawResult();
525     else
526     return join();
527     }
528    
529     /**
530     * Joins this task, without returning its result or throwing an
531     * exception. This method may be useful when processing
532     * collections of tasks when some have been cancelled or otherwise
533     * known to have aborted.
534     */
535     public final void quietlyJoin() {
536     if (status >= 0) {
537     ForkJoinWorkerThread w = getWorker();
538     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
539     awaitDone(w, true);
540     }
541     }
542    
543     /**
544     * Possibly executes other tasks until this task is ready.
545     */
546     public final void quietlyHelpJoin() {
547     if (status >= 0) {
548     ForkJoinWorkerThread w =
549     (ForkJoinWorkerThread)(Thread.currentThread());
550     if (!w.unpushTask(this) || !tryQuietlyInvoke())
551     w.helpJoinTask(this);
552     }
553     }
554    
555     /**
556     * Performs this task and awaits its completion if necessary,
557     * without returning its result or throwing an exception. This
558     * method may be useful when processing collections of tasks when
559     * some have been cancelled or otherwise known to have aborted.
560     */
561     public final void quietlyInvoke() {
562     if (status >= 0 && !tryQuietlyInvoke())
563     quietlyJoin();
564     }
565    
566     /**
567     * Returns true if the computation performed by this task has
568     * completed (or has been cancelled).
569     * @return true if this computation has completed
570     */
571     public final boolean isDone() {
572     return status < 0;
573     }
574    
575     /**
576     * Returns true if this task was cancelled.
577     * @return true if this task was cancelled
578     */
579     public final boolean isCancelled() {
580     return (status & COMPLETION_MASK) == CANCELLED;
581     }
582    
583     /**
584     * Returns true if this task threw an exception or was cancelled
585     * @return true if this task threw an exception or was cancelled
586     */
587     public final boolean completedAbnormally() {
588     return (status & COMPLETION_MASK) < NORMAL;
589     }
590    
591     /**
592     * Returns the exception thrown by the base computation, or a
593     * CancellationException if cancelled, or null if none or if the
594     * method has not yet completed.
595     * @return the exception, or null if none
596     */
597     public final Throwable getException() {
598     int s = status & COMPLETION_MASK;
599     if (s >= NORMAL)
600     return null;
601     if (s == CANCELLED)
602     return new CancellationException();
603     return exceptionMap.get(this);
604     }
605    
606     /**
607     * Asserts that the results of this task's computation will not be
608     * used. If a cancellation occurs before this task is processed,
609     * then its <tt>compute</tt> method will not be executed,
610     * <tt>isCancelled</tt> will report true, and <tt>join</tt> will
611     * result in a CancellationException being thrown. Otherwise, when
612     * cancellation races with completion, there are no guarantees
613     * about whether <tt>isCancelled</tt> will report true, whether
614     * <tt>join</tt> will return normally or via an exception, or
615     * whether these behaviors will remain consistent upon repeated
616     * invocation.
617     *
618     * <p>This method may be overridden in subclasses, but if so, must
619     * still ensure that these minimal properties hold. In particular,
620     * the cancel method itself must not throw exceptions.
621     *
622     * <p> This method is designed to be invoked by <em>other</em>
623     * tasks. To terminate the current task, you can just return or
624     * throw an unchecked exception from its computation method, or
625     * invoke <tt>completeExceptionally(someException)</tt>.
626     *
627     * @param mayInterruptIfRunning this value is ignored in the
628     * default implementation because tasks are not in general
629     * cancelled via interruption.
630     *
631     * @return true if this task is now cancelled
632     */
633     public boolean cancel(boolean mayInterruptIfRunning) {
634     setCompletion(CANCELLED);
635     return (status & COMPLETION_MASK) == CANCELLED;
636     }
637    
638     /**
639     * Completes this task abnormally, and if not already aborted or
640     * cancelled, causes it to throw the given exception upon
641     * <tt>join</tt> and related operations. This method may be used
642     * to induce exceptions in asynchronous tasks, or to force
643     * completion of tasks that would not otherwise complete. This
644     * method is overridable, but overridden versions must invoke
645     * <tt>super</tt> implementation to maintain guarantees.
646     * @param ex the exception to throw. If this exception is
647     * not a RuntimeException or Error, the actual exception thrown
648     * will be a RuntimeException with cause ex.
649     */
650     public void completeExceptionally(Throwable ex) {
651     setDoneExceptionally((ex instanceof RuntimeException) ||
652     (ex instanceof Error)? ex :
653     new RuntimeException(ex));
654     }
655    
656     /**
657     * Completes this task, and if not already aborted or cancelled,
658     * returning a <tt>null</tt> result upon <tt>join</tt> and related
659     * operations. This method may be used to provide results for
660     * asynchronous tasks, or to provide alternative handling for
661     * tasks that would not otherwise complete normally.
662     *
663     * @param value the result value for this task.
664     */
665     public void complete(V value) {
666     try {
667     setRawResult(value);
668     } catch(Throwable rex) {
669     setDoneExceptionally(rex);
670     return;
671     }
672     setNormalCompletion();
673     }
674    
675     /**
676     * Resets the internal bookkeeping state of this task, allowing a
677     * subsequent <tt>fork</tt>. This method allows repeated reuse of
678     * this task, but only if reuse occurs when this task has either
679     * never been forked, or has been forked, then completed and all
680     * outstanding joins of this task have also completed. Effects
681     * under any other usage conditions are not guaranteed, and are
682     * almost surely wrong. This method may be useful when executing
683     * pre-constructed trees of subtasks in loops.
684     */
685     public void reinitialize() {
686     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
687     exceptionMap.remove(this);
688     status = 0;
689     }
690    
691     /**
692     * Tries to unschedule this task for execution. This method will
693     * typically succeed if this task is the next task that would be
694     * executed by the current thread, and will typically fail (return
695     * false) otherwise. This method may be useful when arranging
696     * faster local processing of tasks that could have been, but were
697     * not, stolen.
698     * @return true if unforked
699     */
700     public boolean tryUnfork() {
701     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
702     }
703    
704     /**
705     * Forks both tasks, returning when <tt>isDone</tt> holds for both
706     * of them or an exception is encountered. This method may be
707     * invoked only from within other ForkJoinTask
708     * computations. Attempts to invoke in other contexts result in
709     * exceptions or errors including ClassCastException.
710     * @param t1 one task
711     * @param t2 the other task
712     * @throws NullPointerException if t1 or t2 are null
713     * @throws RuntimeException or Error if either task did so.
714     */
715     public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
716     t2.fork();
717     t1.invoke();
718     t2.join();
719     }
720    
721     /**
722     * Forks the given tasks, returning when <tt>isDone</tt> holds for
723     * all of them. If any task encounters an exception, others may be
724     * cancelled. This method may be invoked only from within other
725     * ForkJoinTask computations. Attempts to invoke in other contexts
726     * result in exceptions or errors including ClassCastException.
727     * @param tasks the array of tasks
728     * @throws NullPointerException if tasks or any element are null.
729     * @throws RuntimeException or Error if any task did so.
730     */
731     public static void invokeAll(ForkJoinTask<?>... tasks) {
732     Throwable ex = null;
733     int last = tasks.length - 1;
734     for (int i = last; i >= 0; --i) {
735     ForkJoinTask<?> t = tasks[i];
736     if (t == null) {
737     if (ex == null)
738     ex = new NullPointerException();
739     }
740     else if (i != 0)
741     t.fork();
742     else {
743     t.quietlyInvoke();
744     if (ex == null)
745     ex = t.getException();
746     }
747     }
748     for (int i = 1; i <= last; ++i) {
749     ForkJoinTask<?> t = tasks[i];
750     if (t != null) {
751     if (ex != null)
752     t.cancel(false);
753     else {
754     t.quietlyJoin();
755     if (ex == null)
756     ex = t.getException();
757     }
758     }
759     }
760     if (ex != null)
761     rethrowException(ex);
762     }
763    
764     /**
765     * Forks all tasks in the collection, returning when
766     * <tt>isDone</tt> holds for all of them. If any task encounters
767     * an exception, others may be cancelled. This method may be
768     * invoked only from within other ForkJoinTask
769     * computations. Attempts to invoke in other contexts result in
770     * exceptions or errors including ClassCastException.
771     * @param tasks the collection of tasks
772     * @throws NullPointerException if tasks or any element are null.
773     * @throws RuntimeException or Error if any task did so.
774     */
775     public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
776     if (!(tasks instanceof List)) {
777     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
778     return;
779     }
780     List<? extends ForkJoinTask<?>> ts =
781     (List<? extends ForkJoinTask<?>>)tasks;
782     Throwable ex = null;
783     int last = ts.size() - 1;
784     for (int i = last; i >= 0; --i) {
785     ForkJoinTask<?> t = ts.get(i);
786     if (t == null) {
787     if (ex == null)
788     ex = new NullPointerException();
789     }
790     else if (i != 0)
791     t.fork();
792     else {
793     t.quietlyInvoke();
794     if (ex == null)
795     ex = t.getException();
796     }
797     }
798     for (int i = 1; i <= last; ++i) {
799     ForkJoinTask<?> t = ts.get(i);
800     if (t != null) {
801     if (ex != null)
802     t.cancel(false);
803     else {
804     t.quietlyJoin();
805     if (ex == null)
806     ex = t.getException();
807     }
808     }
809     }
810     if (ex != null)
811     rethrowException(ex);
812     }
813    
814     /**
815     * Possibly executes tasks until the pool hosting the current task
816     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
817     * designs in which many tasks are forked, but none are explicitly
818     * joined, instead executing them until all are processed.
819     */
820     public static void helpQuiesce() {
821     ((ForkJoinWorkerThread)(Thread.currentThread())).
822     helpQuiescePool();
823     }
824    
825     /**
826     * Returns a estimate of how many more locally queued tasks are
827     * held by the current worker thread than there are other worker
828     * threads that might want to steal them. This value may be
829     * useful for heuristic decisions about whether to fork other
830     * tasks. In many usages of ForkJoinTasks, at steady state, each
831     * worker should aim to maintain a small constant surplus (for
832     * example, 3) of tasks, and to process computations locally if
833     * this threshold is exceeded.
834     * @return the surplus number of tasks, which may be negative
835     */
836     public static int surplus() {
837     return ((ForkJoinWorkerThread)(Thread.currentThread()))
838     .getEstimatedSurplusTaskCount();
839     }
840    
841     // Extension kit
842    
843     /**
844     * Returns the result that would be returned by <tt>join</tt>, or
845     * null if this task is not known to have been completed. This
846     * method is designed to aid debugging, as well as to support
847     * extensions. Its use in any other context is discouraged.
848     *
849     * @return the result, or null if not completed.
850     */
851     public abstract V getRawResult();
852    
853     /**
854     * Forces the given value to be returned as a result. This method
855     * is designed to support extensions, and should not in general be
856     * called otherwise.
857     *
858     * @param value the value
859     */
860     protected abstract void setRawResult(V value);
861    
862     /**
863     * Immediately performs the base action of this task. This method
864     * is designed to support extensions, and should not in general be
865     * called otherwise. The return value controls whether this task
866     * is considered to be done normally. It may return false in
867     * asynchronous actions that require explicit invocations of
868     * <tt>complete</tt> to become joinable. It may throw exceptions
869     * to indicate abnormal exit.
870     * @return true if completed normally
871     * @throws Error or RuntimeException if encountered during computation
872     */
873     protected abstract boolean exec();
874    
875     // Serialization support
876    
877     private static final long serialVersionUID = -7721805057305804111L;
878    
879     /**
880     * Save the state to a stream.
881     *
882     * @serialData the current run status and the exception thrown
883     * during execution, or null if none.
884     * @param s the stream
885     */
886     private void writeObject(java.io.ObjectOutputStream s)
887     throws java.io.IOException {
888     s.defaultWriteObject();
889     s.writeObject(getException());
890     }
891    
892     /**
893     * Reconstitute the instance from a stream.
894     * @param s the stream
895     */
896     private void readObject(java.io.ObjectInputStream s)
897     throws java.io.IOException, ClassNotFoundException {
898     s.defaultReadObject();
899     // status &= ~INTERNAL_SIGNAL_MASK; // todo: define policy
900     Object ex = s.readObject();
901     if (ex != null)
902     setDoneExceptionally((Throwable)ex);
903     }
904    
905     // Temporary Unsafe mechanics for preliminary release
906    
907     static final Unsafe _unsafe;
908     static final long statusOffset;
909    
910     static {
911     try {
912     if (ForkJoinTask.class.getClassLoader() != null) {
913     Field f = Unsafe.class.getDeclaredField("theUnsafe");
914     f.setAccessible(true);
915     _unsafe = (Unsafe)f.get(null);
916     }
917     else
918     _unsafe = Unsafe.getUnsafe();
919     statusOffset = _unsafe.objectFieldOffset
920     (ForkJoinTask.class.getDeclaredField("status"));
921     } catch (Exception ex) { throw new Error(ex); }
922     }
923    
924     }