root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.10
Committed: Tue Jul 21 00:15:13 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.9: +47 -27 lines
Log Message:
j.u.c. coding standards

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
26     * and {@code join}, or derivatives such as
27     * {@code invokeAll}. However, this class also provides a number
28 dl 1.2 * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, which arranges asynchronous execution,
38     * and {@link #join}, which doesn't proceed until the task's result has
39 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
40 dl 1.2 * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
52 dl 1.2 * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, which are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61 jsr166 1.8 * waits for completion and report results using {@code Future}
62 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
67 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
72 dl 1.2 * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
80 dl 1.2 * method that somehow uses the control methods supplied by this base
81 jsr166 1.8 * class. While these methods have {@code public} access (to allow
82 dl 1.2 * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84     * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 dl 1.4 * exceptions or errors possibly including ClassCastException.
86 dl 1.1 *
87 jsr166 1.8 * <p>Most base support methods are {@code final} because their
88 dl 1.1 * implementations are intrinsically tied to the underlying
89     * lightweight task scheduling framework, and so cannot be overridden.
90     * Developers creating new basic styles of fork/join processing should
91 jsr166 1.8 * minimally implement {@code protected} methods
92     * {@code exec}, {@code setRawResult}, and
93     * {@code getRawResult}, while also introducing an abstract
94 dl 1.2 * computational method that can be implemented in its subclasses,
95 jsr166 1.8 * possibly relying on other {@code protected} methods provided
96 dl 1.2 * by this class.
97 dl 1.1 *
98     * <p>ForkJoinTasks should perform relatively small amounts of
99 jsr166 1.9 * computation, and should otherwise be split into smaller tasks. As a very
100 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
101     * than 10000 basic computational steps. If tasks are too big, then
102 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
103 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
104     *
105 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
106 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
107     * in general sensible to serialize tasks only before or after, but
108 dl 1.1 * not during execution. Serialization is not relied on during
109     * execution itself.
110     */
111     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
112 dl 1.2
113 dl 1.1 /**
114 dl 1.2 * Run control status bits packed into a single int to minimize
115     * footprint and to ensure atomicity (via CAS). Status is
116     * initially zero, and takes on nonnegative values until
117 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
118     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
119     * blocking waits by other threads have SIGNAL_MASK bits set --
120     * bit 15 for external (nonFJ) waits, and the rest a count of
121     * waiting FJ threads. (This representation relies on
122     * ForkJoinPool max thread limits). Completion of a stolen task
123     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
124     * though suboptimal for some purposes, we use basic builtin
125     * wait/notify to take advantage of "monitor inflation" in JVMs
126     * that we would otherwise need to emulate to avoid adding further
127     * per-task bookkeeping overhead. Note that bits 16-28 are
128     * currently unused. Also value 0x80000000 is available as spare
129     * completion value.
130     */
131 jsr166 1.9 volatile int status; // accessed directly by pool and workers
132 dl 1.1
133     static final int COMPLETION_MASK = 0xe0000000;
134     static final int NORMAL = 0xe0000000; // == mask
135     static final int CANCELLED = 0xc0000000;
136     static final int EXCEPTIONAL = 0xa0000000;
137     static final int SIGNAL_MASK = 0x0000ffff;
138     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
139     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
140    
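The packed status layout described above can be read back with a few masks. The following is a minimal sketch that copies the constants from this file; the class StatusBits and its method names are hypothetical and are not part of ForkJoinTask.

```java
// Illustrative only: mirrors the constants above. StatusBits and its
// method names are hypothetical and not part of ForkJoinTask.
final class StatusBits {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000;
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int SIGNAL_MASK          = 0x0000ffff;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    /** A task is done once the sign bit is set (any completion value). */
    static boolean isDone(int status) { return status < 0; }

    /** Extracts the completion value held in the top three bits. */
    static int completion(int status) { return status & COMPLETION_MASK; }

    /** Count of FJ worker threads blocked on the task (low 15 bits). */
    static int internalWaiters(int status) { return status & INTERNAL_SIGNAL_MASK; }

    /** True if bit 15 is set, i.e. a non-FJ thread is waiting. */
    static boolean hasExternalWaiter(int status) { return (status & EXTERNAL_SIGNAL) != 0; }

    /** CANCELLED and EXCEPTIONAL compare below NORMAL as signed ints. */
    static boolean isAbnormal(int status) { return completion(status) < NORMAL; }
}
```

Because all three completion values have the sign bit set, a single signed comparison (status < 0) is enough to test for completion, which is what isDone relies on.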
141     /**
142     * Table of exceptions thrown by tasks, to enable reporting by
143     * callers. Because exceptions are rare, we don't directly keep
144 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
145 dl 1.1 * that cancellation exceptions don't appear in the table, but are
146     * instead recorded as status values.
147 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
148 dl 1.1 */
149     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
150     Collections.synchronizedMap
151     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
152    
153     // within-package utilities
154    
155     /**
156 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
157 dl 1.1 */
158     static ForkJoinWorkerThread getWorker() {
159     Thread t = Thread.currentThread();
160     return ((t instanceof ForkJoinWorkerThread)?
161     (ForkJoinWorkerThread)t : null);
162     }
163    
164     final boolean casStatus(int cmp, int val) {
165     return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
166     }
167    
168     /**
169     * Workaround for rethrowing any Throwable without declaring it.
170     */
171     static void rethrowException(Throwable ex) {
172     if (ex != null)
173     _unsafe.throwException(ex);
174     }
175    
176     // Setting completion status
177    
178     /**
179 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
180     *
181 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
182     */
183     final void setCompletion(int completion) {
184 dl 1.2 ForkJoinPool pool = getPool();
185 dl 1.1 if (pool != null) {
186     int s; // Clear signal bits while setting completion status
187     do;while ((s = status) >= 0 && !casStatus(s, completion));
188    
189     if ((s & SIGNAL_MASK) != 0) {
190     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
191     pool.updateRunningCount(s);
192     synchronized(this) { notifyAll(); }
193     }
194     }
195     else
196     externallySetCompletion(completion);
197     }
198    
199     /**
200     * Version of setCompletion for non-FJ threads. Leaves signal
201     * bits for unblocked threads to adjust, and always notifies.
202     */
203     private void externallySetCompletion(int completion) {
204     int s;
205     do;while ((s = status) >= 0 &&
206     !casStatus(s, (s & SIGNAL_MASK) | completion));
207     synchronized(this) { notifyAll(); }
208     }
209    
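The two completion setters above share one retry-loop shape: reread the status and attempt a CAS until either the CAS succeeds or the task is already complete. The sketch below reproduces that shape with java.util.concurrent.atomic.AtomicInteger in place of Unsafe, which is a substitution made only for illustration; CompletionCell is a hypothetical class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the status field, using AtomicInteger
// instead of Unsafe; only the retry-loop shape follows the source.
final class CompletionCell {
    static final int NORMAL      = 0xe0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int SIGNAL_MASK = 0x0000ffff;

    final AtomicInteger status = new AtomicInteger();

    /** Sets completion and clears signal bits; returns the status observed. */
    int setCompletion(int completion) {
        int s;
        do {
            s = status.get();
        } while (s >= 0 && !status.compareAndSet(s, completion));
        return s;
    }

    /** Sets completion but keeps the signal bits for waiters to adjust. */
    int externallySetCompletion(int completion) {
        int s;
        do {
            s = status.get();
        } while (s >= 0
                 && !status.compareAndSet(s, (s & SIGNAL_MASK) | completion));
        return s;
    }
}
```

In both loops the s >= 0 guard makes completion one-shot: once a completion value is installed, later attempts leave the status unchanged.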
210     /**
211     * Sets status to indicate normal completion.
212     */
213     final void setNormalCompletion() {
214     // Try typical fast case -- single CAS, no signal, not already done.
215     // Manually expand casStatus to improve chances of inlining it
216     if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
217     setCompletion(NORMAL);
218     }
219    
220     // internal waiting and notification
221    
222     /**
223     * Performs the actual monitor wait for awaitDone.
224     */
225     private void doAwaitDone() {
226     // Minimize lock bias and in/de-flation effects by maximizing
227     // chances of waiting inside sync
228     try {
229     while (status >= 0)
230     synchronized(this) { if (status >= 0) wait(); }
231     } catch (InterruptedException ie) {
232     onInterruptedWait();
233     }
234     }
235    
236     /**
237     * Performs the actual monitor wait for the timed version of awaitDone.
238     */
239     private void doAwaitDone(long startTime, long nanos) {
240     synchronized(this) {
241     try {
242     while (status >= 0) {
243     long nt = nanos - (System.nanoTime() - startTime); // budget minus elapsed time
244     if (nt <= 0)
245     break;
246     wait(nt / 1000000, (int)(nt % 1000000));
247     }
248     } catch (InterruptedException ie) {
249     onInterruptedWait();
250     }
251     }
252     }
253    
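A timed monitor wait needs the time still remaining, which is the nanosecond budget minus the time elapsed since startTime, split into the millisecond and nanosecond arguments that Object.wait(long, int) expects. The arithmetic can be sketched in isolation as follows; TimedWaitMath and its method names are hypothetical helpers, not part of this class.

```java
// Hypothetical helper; the names are not from ForkJoinTask.
final class TimedWaitMath {
    /** Nanoseconds still to wait: the budget minus the time already elapsed. */
    static long remainingNanos(long startTime, long budgetNanos, long now) {
        return budgetNanos - (now - startTime);
    }

    /** Millisecond argument for Object.wait(long, int). */
    static long millisPart(long nanos) {
        return nanos / 1000000;
    }

    /** Nanosecond argument for Object.wait(long, int), for nanos >= 0. */
    static int nanosPart(long nanos) {
        return (int) (nanos % 1000000);
    }
}
```

A result of zero or less from remainingNanos means the budget is spent and the caller should stop waiting rather than call wait again.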
254     // Awaiting completion
255    
256     /**
257     * Sets status to indicate there is a joiner, then waits for join,
258     * surrounded with pool notifications.
259 jsr166 1.10 *
260 dl 1.1 * @return status upon exit
261     */
262 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
263 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
264     int s;
265     while ((s = status) >= 0) {
266     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
267     if (pool == null || !pool.preJoin(this, maintainParallelism))
268     doAwaitDone();
269     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
270     adjustPoolCountsOnUnblock(pool);
271     break;
272     }
273     }
274     return s;
275     }
276    
277     /**
278     * Timed version of awaitDone.
279     * @return status upon exit
280     */
281 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
282 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
283     int s;
284     while ((s = status) >= 0) {
285     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
286     long startTime = System.nanoTime();
287     if (pool == null || !pool.preJoin(this, false))
288     doAwaitDone(startTime, nanos);
289     if ((s = status) >= 0) {
290     adjustPoolCountsOnCancelledWait(pool);
291     s = status;
292     }
293     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
294     adjustPoolCountsOnUnblock(pool);
295     break;
296     }
297     }
298     return s;
299     }
300    
301     /**
302 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
303 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
304     */
305     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
306     int s;
307     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
308     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
309     pool.updateRunningCount(s);
310     }
311    
312     /**
313 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
314 dl 1.1 */
315     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
316     if (pool != null) {
317     int s;
318     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
319     if (casStatus(s, s - 1)) {
320     pool.updateRunningCount(1);
321     break;
322     }
323     }
324     }
325     }
326    
327 dl 1.2 /**
328 jsr166 1.10 * Handles interruptions during waits.
329 dl 1.2 */
330 dl 1.1 private void onInterruptedWait() {
331 dl 1.2 ForkJoinWorkerThread w = getWorker();
332     if (w == null)
333     Thread.currentThread().interrupt(); // re-interrupt
334     else if (w.isTerminating())
335 dl 1.3 cancelIgnoringExceptions();
336 dl 1.2 // else if FJworker, ignore interrupt
337 dl 1.1 }
338    
339     // Recording and reporting exceptions
340    
341     private void setDoneExceptionally(Throwable rex) {
342     exceptionMap.put(this, rex);
343     setCompletion(EXCEPTIONAL);
344     }
345    
346     /**
347 jsr166 1.10 * Throws the exception associated with status s.
348     *
349 dl 1.1 * @throws the exception
350     */
351     private void reportException(int s) {
352     if ((s &= COMPLETION_MASK) < NORMAL) {
353     if (s == CANCELLED)
354     throw new CancellationException();
355     else
356     rethrowException(exceptionMap.get(this));
357     }
358     }
359    
360     /**
361 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
362 dl 1.1 * Only call when isDone is known to be true.
363     */
364     private V reportFutureResult()
365     throws ExecutionException, InterruptedException {
366     int s = status & COMPLETION_MASK;
367     if (s < NORMAL) {
368     Throwable ex;
369     if (s == CANCELLED)
370     throw new CancellationException();
371     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
372     throw new ExecutionException(ex);
373     if (Thread.interrupted())
374     throw new InterruptedException();
375     }
376     return getRawResult();
377     }
378    
379     /**
380     * Returns result or throws exception using j.u.c.Future conventions
381 jsr166 1.10 * with timeouts.
382 dl 1.1 */
383     private V reportTimedFutureResult()
384     throws InterruptedException, ExecutionException, TimeoutException {
385     Throwable ex;
386     int s = status & COMPLETION_MASK;
387     if (s == NORMAL)
388     return getRawResult();
389     if (s == CANCELLED)
390     throw new CancellationException();
391     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
392     throw new ExecutionException(ex);
393     if (Thread.interrupted())
394     throw new InterruptedException();
395     throw new TimeoutException();
396     }
397    
398     // internal execution methods
399    
400     /**
401     * Calls exec, recording completion, and rethrowing exception if
402 jsr166 1.10 * encountered. Caller should normally check status before calling.
403     *
404 dl 1.1 * @return true if completed normally
405     */
406     private boolean tryExec() {
407     try { // try block must contain only call to exec
408     if (!exec())
409     return false;
410     } catch (Throwable rex) {
411     setDoneExceptionally(rex);
412     rethrowException(rex);
413     return false; // not reached
414     }
415     setNormalCompletion();
416     return true;
417     }
418    
419     /**
420     * Main execution method used by worker threads. Invokes
421 jsr166 1.10 * base computation unless already complete.
422 dl 1.1 */
423     final void quietlyExec() {
424     if (status >= 0) {
425     try {
426     if (!exec())
427     return;
428     } catch(Throwable rex) {
429     setDoneExceptionally(rex);
430     return;
431     }
432     setNormalCompletion();
433     }
434     }
435    
436     /**
437 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
438     * Caller should normally check status before calling.
439     *
440 dl 1.1 * @return true if completed normally
441     */
442     private boolean tryQuietlyInvoke() {
443     try {
444     if (!exec())
445     return false;
446     } catch (Throwable rex) {
447     setDoneExceptionally(rex);
448     return false;
449     }
450     setNormalCompletion();
451     return true;
452     }
453    
454     /**
455 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
456 dl 1.1 */
457 dl 1.3 final void cancelIgnoringExceptions() {
458 dl 1.1 try {
459     cancel(false);
460     } catch(Throwable ignore) {
461     }
462     }
463    
464 dl 1.3 /**
465     * Main implementation of helpJoin
466     */
467     private int busyJoin(ForkJoinWorkerThread w) {
468     int s;
469     ForkJoinTask<?> t;
470     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
471     t.quietlyExec();
472     return (s >= 0)? awaitDone(w, false) : s; // block if no work
473     }
474    
475 dl 1.1 // public methods
476    
477     /**
478     * Arranges to asynchronously execute this task. While it is not
479     * necessarily enforced, it is a usage error to fork a task more
480     * than once unless it has completed and been reinitialized. This
481 dl 1.2 * method may be invoked only from within ForkJoinTask
482 dl 1.1 * computations. Attempts to invoke in other contexts result in
483 dl 1.4 * exceptions or errors possibly including ClassCastException.
484 dl 1.1 */
485     public final void fork() {
486     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
487     }
488    
489     /**
490     * Returns the result of the computation when it is ready.
491 jsr166 1.8 * This method differs from {@code get} in that abnormal
492 dl 1.1 * completion results in RuntimeExceptions or Errors, not
493     * ExecutionExceptions.
494     *
495     * @return the computed result
496     */
497     public final V join() {
498     ForkJoinWorkerThread w = getWorker();
499     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
500     reportException(awaitDone(w, true));
501     return getRawResult();
502     }
503    
504     /**
505 dl 1.2 * Commences performing this task, awaits its completion if
506     * necessary, and returns its result.
507 jsr166 1.10 *
508 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
509 jsr166 1.10 * exception) if the underlying computation did so
510 dl 1.1 * @return the computed result
511     */
512     public final V invoke() {
513     if (status >= 0 && tryExec())
514     return getRawResult();
515     else
516     return join();
517     }
518    
519     /**
520 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
521 dl 1.2 * both of them or an exception is encountered. This method may be
522     * invoked only from within ForkJoinTask computations. Attempts to
523     * invoke in other contexts result in exceptions or errors
524 dl 1.4 * possibly including ClassCastException.
525 jsr166 1.10 *
526 dl 1.2 * @param t1 one task
527     * @param t2 the other task
528     * @throws NullPointerException if t1 or t2 is null
529 jsr166 1.10 * @throws RuntimeException or Error if either task did so
530 dl 1.1 */
531 dl 1.2 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
532     t2.fork();
533     t1.invoke();
534     t2.join();
535 dl 1.1 }
536    
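A typical caller of the two-task invokeAll is a divide-and-conquer compute method. The sketch below assumes the java.util.concurrent classes shipped since Java 7 rather than the jsr166y preview package shown in this file; SumTask, its THRESHOLD, and the pool size are hypothetical choices made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Assumes java.util.concurrent (Java 7+), not the jsr166y package.
// SumTask and THRESHOLD are hypothetical.
final class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 1000; // leaf size, chosen arbitrarily
    final long[] data;
    final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {        // small enough: sum sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        invokeAll(left, right);            // returns once both are done
        return left.join() + right.join(); // both already complete here
    }

    /** Runs the task in a small pool and returns the total. */
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SumTask.sum(values) from an ordinary thread is safe because the pool's invoke submits the root task; only the nested invokeAll and join calls run inside worker threads.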
537     /**
538 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
539 dl 1.2 * for all of them. If any task encounters an exception, others
540     * may be cancelled. This method may be invoked only from within
541     * ForkJoinTask computations. Attempts to invoke in other contexts
542 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
543 jsr166 1.10 *
544 dl 1.2 * @param tasks the array of tasks
545 jsr166 1.10 * @throws NullPointerException if tasks or any element is null
546     * @throws RuntimeException or Error if any task did so
547 dl 1.1 */
548 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
549     Throwable ex = null;
550     int last = tasks.length - 1;
551     for (int i = last; i >= 0; --i) {
552     ForkJoinTask<?> t = tasks[i];
553     if (t == null) {
554     if (ex == null)
555     ex = new NullPointerException();
556     }
557     else if (i != 0)
558     t.fork();
559     else {
560     t.quietlyInvoke();
561     if (ex == null)
562     ex = t.getException();
563     }
564     }
565     for (int i = 1; i <= last; ++i) {
566     ForkJoinTask<?> t = tasks[i];
567     if (t != null) {
568     if (ex != null)
569     t.cancel(false);
570     else {
571     t.quietlyJoin();
572     if (ex == null)
573     ex = t.getException();
574     }
575     }
576 dl 1.1 }
577 dl 1.2 if (ex != null)
578     rethrowException(ex);
579 dl 1.1 }
580    
581     /**
582 dl 1.2 * Forks all tasks in the collection, returning when
583 jsr166 1.8 * {@code isDone} holds for all of them. If any task
584 dl 1.2 * encounters an exception, others may be cancelled. This method
585     * may be invoked only from within ForkJoinTask
586 jsr166 1.9 * computations. Attempts to invoke in other contexts result in
587 dl 1.4 * exceptions or errors possibly including ClassCastException.
588 jsr166 1.10 *
589 dl 1.2 * @param tasks the collection of tasks
590 jsr166 1.10 * @throws NullPointerException if tasks or any element is null
591     * @throws RuntimeException or Error if any task did so
592 dl 1.1 */
593 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
594     if (!(tasks instanceof List)) {
595     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
596     return;
597     }
598     List<? extends ForkJoinTask<?>> ts =
599     (List<? extends ForkJoinTask<?>>)tasks;
600     Throwable ex = null;
601     int last = ts.size() - 1;
602     for (int i = last; i >= 0; --i) {
603     ForkJoinTask<?> t = ts.get(i);
604     if (t == null) {
605     if (ex == null)
606     ex = new NullPointerException();
607     }
608     else if (i != 0)
609     t.fork();
610     else {
611     t.quietlyInvoke();
612     if (ex == null)
613     ex = t.getException();
614     }
615     }
616     for (int i = 1; i <= last; ++i) {
617     ForkJoinTask<?> t = ts.get(i);
618     if (t != null) {
619     if (ex != null)
620     t.cancel(false);
621     else {
622     t.quietlyJoin();
623     if (ex == null)
624     ex = t.getException();
625     }
626     }
627     }
628     if (ex != null)
629     rethrowException(ex);
630 dl 1.1 }
631    
632     /**
633     * Returns true if the computation performed by this task has
634     * completed (or has been cancelled).
635 jsr166 1.10 *
636 dl 1.1 * @return true if this computation has completed
637     */
638     public final boolean isDone() {
639     return status < 0;
640     }
641    
642     /**
643     * Returns true if this task was cancelled.
644 jsr166 1.10 *
645 dl 1.1 * @return true if this task was cancelled
646     */
647     public final boolean isCancelled() {
648     return (status & COMPLETION_MASK) == CANCELLED;
649     }
650    
651     /**
652     * Asserts that the results of this task's computation will not be
653 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
654 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
655     * will report true, and {@code join} will result in a
656     * {@code CancellationException} being thrown. Otherwise, when
657 dl 1.1 * cancellation races with completion, there are no guarantees
658 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
659     * {@code join} will return normally or via an exception, or
660 dl 1.1 * whether these behaviors will remain consistent upon repeated
661     * invocation.
662     *
663     * <p>This method may be overridden in subclasses, but if so, must
664     * still ensure that these minimal properties hold. In particular,
665     * the cancel method itself must not throw exceptions.
666     *
667     * <p> This method is designed to be invoked by <em>other</em>
668     * tasks. To terminate the current task, you can just return or
669     * throw an unchecked exception from its computation method, or
670 jsr166 1.8 * invoke {@code completeExceptionally}.
671 dl 1.1 *
672     * @param mayInterruptIfRunning this value is ignored in the
673     * default implementation because tasks are not in general
674     * cancelled via interruption.
675     *
676     * @return true if this task is now cancelled
677     */
678     public boolean cancel(boolean mayInterruptIfRunning) {
679     setCompletion(CANCELLED);
680     return (status & COMPLETION_MASK) == CANCELLED;
681     }
682    
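The case that cancel documents with a firm guarantee, cancellation before any attempt to execute, can be exercised directly. This sketch again assumes the released java.util.concurrent classes rather than jsr166y; CancelDemo and Answer are hypothetical names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

// Assumes java.util.concurrent (Java 7+); class names are hypothetical.
final class CancelDemo {
    /** A trivial task that is cancelled before it ever runs. */
    static final class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    /** True if join() on a task cancelled before execution throws. */
    static boolean joinThrowsAfterCancel() {
        Answer task = new Answer();
        task.cancel(false); // argument is ignored by the default implementation
        try {
            task.join();
            return false;
        } catch (CancellationException expected) {
            return true;
        }
    }
}
```

The racy case, where cancellation overlaps with completion, is deliberately not shown, since the documentation above gives no guarantees for it.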
683     /**
684 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
685     *
686 dl 1.3 * @return true if this task threw an exception or was cancelled
687     */
688     public final boolean isCompletedAbnormally() {
689     return (status & COMPLETION_MASK) < NORMAL;
690     }
691    
692     /**
693     * Returns the exception thrown by the base computation, or a
694     * CancellationException if cancelled, or null if none or if the
695     * method has not yet completed.
696 jsr166 1.10 *
697 dl 1.3 * @return the exception, or null if none
698     */
699     public final Throwable getException() {
700     int s = status & COMPLETION_MASK;
701     if (s >= NORMAL)
702     return null;
703     if (s == CANCELLED)
704     return new CancellationException();
705     return exceptionMap.get(this);
706     }
707    
708     /**
709 dl 1.1 * Completes this task abnormally, and if not already aborted or
710     * cancelled, causes it to throw the given exception upon
711 jsr166 1.8 * {@code join} and related operations. This method may be used
712 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
713 dl 1.2 * completion of tasks that would not otherwise complete. Its use
714     * in other situations is likely to be wrong. This method is
715 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
716 dl 1.2 * implementation to maintain guarantees.
717     *
718 dl 1.1 * @param ex the exception to throw. If this exception is
719     * not a RuntimeException or Error, the actual exception thrown
720     * will be a RuntimeException with cause ex.
721     */
722     public void completeExceptionally(Throwable ex) {
723     setDoneExceptionally((ex instanceof RuntimeException) ||
724     (ex instanceof Error)? ex :
725     new RuntimeException(ex));
726     }
727    
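The wrapping rule in completeExceptionally can be checked on its own: unchecked throwables pass through unchanged, and anything else becomes the cause of a new RuntimeException. The helper below restates that conditional expression; ExceptionWrapping and toUnchecked are hypothetical names.

```java
// Hypothetical helper restating the conditional used above.
final class ExceptionWrapping {
    /** Returns ex itself if unchecked, else a RuntimeException caused by ex. */
    static Throwable toUnchecked(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error)
            ? ex
            : new RuntimeException(ex);
    }
}
```

In the original expression the parentheses around the two instanceof tests are optional, because || binds more tightly than the conditional operator; they are added here only for readability.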
728     /**
729     * Completes this task, and if not already aborted or cancelled,
730 jsr166 1.8 * returns the given value as its result upon {@code join} and related
731 dl 1.1 * operations. This method may be used to provide results for
732     * asynchronous tasks, or to provide alternative handling for
733 dl 1.2 * tasks that would not otherwise complete normally. Its use in
734     * other situations is likely to be wrong. This method is
735 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
736 dl 1.2 * implementation to maintain guarantees.
737 dl 1.1 *
738 jsr166 1.10 * @param value the result value for this task
739 dl 1.1 */
740     public void complete(V value) {
741     try {
742     setRawResult(value);
743     } catch(Throwable rex) {
744     setDoneExceptionally(rex);
745     return;
746     }
747     setNormalCompletion();
748     }
749    
750 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
751     ForkJoinWorkerThread w = getWorker();
752     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
753     awaitDone(w, true);
754     return reportFutureResult();
755     }
756    
757     public final V get(long timeout, TimeUnit unit)
758     throws InterruptedException, ExecutionException, TimeoutException {
759     ForkJoinWorkerThread w = getWorker();
760     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
761     awaitDone(w, unit.toNanos(timeout));
762     return reportTimedFutureResult();
763     }
764    
765 dl 1.1 /**
766 dl 1.2 * Possibly executes other tasks until this task is ready, then
767     * returns the result of the computation. This method may be more
768 jsr166 1.8 * efficient than {@code join}, but is only applicable when
769 jsr166 1.9 * there are no potential dependencies between continuation of the
770 dl 1.2 * current task and that of any other task that might be executed
771     * while helping. (This usually holds for pure divide-and-conquer
772     * tasks). This method may be invoked only from within
773     * ForkJoinTask computations. Attempts to invoke in other contexts
774 jsr166 1.9 * result in exceptions or errors possibly including ClassCastException.
775 jsr166 1.10 *
776 dl 1.2 * @return the computed result
777     */
778     public final V helpJoin() {
779     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
780     if (status < 0 || !w.unpushTask(this) || !tryExec())
781 dl 1.3 reportException(busyJoin(w));
782 dl 1.2 return getRawResult();
783     }
784    
785     /**
786     * Possibly executes other tasks until this task is ready. This
787     * method may be invoked only from within ForkJoinTask
788 jsr166 1.9 * computations. Attempts to invoke in other contexts result in
789 dl 1.4 * exceptions or errors possibly including ClassCastException.
790 dl 1.2 */
791     public final void quietlyHelpJoin() {
792     if (status >= 0) {
793     ForkJoinWorkerThread w =
794     (ForkJoinWorkerThread)(Thread.currentThread());
795     if (!w.unpushTask(this) || !tryQuietlyInvoke())
796 dl 1.3 busyJoin(w);
797 dl 1.2 }
798     }
799    
800     /**
801     * Joins this task, without returning its result or throwing an
802     * exception. This method may be useful when processing
803     * collections of tasks when some have been cancelled or are otherwise
804     * known to have aborted.
805     */
806     public final void quietlyJoin() {
807     if (status >= 0) {
808     ForkJoinWorkerThread w = getWorker();
809     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
810     awaitDone(w, true);
811     }
812     }
813    
814     /**
815     * Commences performing this task and awaits its completion if
816     * necessary, without returning its result or throwing an
817     * exception. This method may be useful when processing
818     * collections of tasks when some have been cancelled or are otherwise
819     * known to have aborted.
820     */
821     public final void quietlyInvoke() {
822     if (status >= 0 && !tryQuietlyInvoke())
823     quietlyJoin();
824     }
825    
826     /**
827 dl 1.3 * Possibly executes tasks until the pool hosting the current task
828     * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This method may
829     * be of use in designs in which many tasks are forked, but none are
830     * explicitly joined, instead executing them until all are processed.
831     */
832     public static void helpQuiesce() {
833     ((ForkJoinWorkerThread)(Thread.currentThread())).
834     helpQuiescePool();
835     }
836    
837     /**
838 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
839 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
840 dl 1.1 * this task, but only if reuse occurs when this task has either
841     * never been forked, or has been forked, then completed and all
842     * outstanding joins of this task have also completed. Effects
843     * under any other usage conditions are not guaranteed, and are
844     * almost surely wrong. This method may be useful when executing
845     * pre-constructed trees of subtasks in loops.
846     */
847     public void reinitialize() {
848     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
849     exceptionMap.remove(this);
850     status = 0;
851     }
852    
853     /**
854 dl 1.2 * Returns the pool hosting the current task execution, or null
855     * if this task is executing outside of any pool.
856 jsr166 1.10 *
857     * @return the pool, or null if none
858 dl 1.1 */
859 dl 1.2 public static ForkJoinPool getPool() {
860     Thread t = Thread.currentThread();
861     return ((t instanceof ForkJoinWorkerThread)?
862     ((ForkJoinWorkerThread)t).pool : null);
863 dl 1.1 }
864    
865     /**
866 dl 1.2 * Tries to unschedule this task for execution. This method will
867     * typically succeed if this task is the most recently forked task
868     * by the current thread, and has not commenced executing in
869     * another thread. This method may be useful when arranging
870     * alternative local processing of tasks that could have been, but
871     * were not, stolen. This method may be invoked only from within
872 dl 1.1 * ForkJoinTask computations. Attempts to invoke in other contexts
873 dl 1.4 * result in exceptions or errors, possibly including ClassCastException.
874 jsr166 1.10 *
875 dl 1.2 * @return true if unforked
876 dl 1.1 */
877 dl 1.2 public boolean tryUnfork() {
878     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
879 dl 1.1 }
880    
881     /**
882 dl 1.2 * Returns an estimate of the number of tasks that have been
883     * forked by the current worker thread but not yet executed. This
884     * value may be useful for heuristic decisions about whether to
885     * fork other tasks.
886 jsr166 1.10 *
887 dl 1.2 * @return the number of tasks
888     */
889     public static int getQueuedTaskCount() {
890     return ((ForkJoinWorkerThread)(Thread.currentThread())).
891     getQueueSize();
892     }
893    
894     /**
895 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
896 dl 1.1 * held by the current worker thread than there are other worker
897 dl 1.2 * threads that might steal them. This value may be useful for
898     * heuristic decisions about whether to fork other tasks. In many
899     * usages of ForkJoinTasks, at steady state, each worker should
900     * aim to maintain a small constant surplus (for example, 3) of
901     * tasks, and to process computations locally if this threshold is
902     * exceeded.
903 jsr166 1.10 *
904 dl 1.1 * @return the surplus number of tasks, which may be negative
905     */
906 dl 1.2 public static int getSurplusQueuedTaskCount() {
907 dl 1.1 return ((ForkJoinWorkerThread)(Thread.currentThread()))
908     .getEstimatedSurplusTaskCount();
909     }
910    
911 dl 1.2 // Extension methods
912 dl 1.1
913     /**
914 jsr166 1.8 * Returns the result that would be returned by {@code join},
915 dl 1.2 * even if this task completed abnormally, or null if this task is
916     * not known to have been completed. This method is designed to
917     * aid debugging, as well as to support extensions. Its use in any
918     * other context is discouraged.
919 dl 1.1 *
920 jsr166 1.10 * @return the result, or null if not completed
921 dl 1.1 */
922     public abstract V getRawResult();
923    
924     /**
925     * Forces the given value to be returned as a result. This method
926     * is designed to support extensions, and should not in general be
927     * called otherwise.
928     *
929     * @param value the value
930     */
931     protected abstract void setRawResult(V value);
932    
933     /**
934     * Immediately performs the base action of this task. This method
935     * is designed to support extensions, and should not in general be
936     * called otherwise. The return value controls whether this task
937     * is considered to be done normally. It may return false in
938     * asynchronous actions that require explicit invocations of
939 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
940 dl 1.1 * to indicate abnormal exit.
941 jsr166 1.10 *
942 dl 1.1 * @return true if completed normally
943     * @throws Error or RuntimeException if encountered during computation
944     */
945     protected abstract boolean exec();
946    
947 dl 1.2 /**
948 dl 1.6 * Returns, but does not unschedule or execute, a task queued by
949     * the current thread but not yet executed, if one is
950     * available. There is no guarantee that this task will actually
951     * be polled or executed next. This method is designed primarily
952     * to support extensions, and is unlikely to be useful otherwise.
953     * This method may be invoked only from within ForkJoinTask
954     * computations. Attempts to invoke in other contexts result in
955     * exceptions or errors, possibly including ClassCastException.
956 dl 1.2 *
957     * @return the next task, or null if none are available
958     */
959     protected static ForkJoinTask<?> peekNextLocalTask() {
960     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
961     }
962    
963     /**
964 dl 1.6 * Unschedules and returns, without executing, the next task
965     * queued by the current thread but not yet executed. This method
966     * is designed primarily to support extensions, and is unlikely to
967     * be useful otherwise. This method may be invoked only from
968     * within ForkJoinTask computations. Attempts to invoke in other
969     * contexts result in exceptions or errors, possibly including
970     * ClassCastException.
971 dl 1.2 *
972     * @return the next task, or null if none are available
973     */
974     protected static ForkJoinTask<?> pollNextLocalTask() {
975 dl 1.6 return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
976 dl 1.2 }
977 jsr166 1.7
978 dl 1.2 /**
979 dl 1.6 * Unschedules and returns, without executing, the next task
980     * queued by the current thread but not yet executed, if one is
981     * available, or if not available, a task that was forked by some
982     * other thread, if available. Availability may be transient, so a
983 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
984 dl 1.6 * of the pool this task is operating in. This method is designed
985     * primarily to support extensions, and is unlikely to be useful
986     * otherwise. This method may be invoked only from within
987 dl 1.4 * ForkJoinTask computations. Attempts to invoke in other contexts
988 dl 1.6 * result in exceptions or errors, possibly including
989     * ClassCastException.
990 dl 1.4 *
991 dl 1.2 * @return a task, or null if none are available
992     */
993     protected static ForkJoinTask<?> pollTask() {
994     return ((ForkJoinWorkerThread)(Thread.currentThread())).
995 dl 1.4 pollTask();
996 dl 1.2 }
997    
998 dl 1.1 // Serialization support
999    
1000     private static final long serialVersionUID = -7721805057305804111L;
1001    
1002     /**
1003     * Saves the state to a stream.
1004     *
1005     * @serialData the current run status and the exception thrown
1006 jsr166 1.10 * during execution, or null if none
1007 dl 1.1 * @param s the stream
1008     */
1009     private void writeObject(java.io.ObjectOutputStream s)
1010     throws java.io.IOException {
1011     s.defaultWriteObject();
1012     s.writeObject(getException());
1013     }
1014    
1015     /**
1016     * Reconstitutes the instance from a stream.
1017 jsr166 1.10 *
1018 dl 1.1 * @param s the stream
1019     */
1020     private void readObject(java.io.ObjectInputStream s)
1021     throws java.io.IOException, ClassNotFoundException {
1022     s.defaultReadObject();
1023 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1024     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1025 dl 1.1 Object ex = s.readObject();
1026     if (ex != null)
1027     setDoneExceptionally((Throwable)ex);
1028     }
1029    
1030     // Temporary Unsafe mechanics for preliminary release
1031 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1032     try {
1033     return Unsafe.getUnsafe();
1034     } catch (SecurityException se) {
1035     try {
1036     return java.security.AccessController.doPrivileged
1037     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1038     public Unsafe run() throws Exception {
1039     return getUnsafePrivileged();
1040     }});
1041     } catch (java.security.PrivilegedActionException e) {
1042     throw e.getCause();
1043     }
1044     }
1045     }
1046    
1047     private static Unsafe getUnsafePrivileged()
1048     throws NoSuchFieldException, IllegalAccessException {
1049     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1050     f.setAccessible(true);
1051     return (Unsafe) f.get(null);
1052     }
1053    
1054     private static long fieldOffset(String fieldName)
1055     throws NoSuchFieldException {
1056     return _unsafe.objectFieldOffset
1057     (ForkJoinTask.class.getDeclaredField(fieldName));
1058     }
1059 dl 1.1
1060     static final Unsafe _unsafe;
1061     static final long statusOffset;
1062    
1063     static {
1064     try {
1065 jsr166 1.5 _unsafe = getUnsafe();
1066     statusOffset = fieldOffset("status");
1067     } catch (Throwable e) {
1068     throw new RuntimeException("Could not initialize intrinsics", e);
1069     }
1070 dl 1.1 }
1071    
1072     }
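
Usage sketch. The Javadoc for getSurplusQueuedTaskCount and tryUnfork above describes heuristics without showing them in use, so the following is one possible illustration, not part of this file. It is written against the java.util.concurrent classes that shipped in JDK 7 rather than the jsr166y preview package, on the assumption that the two methods behave as documented here. The class name SurplusSum, the constants MIN_CHUNK and SURPLUS_THRESHOLD, and the helper parallelSum are hypothetical names chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of a long[] array.
final class SurplusSum extends RecursiveTask<Long> {
    // Assumed tuning values; the Javadoc only suggests "a small constant surplus (for example, 3)".
    static final int SURPLUS_THRESHOLD = 3;
    static final int MIN_CHUNK = 1000;

    private final long[] data;
    private final int lo, hi; // half-open range [lo, hi)

    SurplusSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        // Process locally when the slice is small, or when this worker already
        // holds more queued tasks than other workers are likely to steal.
        if (hi - lo <= MIN_CHUNK
                || getSurplusQueuedTaskCount() > SURPLUS_THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SurplusSum right = new SurplusSum(data, mid, hi);
        right.fork();                                   // make the right half stealable
        long left = new SurplusSum(data, lo, mid).compute();
        // If nobody stole the right half, unschedule it and run it directly;
        // otherwise wait for whichever thread took it.
        long rightSum = right.tryUnfork() ? right.compute() : right.join();
        return left + rightSum;
    }

    // Hypothetical convenience wrapper: runs the task in a fresh pool.
    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SurplusSum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(parallelSum(data)); // prints 5000050000
    }
}
```

Both methods are called only from inside compute(), which the pool runs on a worker thread; that matches the restriction stated in the Javadoc that they may be invoked only from within ForkJoinTask computations. The threshold values are guesses and would need measurement for a real workload.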