root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.6
Committed: Thu Jul 16 15:32:34 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.5: +27 -28 lines
Log Message:
Async mode, drainTasks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25     * programs using ForkJoinTasks employ only methods <code>fork</code>
26     * and <code>join</code>, or derivatives such as
27     * <code>invokeAll</code>. However, this class also provides a number
28     * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, that arranges asynchronous execution,
38     * and {@link #join}, that doesn't proceed until the task's result has
39     * been computed. Computations should avoid <code>synchronized</code>
40     * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51     * not permitting checked exceptions such as <code>IOExceptions</code>
52     * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, which are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61     * waits for completion and report results using <code>Future</code>
62     * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66     * equivalent to <code>fork(); join()</code> but always attempts to
67     * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71     * Method <code>invokeAll</code> (available in multiple versions)
72     * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79     * established in a constructor, and then defines a <code>compute</code>
80     * method that somehow uses the control methods supplied by this base
81     * class. While these methods have <code>public</code> access (to allow
82     * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84     * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 dl 1.4 * exceptions or errors possibly including ClassCastException.
86 dl 1.1 *
87 dl 1.2 * <p>Most base support methods are <code>final</code> because their
88 dl 1.1 * implementations are intrinsically tied to the underlying
89     * lightweight task scheduling framework, and so cannot be overridden.
90     * Developers creating new basic styles of fork/join processing should
91 dl 1.2 * minimally implement <code>protected</code> methods
92     * <code>exec</code>, <code>setRawResult</code>, and
93     * <code>getRawResult</code>, while also introducing an abstract
94     * computational method that can be implemented in its subclasses,
95     * possibly relying on other <code>protected</code> methods provided
96     * by this class.
97 dl 1.1 *
98     * <p>ForkJoinTasks should perform relatively small amounts of
99     * computations, otherwise splitting into smaller tasks. As a very
100     * rough rule of thumb, a task should perform more than 100 and less
101     * than 10000 basic computational steps. If tasks are too big, then
102     * parallelism cannot improve throughput. If too small, then memory
103     * and internal task maintenance overhead may overwhelm processing.
104     *
105 dl 1.2 * <p>ForkJoinTasks are <code>Serializable</code>, which enables them
106     * to be used in extensions such as remote execution frameworks. It is
107     * in general sensible to serialize tasks only before or after, but
108 dl 1.1 * not during execution. Serialization is not relied on during
109     * execution itself.
110     */
111     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
112 dl 1.2
113 dl 1.1 /**
114 dl 1.2 * Run control status bits packed into a single int to minimize
115     * footprint and to ensure atomicity (via CAS). Status is
116     * initially zero, and takes on nonnegative values until
117 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
118     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
119     * blocking waits by other threads have SIGNAL_MASK bits set --
120     * bit 15 for external (nonFJ) waits, and the rest a count of
121     * waiting FJ threads. (This representation relies on
122     * ForkJoinPool max thread limits). Completion of a stolen task
123     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
124     * though suboptimal for some purposes, we use basic builtin
125     * wait/notify to take advantage of "monitor inflation" in JVMs
126     * that we would otherwise need to emulate to avoid adding further
127     * per-task bookkeeping overhead. Note that bits 16-28 are
128     * currently unused. Also value 0x80000000 is available as spare
129     * completion value.
130     */
131     volatile int status; // accessed directly by pool and workers
132    
133     static final int COMPLETION_MASK = 0xe0000000;
134     static final int NORMAL = 0xe0000000; // == mask
135     static final int CANCELLED = 0xc0000000;
136     static final int EXCEPTIONAL = 0xa0000000;
137     static final int SIGNAL_MASK = 0x0000ffff;
138     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
139     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
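
The packing described in the comment above can be tried out in isolation. The following is a minimal sketch, not part of the original file: the constants are copied from the listing, while the class name `StatusBits` and its helper methods are hypothetical names introduced only for illustration.

```java
public class StatusBits {
    // Constants copied from the listing above.
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000;
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int SIGNAL_MASK          = 0x0000ffff;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    // Hypothetical helpers: each one mirrors a test used in the listing.

    /** Any completion value sets the sign bit, so "done" is simply s < 0. */
    static boolean isDone(int s) { return s < 0; }

    /** CANCELLED and EXCEPTIONAL are both numerically below NORMAL. */
    static boolean isCompletedAbnormally(int s) {
        return (s & COMPLETION_MASK) < NORMAL;
    }

    /** Low 15 bits: count of fork/join worker threads blocked on the task. */
    static int internalWaiters(int s) { return s & INTERNAL_SIGNAL_MASK; }

    /** Bit 15: at least one non-fork/join thread is waiting. */
    static boolean hasExternalWaiter(int s) { return (s & EXTERNAL_SIGNAL) != 0; }

    public static void main(String[] args) {
        int s = 0;
        s += 1;                 // first worker thread starts waiting
        s += 1;                 // second worker thread starts waiting
        s |= EXTERNAL_SIGNAL;   // an external thread starts waiting
        System.out.println(internalWaiters(s) + " " + hasExternalWaiter(s));

        // externallySetCompletion keeps the signal bits and adds the completion bits.
        s = (s & SIGNAL_MASK) | CANCELLED;
        System.out.println(isDone(s) + " " + isCompletedAbnormally(s));
    }
}
```

Because all three completion values have the sign bit set, a single signed comparison is enough to test for completion, which is why the listing checks `status >= 0` and `status < 0` throughout.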
140    
141     /**
142     * Table of exceptions thrown by tasks, to enable reporting by
143     * callers. Because exceptions are rare, we don't directly keep
144     * them with task objects, but instead use a weak ref table. Note
145     * that cancellation exceptions don't appear in the table, but are
146     * instead recorded as status values.
147     * Todo: Use ConcurrentReferenceHashMap
148     */
149     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
150     Collections.synchronizedMap
151     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
152    
153     // within-package utilities
154    
155     /**
156     * Get current worker thread, or null if not a worker thread
157     */
158     static ForkJoinWorkerThread getWorker() {
159     Thread t = Thread.currentThread();
160     return ((t instanceof ForkJoinWorkerThread)?
161     (ForkJoinWorkerThread)t : null);
162     }
163    
164     final boolean casStatus(int cmp, int val) {
165     return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
166     }
167    
168     /**
169     * Workaround for not being able to rethrow unchecked exceptions.
170     */
171     static void rethrowException(Throwable ex) {
172     if (ex != null)
173     _unsafe.throwException(ex);
174     }
175    
176     // Setting completion status
177    
178     /**
179     * Mark completion and wake up threads waiting to join this task.
180     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
181     */
182     final void setCompletion(int completion) {
183 dl 1.2 ForkJoinPool pool = getPool();
184 dl 1.1 if (pool != null) {
185     int s; // Clear signal bits while setting completion status
186     do;while ((s = status) >= 0 && !casStatus(s, completion));
187    
188     if ((s & SIGNAL_MASK) != 0) {
189     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
190     pool.updateRunningCount(s);
191     synchronized(this) { notifyAll(); }
192     }
193     }
194     else
195     externallySetCompletion(completion);
196     }
197    
198     /**
199     * Version of setCompletion for non-FJ threads. Leaves signal
200     * bits for unblocked threads to adjust, and always notifies.
201     */
202     private void externallySetCompletion(int completion) {
203     int s;
204     do;while ((s = status) >= 0 &&
205     !casStatus(s, (s & SIGNAL_MASK) | completion));
206     synchronized(this) { notifyAll(); }
207     }
208    
209     /**
210     * Sets status to indicate normal completion
211     */
212     final void setNormalCompletion() {
213     // Try typical fast case -- single CAS, no signal, not already done.
214     // Manually expand casStatus to improve chances of inlining it
215     if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
216     setCompletion(NORMAL);
217     }
218    
219     // internal waiting and notification
220    
221     /**
222     * Performs the actual monitor wait for awaitDone
223     */
224     private void doAwaitDone() {
225     // Minimize lock bias and in/de-flation effects by maximizing
226     // chances of waiting inside sync
227     try {
228     while (status >= 0)
229     synchronized(this) { if (status >= 0) wait(); }
230     } catch (InterruptedException ie) {
231     onInterruptedWait();
232     }
233     }
234    
235     /**
236     * Performs the actual timed monitor wait for the timed awaitDone
237     */
238     private void doAwaitDone(long startTime, long nanos) {
239     synchronized(this) {
240     try {
241     while (status >= 0) {
242     long nt = nanos - (System.nanoTime() - startTime);
243     if (nt <= 0)
244     break;
245     wait(nt / 1000000, (int)(nt % 1000000));
246     }
247     } catch (InterruptedException ie) {
248     onInterruptedWait();
249     }
250     }
251     }
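
A timed monitor wait of this kind depends on two small calculations: the time remaining is the timeout minus the time elapsed since the start, and that remainder has to be split into the millisecond and nanosecond arguments of `Object.wait(long, int)`. The sketch below shows both steps on their own; the class `RemainingWait` and its method names are hypothetical and do not appear in the original file.

```java
public class RemainingWait {
    /**
     * Nanoseconds still left to wait: the timeout minus the elapsed time.
     * The parentheses matter, since elapsed time is (now - startTime).
     */
    static long remainingNanos(long timeoutNanos, long startTime, long now) {
        return timeoutNanos - (now - startTime);
    }

    /** Whole milliseconds, the first argument of Object.wait(long, int). */
    static long millisPart(long nanos) {
        return nanos / 1_000_000L;
    }

    /** Leftover nanoseconds in 0..999999, the second argument of Object.wait(long, int). */
    static int nanosPart(long nanos) {
        return (int) (nanos % 1_000_000L);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long timeout = 2_500_000L; // 2.5 ms expressed in nanoseconds
        long left = remainingNanos(timeout, start, System.nanoTime());
        if (left > 0) {
            System.out.println("would call wait(" + millisPart(left)
                               + ", " + nanosPart(left) + ")");
        } else {
            System.out.println("timeout already elapsed");
        }
    }
}
```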
252    
253     // Awaiting completion
254    
255     /**
256     * Sets status to indicate there is a joiner, then waits for join,
257     * surrounded with pool notifications.
258     * @return status upon exit
259     */
260 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
262     int s;
263     while ((s = status) >= 0) {
264     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
265     if (pool == null || !pool.preJoin(this, maintainParallelism))
266     doAwaitDone();
267     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
268     adjustPoolCountsOnUnblock(pool);
269     break;
270     }
271     }
272     return s;
273     }
274    
275     /**
276     * Timed version of awaitDone
277     * @return status upon exit
278     */
279 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
280 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
281     int s;
282     while ((s = status) >= 0) {
283     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
284     long startTime = System.nanoTime();
285     if (pool == null || !pool.preJoin(this, false))
286     doAwaitDone(startTime, nanos);
287     if ((s = status) >= 0) {
288     adjustPoolCountsOnCancelledWait(pool);
289     s = status;
290     }
291     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
292     adjustPoolCountsOnUnblock(pool);
293     break;
294     }
295     }
296     return s;
297     }
298    
299     /**
300     * Notify pool that thread is unblocked. Called by signalled
301     * threads when woken by non-FJ threads (which is atypical).
302     */
303     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
304     int s;
305     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
306     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
307     pool.updateRunningCount(s);
308     }
309    
310     /**
311     * Notify pool to adjust counts on cancelled or timed out wait
312     */
313     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
314     if (pool != null) {
315     int s;
316     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
317     if (casStatus(s, s - 1)) {
318     pool.updateRunningCount(1);
319     break;
320     }
321     }
322     }
323     }
324    
325 dl 1.2 /**
326     * Handle interruptions during waits.
327     */
328 dl 1.1 private void onInterruptedWait() {
329 dl 1.2 ForkJoinWorkerThread w = getWorker();
330     if (w == null)
331     Thread.currentThread().interrupt(); // re-interrupt
332     else if (w.isTerminating())
333 dl 1.3 cancelIgnoringExceptions();
334 dl 1.2 // else if FJworker, ignore interrupt
335 dl 1.1 }
336    
337     // Recording and reporting exceptions
338    
339     private void setDoneExceptionally(Throwable rex) {
340     exceptionMap.put(this, rex);
341     setCompletion(EXCEPTIONAL);
342     }
343    
344     /**
345     * Throws the exception associated with status s;
346     * @throws the exception
347     */
348     private void reportException(int s) {
349     if ((s &= COMPLETION_MASK) < NORMAL) {
350     if (s == CANCELLED)
351     throw new CancellationException();
352     else
353     rethrowException(exceptionMap.get(this));
354     }
355     }
356    
357     /**
358     * Returns result or throws exception using j.u.c.Future conventions
359     * Only call when isDone known to be true.
360     */
361     private V reportFutureResult()
362     throws ExecutionException, InterruptedException {
363     int s = status & COMPLETION_MASK;
364     if (s < NORMAL) {
365     Throwable ex;
366     if (s == CANCELLED)
367     throw new CancellationException();
368     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
369     throw new ExecutionException(ex);
370     if (Thread.interrupted())
371     throw new InterruptedException();
372     }
373     return getRawResult();
374     }
375    
376     /**
377     * Returns result or throws exception using j.u.c.Future conventions
378     * with timeouts
379     */
380     private V reportTimedFutureResult()
381     throws InterruptedException, ExecutionException, TimeoutException {
382     Throwable ex;
383     int s = status & COMPLETION_MASK;
384     if (s == NORMAL)
385     return getRawResult();
386     if (s == CANCELLED)
387     throw new CancellationException();
388     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
389     throw new ExecutionException(ex);
390     if (Thread.interrupted())
391     throw new InterruptedException();
392     throw new TimeoutException();
393     }
394    
395     // internal execution methods
396    
397     /**
398     * Calls exec, recording completion, and rethrowing exception if
399     * encountered. Caller should normally check status before calling
400     * @return true if completed normally
401     */
402     private boolean tryExec() {
403     try { // try block must contain only call to exec
404     if (!exec())
405     return false;
406     } catch (Throwable rex) {
407     setDoneExceptionally(rex);
408     rethrowException(rex);
409     return false; // not reached
410     }
411     setNormalCompletion();
412     return true;
413     }
414    
415     /**
416     * Main execution method used by worker threads. Invokes
417     * base computation unless already complete
418     */
419     final void quietlyExec() {
420     if (status >= 0) {
421     try {
422     if (!exec())
423     return;
424     } catch(Throwable rex) {
425     setDoneExceptionally(rex);
426     return;
427     }
428     setNormalCompletion();
429     }
430     }
431    
432     /**
433     * Calls exec, recording but not rethrowing exception
434     * Caller should normally check status before calling
435     * @return true if completed normally
436     */
437     private boolean tryQuietlyInvoke() {
438     try {
439     if (!exec())
440     return false;
441     } catch (Throwable rex) {
442     setDoneExceptionally(rex);
443     return false;
444     }
445     setNormalCompletion();
446     return true;
447     }
448    
449     /**
450     * Cancel, ignoring any exceptions it throws
451     */
452 dl 1.3 final void cancelIgnoringExceptions() {
453 dl 1.1 try {
454     cancel(false);
455     } catch(Throwable ignore) {
456     }
457     }
458    
459 dl 1.3 /**
460     * Main implementation of helpJoin
461     */
462     private int busyJoin(ForkJoinWorkerThread w) {
463     int s;
464     ForkJoinTask<?> t;
465     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
466     t.quietlyExec();
467     return (s >= 0)? awaitDone(w, false) : s; // block if no work
468     }
469    
470 dl 1.1 // public methods
471    
472     /**
473     * Arranges to asynchronously execute this task. While it is not
474     * necessarily enforced, it is a usage error to fork a task more
475     * than once unless it has completed and been reinitialized. This
476 dl 1.2 * method may be invoked only from within ForkJoinTask
477 dl 1.1 * computations. Attempts to invoke in other contexts result in
478 dl 1.4 * exceptions or errors possibly including ClassCastException.
479 dl 1.1 */
480     public final void fork() {
481     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
482     }
483    
484     /**
485     * Returns the result of the computation when it is ready.
486 dl 1.2 * This method differs from <code>get</code> in that abnormal
487 dl 1.1 * completion results in RuntimeExceptions or Errors, not
488     * ExecutionExceptions.
489     *
490     * @return the computed result
491     */
492     public final V join() {
493     ForkJoinWorkerThread w = getWorker();
494     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
495     reportException(awaitDone(w, true));
496     return getRawResult();
497     }
498    
499     /**
500 dl 1.2 * Commences performing this task, awaits its completion if
501     * necessary, and returns its result.
502 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
503     * exception) if the underlying computation did so.
504     * @return the computed result
505     */
506     public final V invoke() {
507     if (status >= 0 && tryExec())
508     return getRawResult();
509     else
510     return join();
511     }
512    
513     /**
514 dl 1.2 * Forks both tasks, returning when <code>isDone</code> holds for
515     * both of them or an exception is encountered. This method may be
516     * invoked only from within ForkJoinTask computations. Attempts to
517     * invoke in other contexts result in exceptions or errors
518 dl 1.4 * possibly including ClassCastException.
519 dl 1.2 * @param t1 one task
520     * @param t2 the other task
521     * @throws NullPointerException if t1 or t2 are null
522     * @throws RuntimeException or Error if either task did so.
523 dl 1.1 */
524 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
525     t2.fork();
526     t1.invoke();
527     t2.join();
528 dl 1.1 }
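
The fork-one, run-the-other, then join pattern of the two-task `invokeAll` above can be seen from the caller's side in a small divide-and-conquer task. This is a usage sketch under stated assumptions: it uses the `java.util.concurrent` classes that later shipped in the JDK (Java 7 and up) rather than the `jsr166y` preview package shown here, and the class `SumTask` and its `THRESHOLD` value are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    // Hypothetical cutoff, chosen to sit inside the rough 100..10000 step guideline.
    static final int THRESHOLD = 1_000;

    final long[] data;
    final int lo, hi; // half-open range [lo, hi)

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: sum sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        right.fork();                        // schedule one half asynchronously
        long leftSum = left.invoke();        // run the other half in this thread
        long rightSum = right.join();        // then wait for the forked half
        return leftSum + rightSum;
    }

    /** Sums the array on a temporary pool; the pool is shut down afterwards. */
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data));
    }
}
```

Forking the second half before invoking the first keeps the current thread busy with useful work while another worker has a chance to steal the forked half.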
529    
530     /**
531 dl 1.2 * Forks the given tasks, returning when <code>isDone</code> holds
532     * for all of them. If any task encounters an exception, others
533     * may be cancelled. This method may be invoked only from within
534     * ForkJoinTask computations. Attempts to invoke in other contexts
535 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
536 dl 1.2 * @param tasks the array of tasks
537     * @throws NullPointerException if tasks or any element are null.
538     * @throws RuntimeException or Error if any task did so.
539 dl 1.1 */
540 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
541     Throwable ex = null;
542     int last = tasks.length - 1;
543     for (int i = last; i >= 0; --i) {
544     ForkJoinTask<?> t = tasks[i];
545     if (t == null) {
546     if (ex == null)
547     ex = new NullPointerException();
548     }
549     else if (i != 0)
550     t.fork();
551     else {
552     t.quietlyInvoke();
553     if (ex == null)
554     ex = t.getException();
555     }
556     }
557     for (int i = 1; i <= last; ++i) {
558     ForkJoinTask<?> t = tasks[i];
559     if (t != null) {
560     if (ex != null)
561     t.cancel(false);
562     else {
563     t.quietlyJoin();
564     if (ex == null)
565     ex = t.getException();
566     }
567     }
568 dl 1.1 }
569 dl 1.2 if (ex != null)
570     rethrowException(ex);
571 dl 1.1 }
572    
573     /**
574 dl 1.2 * Forks all tasks in the collection, returning when
575     * <code>isDone</code> holds for all of them. If any task
576     * encounters an exception, others may be cancelled. This method
577     * may be invoked only from within ForkJoinTask
578     * computations. Attempts to invoke in other contexts result in
579 dl 1.4 * exceptions or errors possibly including ClassCastException.
580 dl 1.2 * @param tasks the collection of tasks
581     * @throws NullPointerException if tasks or any element are null.
582     * @throws RuntimeException or Error if any task did so.
583 dl 1.1 */
584 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
585     if (!(tasks instanceof List)) {
586     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
587     return;
588     }
589     List<? extends ForkJoinTask<?>> ts =
590     (List<? extends ForkJoinTask<?>>)tasks;
591     Throwable ex = null;
592     int last = ts.size() - 1;
593     for (int i = last; i >= 0; --i) {
594     ForkJoinTask<?> t = ts.get(i);
595     if (t == null) {
596     if (ex == null)
597     ex = new NullPointerException();
598     }
599     else if (i != 0)
600     t.fork();
601     else {
602     t.quietlyInvoke();
603     if (ex == null)
604     ex = t.getException();
605     }
606     }
607     for (int i = 1; i <= last; ++i) {
608     ForkJoinTask<?> t = ts.get(i);
609     if (t != null) {
610     if (ex != null)
611     t.cancel(false);
612     else {
613     t.quietlyJoin();
614     if (ex == null)
615     ex = t.getException();
616     }
617     }
618     }
619     if (ex != null)
620     rethrowException(ex);
621 dl 1.1 }
622    
623     /**
624     * Returns true if the computation performed by this task has
625     * completed (or has been cancelled).
626     * @return true if this computation has completed
627     */
628     public final boolean isDone() {
629     return status < 0;
630     }
631    
632     /**
633     * Returns true if this task was cancelled.
634     * @return true if this task was cancelled
635     */
636     public final boolean isCancelled() {
637     return (status & COMPLETION_MASK) == CANCELLED;
638     }
639    
640     /**
641     * Asserts that the results of this task's computation will not be
642 dl 1.2 * used. If a cancellation occurs before attempting to execute this
643     * task, then execution will be suppressed, <code>isCancelled</code>
644     * will report true, and <code>join</code> will result in a
645     * <code>CancellationException</code> being thrown. Otherwise, when
646 dl 1.1 * cancellation races with completion, there are no guarantees
647 dl 1.2 * about whether <code>isCancelled</code> will report true, whether
648     * <code>join</code> will return normally or via an exception, or
649 dl 1.1 * whether these behaviors will remain consistent upon repeated
650     * invocation.
651     *
652     * <p>This method may be overridden in subclasses, but if so, must
653     * still ensure that these minimal properties hold. In particular,
654     * the cancel method itself must not throw exceptions.
655     *
656     * <p> This method is designed to be invoked by <em>other</em>
657     * tasks. To terminate the current task, you can just return or
658     * throw an unchecked exception from its computation method, or
659 dl 1.2 * invoke <code>completeExceptionally</code>.
660 dl 1.1 *
661     * @param mayInterruptIfRunning this value is ignored in the
662     * default implementation because tasks are not in general
663     * cancelled via interruption.
664     *
665     * @return true if this task is now cancelled
666     */
667     public boolean cancel(boolean mayInterruptIfRunning) {
668     setCompletion(CANCELLED);
669     return (status & COMPLETION_MASK) == CANCELLED;
670     }
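
The documented case of cancelling before execution can be observed directly. The sketch below is an illustration under assumptions, not code from this file: it relies on the `java.util.concurrent.RecursiveTask` class from the released JDK rather than the `jsr166y` preview, and the names `CancelDemo` and `Answer` are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

public class CancelDemo {
    /** Trivial task that records whether its computation ever ran. */
    static class Answer extends RecursiveTask<Integer> {
        volatile boolean ran;

        @Override
        protected Integer compute() {
            ran = true;
            return 42;
        }
    }

    /** Returns true if joining the given (already completed) task throws CancellationException. */
    static boolean joinThrowsCancellation(Answer task) {
        try {
            task.join();
            return false;
        } catch (CancellationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        Answer task = new Answer();
        boolean cancelled = task.cancel(false);   // cancelled before it was ever forked
        System.out.println("cancel returned " + cancelled);
        System.out.println("isCancelled " + task.isCancelled());
        System.out.println("join threw " + joinThrowsCancellation(task));
        System.out.println("compute ran " + task.ran);
    }
}
```

The sketch deliberately avoids the racing case, since the comment above states that no outcome is guaranteed when cancellation races with completion.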
671    
672     /**
673 dl 1.3 * Returns true if this task threw an exception or was cancelled
674     * @return true if this task threw an exception or was cancelled
675     */
676     public final boolean isCompletedAbnormally() {
677     return (status & COMPLETION_MASK) < NORMAL;
678     }
679    
680     /**
681     * Returns the exception thrown by the base computation, or a
682     * CancellationException if cancelled, or null if none or if the
683     * method has not yet completed.
684     * @return the exception, or null if none
685     */
686     public final Throwable getException() {
687     int s = status & COMPLETION_MASK;
688     if (s >= NORMAL)
689     return null;
690     if (s == CANCELLED)
691     return new CancellationException();
692     return exceptionMap.get(this);
693     }
694    
695     /**
696 dl 1.1 * Completes this task abnormally, and if not already aborted or
697     * cancelled, causes it to throw the given exception upon
698 dl 1.2 * <code>join</code> and related operations. This method may be used
699 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
700 dl 1.2 * completion of tasks that would not otherwise complete. Its use
701     * in other situations is likely to be wrong. This method is
702     * overridable, but overridden versions must invoke <code>super</code>
703     * implementation to maintain guarantees.
704     *
705 dl 1.1 * @param ex the exception to throw. If this exception is
706     * not a RuntimeException or Error, the actual exception thrown
707     * will be a RuntimeException with cause ex.
708     */
709     public void completeExceptionally(Throwable ex) {
710     setDoneExceptionally((ex instanceof RuntimeException) ||
711     (ex instanceof Error)? ex :
712     new RuntimeException(ex));
713     }
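
The wrapping rule in `completeExceptionally` is easy to check on its own: unchecked throwables pass through unchanged, and anything else becomes the cause of a new `RuntimeException`. A minimal sketch follows, with the hypothetical names `WrapUnchecked` and `toUnchecked` standing in for the inline conditional expression used above.

```java
import java.io.IOException;

public class WrapUnchecked {
    /**
     * Returns ex itself if it is a RuntimeException or Error,
     * otherwise a RuntimeException whose cause is ex.
     */
    static Throwable toUnchecked(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error)
            ? ex
            : new RuntimeException(ex);
    }

    public static void main(String[] args) {
        Throwable unchecked = new IllegalStateException("already unchecked");
        Throwable checked = new IOException("checked");

        System.out.println(toUnchecked(unchecked) == unchecked);   // same object
        System.out.println(toUnchecked(checked).getCause() == checked);
    }
}
```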
714    
715     /**
716     * Completes this task, and if not already aborted or cancelled,
717 dl 1.2 * returning the given value as the result of <code>join</code> and related
718 dl 1.1 * operations. This method may be used to provide results for
719     * asynchronous tasks, or to provide alternative handling for
720 dl 1.2 * tasks that would not otherwise complete normally. Its use in
721     * other situations is likely to be wrong. This method is
722     * overridable, but overridden versions must invoke <code>super</code>
723     * implementation to maintain guarantees.
724 dl 1.1 *
725     * @param value the result value for this task.
726     */
727     public void complete(V value) {
728     try {
729     setRawResult(value);
730     } catch(Throwable rex) {
731     setDoneExceptionally(rex);
732     return;
733     }
734     setNormalCompletion();
735     }
736    
737 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
738     ForkJoinWorkerThread w = getWorker();
739     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
740     awaitDone(w, true);
741     return reportFutureResult();
742     }
743    
744     public final V get(long timeout, TimeUnit unit)
745     throws InterruptedException, ExecutionException, TimeoutException {
746     ForkJoinWorkerThread w = getWorker();
747     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
748     awaitDone(w, unit.toNanos(timeout));
749     return reportTimedFutureResult();
750     }
751    
752 dl 1.1 /**
753 dl 1.2 * Possibly executes other tasks until this task is ready, then
754     * returns the result of the computation. This method may be more
755     * efficient than <code>join</code>, but is only applicable when
756     * there are no potential dependencies between continuation of the
757     * current task and that of any other task that might be executed
758     * while helping. (This usually holds for pure divide-and-conquer
759     * tasks). This method may be invoked only from within
760     * ForkJoinTask computations. Attempts to invoke in other contexts
761 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
762 dl 1.2 * @return the computed result
763     */
764     public final V helpJoin() {
765     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
766     if (status < 0 || !w.unpushTask(this) || !tryExec())
767 dl 1.3 reportException(busyJoin(w));
768 dl 1.2 return getRawResult();
769     }
770    
771     /**
772     * Possibly executes other tasks until this task is ready. This
773     * method may be invoked only from within ForkJoinTask
774     * computations. Attempts to invoke in other contexts result in
775 dl 1.4 * exceptions or errors possibly including ClassCastException.
776 dl 1.2 */
777     public final void quietlyHelpJoin() {
778     if (status >= 0) {
779     ForkJoinWorkerThread w =
780     (ForkJoinWorkerThread)(Thread.currentThread());
781     if (!w.unpushTask(this) || !tryQuietlyInvoke())
782 dl 1.3 busyJoin(w);
783 dl 1.2 }
784     }
785    
786     /**
787     * Joins this task, without returning its result or throwing an
788     * exception. This method may be useful when processing
789     * collections of tasks when some have been cancelled or otherwise
790     * known to have aborted.
791     */
792     public final void quietlyJoin() {
793     if (status >= 0) {
794     ForkJoinWorkerThread w = getWorker();
795     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
796     awaitDone(w, true);
797     }
798     }
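
The use case named in the quietlyJoin comment (walking a collection of tasks, some of which were cancelled) can be sketched roughly as follows. This is an illustration only: it assumes the standardized java.util.concurrent classes rather than this jsr166y revision, and QuietJoinDemo, Square, and sumSkippingCancelled are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of jsr166y.
class QuietJoinDemo {
    // Trivial task that squares its input.
    static final class Square extends RecursiveTask<Integer> {
        private final int n;
        Square(int n) { this.n = n; }
        @Override protected Integer compute() { return n * n; }
    }

    // Sums the squares of 0..count-1, skipping the one task that was cancelled.
    static int sumSkippingCancelled(final int count, final int cancelledIndex) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override protected Integer compute() {
                    List<Square> tasks = new ArrayList<Square>();
                    for (int i = 0; i < count; i++)
                        tasks.add(new Square(i));
                    // Cancel one task before anything is forked, so the outcome is deterministic.
                    tasks.get(cancelledIndex).cancel(false);
                    for (Square t : tasks)
                        if (!t.isCancelled())
                            t.fork();
                    int sum = 0;
                    // Join most-recently-forked first.
                    for (int i = tasks.size() - 1; i >= 0; i--) {
                        Square t = tasks.get(i);
                        t.quietlyJoin();             // does not throw for the cancelled task
                        if (t.isCompletedNormally()) // inspect status instead of catching
                            sum += t.getRawResult();
                    }
                    return sum;
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```

A plain join() on the cancelled task would throw CancellationException; quietlyJoin lets the loop check each task's status without a try/catch per element.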
799    
800     /**
801     * Commences performing this task and awaits its completion if
802     * necessary, without returning its result or throwing an
803     * exception. This method may be useful when processing
804     * collections of tasks when some have been cancelled or are otherwise
805     * known to have aborted.
806     */
807     public final void quietlyInvoke() {
808     if (status >= 0 && !tryQuietlyInvoke())
809     quietlyJoin();
810     }
811    
812     /**
813 dl 1.3 * Possibly executes tasks until the pool hosting the current task
814     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
815     * designs in which many tasks are forked, but none are explicitly
816     * joined, instead executing them until all are processed.
817     */
818     public static void helpQuiesce() {
819     ((ForkJoinWorkerThread)(Thread.currentThread())).
820     helpQuiescePool();
821     }
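
A minimal sketch of the "fork many, join none" design mentioned in the helpQuiesce comment. It assumes the java.util.concurrent versions of these classes, not this jsr166y revision; HelpQuiesceDemo and runAll are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of jsr166y.
class HelpQuiesceDemo {
    // Forks taskCount independent actions and returns how many ran.
    static int runAll(final int taskCount) {
        final AtomicInteger done = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new RecursiveAction() {
                @Override protected void compute() {
                    for (int i = 0; i < taskCount; i++) {
                        new RecursiveAction() {
                            @Override protected void compute() {
                                done.incrementAndGet();
                            }
                        }.fork();               // forked, never joined
                    }
                    // Run or wait for outstanding tasks until the pool is quiescent.
                    ForkJoinTask.helpQuiesce();
                }
            });
            return done.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

The root action keeps no references to its subtasks; it relies on pool quiescence as the completion signal.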
822    
823     /**
824 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
825 dl 1.2 * subsequent <code>fork</code>. This method allows repeated reuse of
826 dl 1.1 * this task, but only if reuse occurs when this task has either
827     * never been forked, or has been forked, then completed and all
828     * outstanding joins of this task have also completed. Effects
829     * under any other usage conditions are not guaranteed, and are
830     * almost surely wrong. This method may be useful when executing
831     * pre-constructed trees of subtasks in loops.
832     */
833     public void reinitialize() {
834     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
835     exceptionMap.remove(this);
836     status = 0;
837     }
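
The reuse pattern described for reinitialize might look like the sketch below: a single pre-constructed task run repeatedly in a loop, reset only after each run has fully completed. It assumes the java.util.concurrent classes; ReinitializeDemo, Doubler, and doubleAll are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of jsr166y.
class ReinitializeDemo {
    // A reusable task whose input is replaced between runs.
    static final class Doubler extends RecursiveTask<Integer> {
        int input;
        @Override protected Integer compute() { return 2 * input; }
    }

    static int[] doubleAll(int[] inputs) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            Doubler task = new Doubler();      // constructed once
            int[] out = new int[inputs.length];
            for (int i = 0; i < inputs.length; i++) {
                task.input = inputs[i];
                out[i] = pool.invoke(task);    // returns only after completion
                task.reinitialize();           // safe: no run or join is outstanding
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```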
838    
839     /**
840 dl 1.2 * Returns the pool hosting the current task execution, or null
841     * if this task is executing outside of any pool.
842     * @return the pool, or null if none.
843 dl 1.1 */
844 dl 1.2 public static ForkJoinPool getPool() {
845     Thread t = Thread.currentThread();
846     return ((t instanceof ForkJoinWorkerThread)?
847     ((ForkJoinWorkerThread)t).pool : null);
848 dl 1.1 }
849    
850     /**
851 dl 1.2 * Tries to unschedule this task for execution. This method will
852     * typically succeed if this task is the most recently forked task
853     * by the current thread, and has not commenced executing in
854     * another thread. This method may be useful when arranging
855     * alternative local processing of tasks that could have been, but
856     * were not, stolen. This method may be invoked only from within
857 dl 1.1 * ForkJoinTask computations. Attempts to invoke in other contexts
858 dl 1.4 * result in exceptions or errors possibly including ClassCastException.
859 dl 1.2 * @return true if unforked
860 dl 1.1 */
861 dl 1.2 public boolean tryUnfork() {
862     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
863 dl 1.1 }
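
One way to arrange the "alternative local processing" that the tryUnfork comment refers to: fork the right half, compute the left half, then reclaim the right half if nobody stole it. This is a sketch assuming the java.util.concurrent classes; TryUnforkDemo, RangeSum, and the threshold of 1000 are illustrative choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of jsr166y.
class TryUnforkDemo {
    // Sums the integers in [lo, hi).
    static final class RangeSum extends RecursiveTask<Long> {
        private final int lo, hi;
        RangeSum(int lo, int hi) { this.lo = lo; this.hi = hi; }

        @Override protected Long compute() {
            if (hi - lo <= 1000) {             // small enough: sum sequentially
                long s = 0;
                for (int i = lo; i < hi; i++)
                    s += i;
                return s;
            }
            int mid = (lo + hi) >>> 1;
            RangeSum right = new RangeSum(mid, hi);
            right.fork();
            long left = new RangeSum(lo, mid).compute();
            // If right is still the most recently forked local task, take it back
            // and run it directly; otherwise it was stolen, so wait for it.
            long r = right.tryUnfork() ? right.compute() : right.join();
            return left + r;
        }
    }

    static long sum(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }
}
```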
864    
865     /**
866 dl 1.2 * Returns an estimate of the number of tasks that have been
867     * forked by the current worker thread but not yet executed. This
868     * value may be useful for heuristic decisions about whether to
869     * fork other tasks.
870     * @return the number of tasks
871     */
872     public static int getQueuedTaskCount() {
873     return ((ForkJoinWorkerThread)(Thread.currentThread())).
874     getQueueSize();
875     }
876    
877     /**
878 dl 1.1 * Returns an estimate of how many more locally queued tasks are
879     * held by the current worker thread than there are other worker
880 dl 1.2 * threads that might steal them. This value may be useful for
881     * heuristic decisions about whether to fork other tasks. In many
882     * usages of ForkJoinTasks, at steady state, each worker should
883     * aim to maintain a small constant surplus (for example, 3) of
884     * tasks, and to process computations locally if this threshold is
885     * exceeded.
886 dl 1.1 * @return the surplus number of tasks, which may be negative
887     */
888 dl 1.2 public static int getSurplusQueuedTaskCount() {
889 dl 1.1 return ((ForkJoinWorkerThread)(Thread.currentThread()))
890     .getEstimatedSurplusTaskCount();
891     }
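
The heuristic suggested in the getSurplusQueuedTaskCount comment (keep a small constant surplus, here 3, and otherwise work locally) could be sketched like this. It assumes the java.util.concurrent classes; SurplusDemo and ArraySum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not part of jsr166y.
class SurplusDemo {
    // Sums data[lo..hi).
    static final class ArraySum extends RecursiveTask<Long> {
        private final long[] data;
        private final int lo, hi;
        ArraySum(long[] data, int lo, int hi) {
            this.data = data; this.lo = lo; this.hi = hi;
        }

        @Override protected Long compute() {
            int l = lo, h = hi;
            List<ArraySum> forked = new ArrayList<ArraySum>();
            // Keep splitting off the right half while the local surplus is small.
            while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
                int mid = (l + h) >>> 1;
                ArraySum right = new ArraySum(data, mid, h);
                right.fork();
                forked.add(right);
                h = mid;
            }
            // Threshold exceeded (or range exhausted): process the rest locally.
            long sum = 0;
            for (int i = l; i < h; i++)
                sum += data[i];
            // Join most-recently-forked first.
            for (int i = forked.size() - 1; i >= 0; i--)
                sum += forked.get(i).join();
            return sum;
        }
    }

    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ArraySum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike a fixed size threshold, this adapts the granularity to how much stealable work is already queued.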
892    
893 dl 1.2 // Extension methods
894 dl 1.1
895     /**
896 dl 1.2 * Returns the result that would be returned by <code>join</code>,
897     * even if this task completed abnormally, or null if this task is
898     * not known to have been completed. This method is designed to
899     * aid debugging, as well as to support extensions. Its use in any
900     * other context is discouraged.
901 dl 1.1 *
902     * @return the result, or null if not completed.
903     */
904     public abstract V getRawResult();
905    
906     /**
907     * Forces the given value to be returned as a result. This method
908     * is designed to support extensions, and should not in general be
909     * called otherwise.
910     *
911     * @param value the value
912     */
913     protected abstract void setRawResult(V value);
914    
915     /**
916     * Immediately performs the base action of this task. This method
917     * is designed to support extensions, and should not in general be
918     * called otherwise. The return value controls whether this task
919     * is considered to be done normally. It may return false in
920     * asynchronous actions that require explicit invocations of
921 dl 1.2 * <code>complete</code> to become joinable. It may throw exceptions
922 dl 1.1 * to indicate abnormal exit.
923     * @return true if completed normally
924     * @throws Error or RuntimeException if encountered during computation
925     */
926     protected abstract boolean exec();
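
For orientation, a direct subclass supplying the three extension methods above might look like this minimal sketch. It assumes java.util.concurrent.ForkJoinTask, whose corresponding methods have the same shape; SquareTask and its square helper are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical direct subclass; most code would extend a ready-made subclass instead.
class SquareTask extends ForkJoinTask<Integer> {
    private static final long serialVersionUID = 1L;
    private final int n;
    private Integer result;                    // stays null until exec() has run

    SquareTask(int n) { this.n = n; }

    @Override public Integer getRawResult() { return result; }

    @Override protected void setRawResult(Integer value) { result = value; }

    @Override protected boolean exec() {
        result = n * n;
        return true;                           // true: completed normally
    }

    static int square(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SquareTask(n));
        } finally {
            pool.shutdown();
        }
    }
}
```

Returning false from exec() would leave the task incomplete until something explicitly completes it, which is the asynchronous case the comment mentions.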
927    
928 dl 1.2 /**
929 dl 1.6 * Returns, but does not unschedule or execute, the task queued by
930     * the current thread but not yet executed, if one is
931     * available. There is no guarantee that this task will actually
932     * be polled or executed next. This method is designed primarily
933     * to support extensions, and is unlikely to be useful otherwise.
934     * This method may be invoked only from within ForkJoinTask
935     * computations. Attempts to invoke in other contexts result in
936     * exceptions or errors possibly including ClassCastException.
937 dl 1.2 *
938     * @return the next task, or null if none are available
939     */
940     protected static ForkJoinTask<?> peekNextLocalTask() {
941     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
942     }
943    
944     /**
945 dl 1.6 * Unschedules and returns, without executing, the next task
946     * queued by the current thread but not yet executed. This method
947     * is designed primarily to support extensions, and is unlikely to
948     * be useful otherwise. This method may be invoked only from
949     * within ForkJoinTask computations. Attempts to invoke in other
950     * contexts result in exceptions or errors possibly including
951     * ClassCastException.
952 dl 1.2 *
953     * @return the next task, or null if none are available
954     */
955     protected static ForkJoinTask<?> pollNextLocalTask() {
956 dl 1.6 return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
957 dl 1.2 }
958 dl 1.6
959 dl 1.2 /**
960 dl 1.6 * Unschedules and returns, without executing, the next task
961     * queued by the current thread but not yet executed, if one is
962     * available, or if not available, a task that was forked by some
963     * other thread, if available. Availability may be transient, so a
964     * <code>null</code> result does not necessarily imply quiescence
965     * of the pool this task is operating in. This method is designed
966     * primarily to support extensions, and is unlikely to be useful
967     * otherwise. This method may be invoked only from within
968 dl 1.4 * ForkJoinTask computations. Attempts to invoke in other contexts
969 dl 1.6 * result in exceptions or errors possibly including
970     * ClassCastException.
971 dl 1.4 *
972 dl 1.2 * @return a task, or null if none are available
973     */
974     protected static ForkJoinTask<?> pollTask() {
975     return ((ForkJoinWorkerThread)(Thread.currentThread())).
976 dl 1.4 pollTask();
977 dl 1.2 }
978    
979 dl 1.1 // Serialization support
980    
981     private static final long serialVersionUID = -7721805057305804111L;
982    
983     /**
984     * Save the state to a stream.
985     *
986     * @serialData the current run status and the exception thrown
987     * during execution, or null if none.
988     * @param s the stream
989     */
990     private void writeObject(java.io.ObjectOutputStream s)
991     throws java.io.IOException {
992     s.defaultWriteObject();
993     s.writeObject(getException());
994     }
995    
996     /**
997     * Reconstitute the instance from a stream.
998     * @param s the stream
999     */
1000     private void readObject(java.io.ObjectInputStream s)
1001     throws java.io.IOException, ClassNotFoundException {
1002     s.defaultReadObject();
1003 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1004     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1005 dl 1.1 Object ex = s.readObject();
1006     if (ex != null)
1007     setDoneExceptionally((Throwable)ex);
1008     }
1009    
1010     // Temporary Unsafe mechanics for preliminary release
1011 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1012     try {
1013     return Unsafe.getUnsafe();
1014     } catch (SecurityException se) {
1015     try {
1016     return java.security.AccessController.doPrivileged
1017     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1018     public Unsafe run() throws Exception {
1019     return getUnsafePrivileged();
1020     }});
1021     } catch (java.security.PrivilegedActionException e) {
1022     throw e.getCause();
1023     }
1024     }
1025     }
1026    
1027     private static Unsafe getUnsafePrivileged()
1028     throws NoSuchFieldException, IllegalAccessException {
1029     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1030     f.setAccessible(true);
1031     return (Unsafe) f.get(null);
1032     }
1033    
1034     private static long fieldOffset(String fieldName)
1035     throws NoSuchFieldException {
1036     return _unsafe.objectFieldOffset
1037     (ForkJoinTask.class.getDeclaredField(fieldName));
1038     }
1039 dl 1.1
1040     static final Unsafe _unsafe;
1041     static final long statusOffset;
1042    
1043     static {
1044     try {
1045 jsr166 1.5 _unsafe = getUnsafe();
1046     statusOffset = fieldOffset("status");
1047     } catch (Throwable e) {
1048     throw new RuntimeException("Could not initialize intrinsics", e);
1049     }
1050 dl 1.1 }
1051    
1052     }