ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.12
Committed: Wed Jul 22 01:36:51 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.11: +3 -0 lines
Log Message:
Add @since, @author tags

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
26     * and {@code join}, or derivatives such as
27     * {@code invokeAll}. However, this class also provides a number
28 dl 1.2 * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, that arranges asynchronous execution,
38     * and {@link #join}, that doesn't proceed until the task's result has
39 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
40 dl 1.2 * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
52 dl 1.2 * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, that are rethrown to callers attempting join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61 jsr166 1.8 * waits for completion and report results using {@code Future}
62 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
67 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
72 dl 1.2 * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
80 dl 1.2 * method that somehow uses the control methods supplied by this base
81 jsr166 1.8 * class. While these methods have {@code public} access (to allow
82 dl 1.2 * instances of different task subclasses to call each others
83     * methods), some of them may only be called from within other
84     * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 dl 1.4 * exceptions or errors possibly including ClassCastException.
86 dl 1.1 *
87 jsr166 1.8 * <p>Most base support methods are {@code final} because their
88 dl 1.1 * implementations are intrinsically tied to the underlying
89     * lightweight task scheduling framework, and so cannot be overridden.
90     * Developers creating new basic styles of fork/join processing should
91 jsr166 1.8 * minimally implement {@code protected} methods
92     * {@code exec}, {@code setRawResult}, and
93     * {@code getRawResult}, while also introducing an abstract
94 dl 1.2 * computational method that can be implemented in its subclasses,
95 jsr166 1.8 * possibly relying on other {@code protected} methods provided
96 dl 1.2 * by this class.
97 dl 1.1 *
98     * <p>ForkJoinTasks should perform relatively small amounts of
99 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
100 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
101     * than 10000 basic computational steps. If tasks are too big, then
102 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
103 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
104     *
105 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
106 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
107     * in general sensible to serialize tasks only before or after, but
108 dl 1.1 * not during execution. Serialization is not relied on during
109     * execution itself.
110 jsr166 1.12 *
111     * @since 1.7
112     * @author Doug Lea
113 dl 1.1 */
114     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
115 dl 1.2
116 dl 1.1 /**
117 dl 1.2 * Run control status bits packed into a single int to minimize
118     * footprint and to ensure atomicity (via CAS). Status is
119     * initially zero, and takes on nonnegative values until
120 dl 1.1 * completed, upon which status holds COMPLETED. CANCELLED, or
121     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
122     * blocking waits by other threads have SIGNAL_MASK bits set --
123     * bit 15 for external (nonFJ) waits, and the rest a count of
124     * waiting FJ threads. (This representation relies on
125     * ForkJoinPool max thread limits). Completion of a stolen task
126     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
127     * though suboptimal for some purposes, we use basic builtin
128     * wait/notify to take advantage of "monitor inflation" in JVMs
129     * that we would otherwise need to emulate to avoid adding further
130     * per-task bookkeeping overhead. Note that bits 16-28 are
131     * currently unused. Also value 0x80000000 is available as spare
132     * completion value.
133     */
134 jsr166 1.9 volatile int status; // accessed directly by pool and workers
135 dl 1.1
136     static final int COMPLETION_MASK = 0xe0000000;
137     static final int NORMAL = 0xe0000000; // == mask
138     static final int CANCELLED = 0xc0000000;
139     static final int EXCEPTIONAL = 0xa0000000;
140     static final int SIGNAL_MASK = 0x0000ffff;
141     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
142     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
143    
144     /**
145     * Table of exceptions thrown by tasks, to enable reporting by
146     * callers. Because exceptions are rare, we don't directly keep
147 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
148 dl 1.1 * that cancellation exceptions don't appear in the table, but are
149     * instead recorded as status values.
150 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
151 dl 1.1 */
152     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
153     Collections.synchronizedMap
154     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
155    
156     // within-package utilities
157    
158     /**
159 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
160 dl 1.1 */
161     static ForkJoinWorkerThread getWorker() {
162     Thread t = Thread.currentThread();
163     return ((t instanceof ForkJoinWorkerThread)?
164     (ForkJoinWorkerThread)t : null);
165     }
166    
167     final boolean casStatus(int cmp, int val) {
168 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
169 dl 1.1 }
170    
171     /**
172     * Workaround for not being able to rethrow unchecked exceptions.
173     */
174     static void rethrowException(Throwable ex) {
175     if (ex != null)
176 jsr166 1.11 UNSAFE.throwException(ex);
177 dl 1.1 }
178    
179     // Setting completion status
180    
181     /**
182 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
183     *
184 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
185     */
186     final void setCompletion(int completion) {
187 dl 1.2 ForkJoinPool pool = getPool();
188 dl 1.1 if (pool != null) {
189     int s; // Clear signal bits while setting completion status
190     do;while ((s = status) >= 0 && !casStatus(s, completion));
191    
192     if ((s & SIGNAL_MASK) != 0) {
193     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
194     pool.updateRunningCount(s);
195     synchronized(this) { notifyAll(); }
196     }
197     }
198     else
199     externallySetCompletion(completion);
200     }
201    
202     /**
203     * Version of setCompletion for non-FJ threads. Leaves signal
204     * bits for unblocked threads to adjust, and always notifies.
205     */
206     private void externallySetCompletion(int completion) {
207     int s;
208     do;while ((s = status) >= 0 &&
209     !casStatus(s, (s & SIGNAL_MASK) | completion));
210     synchronized(this) { notifyAll(); }
211     }
212    
213     /**
214     * Sets status to indicate normal completion
215     */
216     final void setNormalCompletion() {
217     // Try typical fast case -- single CAS, no signal, not already done.
218     // Manually expand casStatus to improve chances of inlining it
219 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
220 dl 1.1 setCompletion(NORMAL);
221     }
222    
223     // internal waiting and notification
224    
225     /**
226     * Performs the actual monitor wait for awaitDone
227     */
228     private void doAwaitDone() {
229     // Minimize lock bias and in/de-flation effects by maximizing
230     // chances of waiting inside sync
231     try {
232     while (status >= 0)
233     synchronized(this) { if (status >= 0) wait(); }
234     } catch (InterruptedException ie) {
235     onInterruptedWait();
236     }
237     }
238    
239     /**
240     * Performs the actual monitor wait for awaitDone
241     */
242     private void doAwaitDone(long startTime, long nanos) {
243     synchronized(this) {
244     try {
245     while (status >= 0) {
246     long nt = nanos - System.nanoTime() - startTime;
247     if (nt <= 0)
248     break;
249     wait(nt / 1000000, (int)(nt % 1000000));
250     }
251     } catch (InterruptedException ie) {
252     onInterruptedWait();
253     }
254     }
255     }
256    
257     // Awaiting completion
258    
259     /**
260     * Sets status to indicate there is joiner, then waits for join,
261     * surrounded with pool notifications.
262 jsr166 1.10 *
263 dl 1.1 * @return status upon exit
264     */
265 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
266 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
267     int s;
268     while ((s = status) >= 0) {
269     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
270     if (pool == null || !pool.preJoin(this, maintainParallelism))
271     doAwaitDone();
272     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
273     adjustPoolCountsOnUnblock(pool);
274     break;
275     }
276     }
277     return s;
278     }
279    
280     /**
281     * Timed version of awaitDone
282     * @return status upon exit
283     */
284 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
285 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
286     int s;
287     while ((s = status) >= 0) {
288     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
289     long startTime = System.nanoTime();
290     if (pool == null || !pool.preJoin(this, false))
291     doAwaitDone(startTime, nanos);
292     if ((s = status) >= 0) {
293     adjustPoolCountsOnCancelledWait(pool);
294     s = status;
295     }
296     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
297     adjustPoolCountsOnUnblock(pool);
298     break;
299     }
300     }
301     return s;
302     }
303    
304     /**
305 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
306 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
307     */
308     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
309     int s;
310     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
311     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
312     pool.updateRunningCount(s);
313     }
314    
315     /**
316 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
317 dl 1.1 */
318     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
319     if (pool != null) {
320     int s;
321     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
322     if (casStatus(s, s - 1)) {
323     pool.updateRunningCount(1);
324     break;
325     }
326     }
327     }
328     }
329    
330 dl 1.2 /**
331 jsr166 1.10 * Handles interruptions during waits.
332 dl 1.2 */
333 dl 1.1 private void onInterruptedWait() {
334 dl 1.2 ForkJoinWorkerThread w = getWorker();
335     if (w == null)
336     Thread.currentThread().interrupt(); // re-interrupt
337     else if (w.isTerminating())
338 dl 1.3 cancelIgnoringExceptions();
339 dl 1.2 // else if FJworker, ignore interrupt
340 dl 1.1 }
341    
342     // Recording and reporting exceptions
343    
344     private void setDoneExceptionally(Throwable rex) {
345     exceptionMap.put(this, rex);
346     setCompletion(EXCEPTIONAL);
347     }
348    
349     /**
350 jsr166 1.10 * Throws the exception associated with status s.
351     *
352 dl 1.1 * @throws the exception
353     */
354     private void reportException(int s) {
355     if ((s &= COMPLETION_MASK) < NORMAL) {
356     if (s == CANCELLED)
357     throw new CancellationException();
358     else
359     rethrowException(exceptionMap.get(this));
360     }
361     }
362    
363     /**
364 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
365 dl 1.1 * Only call when isDone known to be true.
366     */
367     private V reportFutureResult()
368     throws ExecutionException, InterruptedException {
369     int s = status & COMPLETION_MASK;
370     if (s < NORMAL) {
371     Throwable ex;
372     if (s == CANCELLED)
373     throw new CancellationException();
374     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
375     throw new ExecutionException(ex);
376     if (Thread.interrupted())
377     throw new InterruptedException();
378     }
379     return getRawResult();
380     }
381    
382     /**
383     * Returns result or throws exception using j.u.c.Future conventions
384 jsr166 1.10 * with timeouts.
385 dl 1.1 */
386     private V reportTimedFutureResult()
387     throws InterruptedException, ExecutionException, TimeoutException {
388     Throwable ex;
389     int s = status & COMPLETION_MASK;
390     if (s == NORMAL)
391     return getRawResult();
392     if (s == CANCELLED)
393     throw new CancellationException();
394     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
395     throw new ExecutionException(ex);
396     if (Thread.interrupted())
397     throw new InterruptedException();
398     throw new TimeoutException();
399     }
400    
401     // internal execution methods
402    
403     /**
404     * Calls exec, recording completion, and rethrowing exception if
405 jsr166 1.10 * encountered. Caller should normally check status before calling.
406     *
407 dl 1.1 * @return true if completed normally
408     */
409     private boolean tryExec() {
410     try { // try block must contain only call to exec
411     if (!exec())
412     return false;
413     } catch (Throwable rex) {
414     setDoneExceptionally(rex);
415     rethrowException(rex);
416     return false; // not reached
417     }
418     setNormalCompletion();
419     return true;
420     }
421    
422     /**
423     * Main execution method used by worker threads. Invokes
424 jsr166 1.10 * base computation unless already complete.
425 dl 1.1 */
426     final void quietlyExec() {
427     if (status >= 0) {
428     try {
429     if (!exec())
430     return;
431     } catch(Throwable rex) {
432     setDoneExceptionally(rex);
433     return;
434     }
435     setNormalCompletion();
436     }
437     }
438    
439     /**
440 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
441     * Caller should normally check status before calling.
442     *
443 dl 1.1 * @return true if completed normally
444     */
445     private boolean tryQuietlyInvoke() {
446     try {
447     if (!exec())
448     return false;
449     } catch (Throwable rex) {
450     setDoneExceptionally(rex);
451     return false;
452     }
453     setNormalCompletion();
454     return true;
455     }
456    
457     /**
458 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
459 dl 1.1 */
460 dl 1.3 final void cancelIgnoringExceptions() {
461 dl 1.1 try {
462     cancel(false);
463     } catch(Throwable ignore) {
464     }
465     }
466    
467 dl 1.3 /**
468     * Main implementation of helpJoin
469     */
470     private int busyJoin(ForkJoinWorkerThread w) {
471     int s;
472     ForkJoinTask<?> t;
473     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
474     t.quietlyExec();
475     return (s >= 0)? awaitDone(w, false) : s; // block if no work
476     }
477    
478 dl 1.1 // public methods
479    
480     /**
481     * Arranges to asynchronously execute this task. While it is not
482     * necessarily enforced, it is a usage error to fork a task more
483     * than once unless it has completed and been reinitialized. This
484 dl 1.2 * method may be invoked only from within ForkJoinTask
485 dl 1.1 * computations. Attempts to invoke in other contexts result in
486 dl 1.4 * exceptions or errors possibly including ClassCastException.
487 dl 1.1 */
488     public final void fork() {
489     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
490     }
491    
492     /**
493     * Returns the result of the computation when it is ready.
494 jsr166 1.8 * This method differs from {@code get} in that abnormal
495 dl 1.1 * completion results in RuntimeExceptions or Errors, not
496     * ExecutionExceptions.
497     *
498     * @return the computed result
499     */
500     public final V join() {
501     ForkJoinWorkerThread w = getWorker();
502     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
503     reportException(awaitDone(w, true));
504     return getRawResult();
505     }
506    
507     /**
508 dl 1.2 * Commences performing this task, awaits its completion if
509     * necessary, and return its result.
510 jsr166 1.10 *
511 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
512 jsr166 1.10 * exception) if the underlying computation did so
513 dl 1.1 * @return the computed result
514     */
515     public final V invoke() {
516     if (status >= 0 && tryExec())
517     return getRawResult();
518     else
519     return join();
520     }
521    
522     /**
523 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
524 dl 1.2 * both of them or an exception is encountered. This method may be
525     * invoked only from within ForkJoinTask computations. Attempts to
526     * invoke in other contexts result in exceptions or errors
527 dl 1.4 * possibly including ClassCastException.
528 jsr166 1.10 *
529 dl 1.2 * @param t1 one task
530     * @param t2 the other task
531     * @throws NullPointerException if t1 or t2 are null
532 jsr166 1.10 * @throws RuntimeException or Error if either task did so
533 dl 1.1 */
534 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
535     t2.fork();
536     t1.invoke();
537     t2.join();
538 dl 1.1 }
539    
540     /**
541 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
542 dl 1.2 * for all of them. If any task encounters an exception, others
543     * may be cancelled. This method may be invoked only from within
544     * ForkJoinTask computations. Attempts to invoke in other contexts
545 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
546 jsr166 1.10 *
547 dl 1.2 * @param tasks the array of tasks
548 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
549     * @throws RuntimeException or Error if any task did so
550 dl 1.1 */
551 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
552     Throwable ex = null;
553     int last = tasks.length - 1;
554     for (int i = last; i >= 0; --i) {
555     ForkJoinTask<?> t = tasks[i];
556     if (t == null) {
557     if (ex == null)
558     ex = new NullPointerException();
559     }
560     else if (i != 0)
561     t.fork();
562     else {
563     t.quietlyInvoke();
564     if (ex == null)
565     ex = t.getException();
566     }
567     }
568     for (int i = 1; i <= last; ++i) {
569     ForkJoinTask<?> t = tasks[i];
570     if (t != null) {
571     if (ex != null)
572     t.cancel(false);
573     else {
574     t.quietlyJoin();
575     if (ex == null)
576     ex = t.getException();
577     }
578     }
579 dl 1.1 }
580 dl 1.2 if (ex != null)
581     rethrowException(ex);
582 dl 1.1 }
583    
584     /**
585 dl 1.2 * Forks all tasks in the collection, returning when
586 jsr166 1.8 * {@code isDone} holds for all of them. If any task
587 dl 1.2 * encounters an exception, others may be cancelled. This method
588     * may be invoked only from within ForkJoinTask
589 jsr166 1.9 * computations. Attempts to invoke in other contexts result in
590 dl 1.4 * exceptions or errors possibly including ClassCastException.
591 jsr166 1.10 *
592 dl 1.2 * @param tasks the collection of tasks
593 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
594     * @throws RuntimeException or Error if any task did so
595 dl 1.1 */
596 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
597     if (!(tasks instanceof List)) {
598     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
599     return;
600     }
601     List<? extends ForkJoinTask<?>> ts =
602     (List<? extends ForkJoinTask<?>>)tasks;
603     Throwable ex = null;
604     int last = ts.size() - 1;
605     for (int i = last; i >= 0; --i) {
606     ForkJoinTask<?> t = ts.get(i);
607     if (t == null) {
608     if (ex == null)
609     ex = new NullPointerException();
610     }
611     else if (i != 0)
612     t.fork();
613     else {
614     t.quietlyInvoke();
615     if (ex == null)
616     ex = t.getException();
617     }
618     }
619     for (int i = 1; i <= last; ++i) {
620     ForkJoinTask<?> t = ts.get(i);
621     if (t != null) {
622     if (ex != null)
623     t.cancel(false);
624     else {
625     t.quietlyJoin();
626     if (ex == null)
627     ex = t.getException();
628     }
629     }
630     }
631     if (ex != null)
632     rethrowException(ex);
633 dl 1.1 }
634    
635     /**
636     * Returns true if the computation performed by this task has
637     * completed (or has been cancelled).
638 jsr166 1.10 *
639 dl 1.1 * @return true if this computation has completed
640     */
641     public final boolean isDone() {
642     return status < 0;
643     }
644    
645     /**
646     * Returns true if this task was cancelled.
647 jsr166 1.10 *
648 dl 1.1 * @return true if this task was cancelled
649     */
650     public final boolean isCancelled() {
651     return (status & COMPLETION_MASK) == CANCELLED;
652     }
653    
654     /**
655     * Asserts that the results of this task's computation will not be
656 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
657 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
658     * will report true, and {@code join} will result in a
659     * {@code CancellationException} being thrown. Otherwise, when
660 dl 1.1 * cancellation races with completion, there are no guarantees
661 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
662     * {@code join} will return normally or via an exception, or
663 dl 1.1 * whether these behaviors will remain consistent upon repeated
664     * invocation.
665     *
666     * <p>This method may be overridden in subclasses, but if so, must
667     * still ensure that these minimal properties hold. In particular,
668     * the cancel method itself must not throw exceptions.
669     *
670     * <p> This method is designed to be invoked by <em>other</em>
671     * tasks. To terminate the current task, you can just return or
672     * throw an unchecked exception from its computation method, or
673 jsr166 1.8 * invoke {@code completeExceptionally}.
674 dl 1.1 *
675     * @param mayInterruptIfRunning this value is ignored in the
676     * default implementation because tasks are not in general
677     * cancelled via interruption.
678     *
679     * @return true if this task is now cancelled
680     */
681     public boolean cancel(boolean mayInterruptIfRunning) {
682     setCompletion(CANCELLED);
683     return (status & COMPLETION_MASK) == CANCELLED;
684     }
685    
686     /**
687 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
688     *
689 dl 1.3 * @return true if this task threw an exception or was cancelled
690     */
691     public final boolean isCompletedAbnormally() {
692     return (status & COMPLETION_MASK) < NORMAL;
693     }
694    
695     /**
696     * Returns the exception thrown by the base computation, or a
697     * CancellationException if cancelled, or null if none or if the
698     * method has not yet completed.
699 jsr166 1.10 *
700 dl 1.3 * @return the exception, or null if none
701     */
702     public final Throwable getException() {
703     int s = status & COMPLETION_MASK;
704     if (s >= NORMAL)
705     return null;
706     if (s == CANCELLED)
707     return new CancellationException();
708     return exceptionMap.get(this);
709     }
710    
711     /**
712 dl 1.1 * Completes this task abnormally, and if not already aborted or
713     * cancelled, causes it to throw the given exception upon
714 jsr166 1.8 * {@code join} and related operations. This method may be used
715 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
716 dl 1.2 * completion of tasks that would not otherwise complete. Its use
717     * in other situations is likely to be wrong. This method is
718 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
719 dl 1.2 * implementation to maintain guarantees.
720     *
721 dl 1.1 * @param ex the exception to throw. If this exception is
722     * not a RuntimeException or Error, the actual exception thrown
723     * will be a RuntimeException with cause ex.
724     */
725     public void completeExceptionally(Throwable ex) {
726     setDoneExceptionally((ex instanceof RuntimeException) ||
727     (ex instanceof Error)? ex :
728     new RuntimeException(ex));
729     }
730    
731     /**
732     * Completes this task, and if not already aborted or cancelled,
733 jsr166 1.8 * returning a {@code null} result upon {@code join} and related
734 dl 1.1 * operations. This method may be used to provide results for
735     * asynchronous tasks, or to provide alternative handling for
736 dl 1.2 * tasks that would not otherwise complete normally. Its use in
737     * other situations is likely to be wrong. This method is
738 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
739 dl 1.2 * implementation to maintain guarantees.
740 dl 1.1 *
741 jsr166 1.10 * @param value the result value for this task
742 dl 1.1 */
743     public void complete(V value) {
744     try {
745     setRawResult(value);
746     } catch(Throwable rex) {
747     setDoneExceptionally(rex);
748     return;
749     }
750     setNormalCompletion();
751     }
752    
753 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
754     ForkJoinWorkerThread w = getWorker();
755     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
756     awaitDone(w, true);
757     return reportFutureResult();
758     }
759    
760     public final V get(long timeout, TimeUnit unit)
761     throws InterruptedException, ExecutionException, TimeoutException {
762     ForkJoinWorkerThread w = getWorker();
763     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
764     awaitDone(w, unit.toNanos(timeout));
765     return reportTimedFutureResult();
766     }
767    
768 dl 1.1 /**
769 dl 1.2 * Possibly executes other tasks until this task is ready, then
770     * returns the result of the computation. This method may be more
771 jsr166 1.8 * efficient than {@code join}, but is only applicable when
772 jsr166 1.9 * there are no potential dependencies between continuation of the
773 dl 1.2 * current task and that of any other task that might be executed
774     * while helping. (This usually holds for pure divide-and-conquer
775     * tasks). This method may be invoked only from within
776     * ForkJoinTask computations. Attempts to invoke in other contexts
777 jsr166 1.9 * result in exceptions or errors possibly including ClassCastException.
778 jsr166 1.10 *
779 dl 1.2 * @return the computed result
780     */
781     public final V helpJoin() {
782     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
783     if (status < 0 || !w.unpushTask(this) || !tryExec())
784 dl 1.3 reportException(busyJoin(w));
785 dl 1.2 return getRawResult();
786     }
787    
788     /**
789     * Possibly executes other tasks until this task is ready. This
790     * method may be invoked only from within ForkJoinTask
791 jsr166 1.9 * computations. Attempts to invoke in other contexts result in
792 dl 1.4 * exceptions or errors possibly including ClassCastException.
793 dl 1.2 */
794     public final void quietlyHelpJoin() {
795     if (status >= 0) {
796     ForkJoinWorkerThread w =
797     (ForkJoinWorkerThread)(Thread.currentThread());
798     if (!w.unpushTask(this) || !tryQuietlyInvoke())
799 dl 1.3 busyJoin(w);
800 dl 1.2 }
801     }
802    
803     /**
804     * Joins this task, without returning its result or throwing an
805     * exception. This method may be useful when processing
806     * collections of tasks when some have been cancelled or otherwise
807     * known to have aborted.
808     */
809     public final void quietlyJoin() {
810     if (status >= 0) {
811     ForkJoinWorkerThread w = getWorker();
812     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
813     awaitDone(w, true);
814     }
815     }
816    
817     /**
818     * Commences performing this task and awaits its completion if
819     * necessary, without returning its result or throwing an
820     * exception. This method may be useful when processing
821     * collections of tasks when some have been cancelled or otherwise
822     * known to have aborted.
823     */
824     public final void quietlyInvoke() {
825     if (status >= 0 && !tryQuietlyInvoke())
826     quietlyJoin();
827     }
828    
829     /**
830 dl 1.3 * Possibly executes tasks until the pool hosting the current task
831     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
832     * designs in which many tasks are forked, but none are explicitly
833     * joined, instead executing them until all are processed.
834     */
835     public static void helpQuiesce() {
836     ((ForkJoinWorkerThread)(Thread.currentThread())).
837     helpQuiescePool();
838     }
839    
840     /**
841 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
842 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
843 dl 1.1 * this task, but only if reuse occurs when this task has either
844     * never been forked, or has been forked, then completed and all
845     * outstanding joins of this task have also completed. Effects
846     * under any other usage conditions are not guaranteed, and are
847     * almost surely wrong. This method may be useful when executing
848     * pre-constructed trees of subtasks in loops.
849     */
850     public void reinitialize() {
851     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
852     exceptionMap.remove(this);
853     status = 0;
854     }
855    
856     /**
857 dl 1.2 * Returns the pool hosting the current task execution, or null
858     * if this task is executing outside of any pool.
859 jsr166 1.10 *
860     * @return the pool, or null if none
861 dl 1.1 */
862 dl 1.2 public static ForkJoinPool getPool() {
863     Thread t = Thread.currentThread();
864     return ((t instanceof ForkJoinWorkerThread)?
865     ((ForkJoinWorkerThread)t).pool : null);
866 dl 1.1 }
867    
868     /**
869 dl 1.2 * Tries to unschedule this task for execution. This method will
870     * typically succeed if this task is the most recently forked task
871     * by the current thread, and has not commenced executing in
872     * another thread. This method may be useful when arranging
873     * alternative local processing of tasks that could have been, but
874     * were not, stolen. This method may be invoked only from within
875 dl 1.1 * ForkJoinTask computations. Attempts to invoke in other contexts
876 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
877 jsr166 1.10 *
878 dl 1.2 * @return true if unforked
879 dl 1.1 */
880 dl 1.2 public boolean tryUnfork() {
881     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
882 dl 1.1 }
883    
884     /**
885 dl 1.2 * Returns an estimate of the number of tasks that have been
886     * forked by the current worker thread but not yet executed. This
887     * value may be useful for heuristic decisions about whether to
888     * fork other tasks.
889 jsr166 1.10 *
890 dl 1.2 * @return the number of tasks
891     */
892     public static int getQueuedTaskCount() {
893     return ((ForkJoinWorkerThread)(Thread.currentThread())).
894     getQueueSize();
895     }
896    
897     /**
898 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
899 dl 1.1 * held by the current worker thread than there are other worker
900 dl 1.2 * threads that might steal them. This value may be useful for
901     * heuristic decisions about whether to fork other tasks. In many
902     * usages of ForkJoinTasks, at steady state, each worker should
903     * aim to maintain a small constant surplus (for example, 3) of
904     * tasks, and to process computations locally if this threshold is
905     * exceeded.
906 jsr166 1.10 *
907 dl 1.1 * @return the surplus number of tasks, which may be negative
908     */
909 dl 1.2 public static int getSurplusQueuedTaskCount() {
910 dl 1.1 return ((ForkJoinWorkerThread)(Thread.currentThread()))
911     .getEstimatedSurplusTaskCount();
912     }
913    
914 dl 1.2 // Extension methods
915 dl 1.1
916     /**
917 jsr166 1.8 * Returns the result that would be returned by {@code join},
918 dl 1.2 * even if this task completed abnormally, or null if this task is
919     * not known to have been completed. This method is designed to
920     * aid debugging, as well as to support extensions. Its use in any
921     * other context is discouraged.
922 dl 1.1 *
923 jsr166 1.10 * @return the result, or null if not completed
924 dl 1.1 */
925     public abstract V getRawResult();
926    
927     /**
928     * Forces the given value to be returned as a result. This method
929     * is designed to support extensions, and should not in general be
930     * called otherwise.
931     *
932     * @param value the value
933     */
934     protected abstract void setRawResult(V value);
935    
936     /**
937     * Immediately performs the base action of this task. This method
938     * is designed to support extensions, and should not in general be
939     * called otherwise. The return value controls whether this task
940     * is considered to be done normally. It may return false in
941     * asynchronous actions that require explicit invocations of
942 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
943 dl 1.1 * to indicate abnormal exit.
944 jsr166 1.10 *
945 dl 1.1 * @return true if completed normally
946     * @throws Error or RuntimeException if encountered during computation
947     */
948     protected abstract boolean exec();
949    
950 dl 1.2 /**
951 dl 1.6 * Returns, but does not unschedule or execute, the task queued by
952     * the current thread but not yet executed, if one is
953     * available. There is no guarantee that this task will actually
954     * be polled or executed next. This method is designed primarily
955     * to support extensions, and is unlikely to be useful otherwise.
956     * This method may be invoked only from within ForkJoinTask
957     * computations. Attempts to invoke in other contexts result in
958     * exceptions or errors possibly including ClassCastException.
959 dl 1.2 *
960     * @return the next task, or null if none are available
961     */
962     protected static ForkJoinTask<?> peekNextLocalTask() {
963     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
964     }
965    
966     /**
967 dl 1.6 * Unschedules and returns, without executing, the next task
968     * queued by the current thread but not yet executed. This method
969     * is designed primarily to support extensions, and is unlikely to
970     * be useful otherwise. This method may be invoked only from
971     * within ForkJoinTask computations. Attempts to invoke in other
972     * contexts result in exceptions or errors possibly including
973     * ClassCastException.
974 dl 1.2 *
975     * @return the next task, or null if none are available
976     */
977     protected static ForkJoinTask<?> pollNextLocalTask() {
978 dl 1.6 return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
979 dl 1.2 }
980 jsr166 1.7
981 dl 1.2 /**
982 dl 1.6 * Unschedules and returns, without executing, the next task
983     * queued by the current thread but not yet executed, if one is
984     * available, or if not available, a task that was forked by some
985     * other thread, if available. Availability may be transient, so a
986 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
987 dl 1.6 * of the pool this task is operating in. This method is designed
988     * primarily to support extensions, and is unlikely to be useful
989     * otherwise. This method may be invoked only from within
990 dl 1.4 * ForkJoinTask computations. Attempts to invoke in other contexts
991 dl 1.6 * result in exceptions or errors possibly including
992     * ClassCastException.
993 dl 1.4 *
994 dl 1.2 * @return a task, or null if none are available
995     */
996     protected static ForkJoinTask<?> pollTask() {
997     return ((ForkJoinWorkerThread)(Thread.currentThread())).
998 dl 1.4 pollTask();
999 dl 1.2 }
1000    
1001 dl 1.1 // Serialization support
1002    
1003     private static final long serialVersionUID = -7721805057305804111L;
1004    
1005     /**
1006     * Save the state to a stream.
1007     *
1008     * @serialData the current run status and the exception thrown
1009 jsr166 1.10 * during execution, or null if none
1010 dl 1.1 * @param s the stream
1011     */
1012     private void writeObject(java.io.ObjectOutputStream s)
1013     throws java.io.IOException {
1014     s.defaultWriteObject();
1015     s.writeObject(getException());
1016     }
1017    
1018     /**
1019     * Reconstitute the instance from a stream.
1020 jsr166 1.10 *
1021 dl 1.1 * @param s the stream
1022     */
1023     private void readObject(java.io.ObjectInputStream s)
1024     throws java.io.IOException, ClassNotFoundException {
1025     s.defaultReadObject();
1026 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1027     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1028 dl 1.1 Object ex = s.readObject();
1029     if (ex != null)
1030     setDoneExceptionally((Throwable)ex);
1031     }
1032    
1033     // Temporary Unsafe mechanics for preliminary release
1034 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1035     try {
1036     return Unsafe.getUnsafe();
1037     } catch (SecurityException se) {
1038     try {
1039     return java.security.AccessController.doPrivileged
1040     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1041     public Unsafe run() throws Exception {
1042     return getUnsafePrivileged();
1043     }});
1044     } catch (java.security.PrivilegedActionException e) {
1045     throw e.getCause();
1046     }
1047     }
1048     }
1049    
1050     private static Unsafe getUnsafePrivileged()
1051     throws NoSuchFieldException, IllegalAccessException {
1052     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1053     f.setAccessible(true);
1054     return (Unsafe) f.get(null);
1055     }
1056    
1057     private static long fieldOffset(String fieldName)
1058     throws NoSuchFieldException {
1059 jsr166 1.11 return UNSAFE.objectFieldOffset
1060 jsr166 1.5 (ForkJoinTask.class.getDeclaredField(fieldName));
1061     }
1062 dl 1.1
1063 jsr166 1.11 static final Unsafe UNSAFE;
1064 dl 1.1 static final long statusOffset;
1065    
1066     static {
1067     try {
1068 jsr166 1.11 UNSAFE = getUnsafe();
1069 jsr166 1.5 statusOffset = fieldOffset("status");
1070     } catch (Throwable e) {
1071     throw new RuntimeException("Could not initialize intrinsics", e);
1072     }
1073 dl 1.1 }
1074    
1075     }