root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.13
Committed: Wed Jul 22 19:04:11 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.12: +47 -23 lines
Log Message:
Add ForkJoinTask.inForkJoinPool; other misc

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
26     * and {@code join}, or derivatives such as
27     * {@code invokeAll}. However, this class also provides a number
28 dl 1.2 * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, that arranges asynchronous execution,
38     * and {@link #join}, that doesn't proceed until the task's result has
39 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
40 dl 1.2 * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
52 dl 1.2 * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, that are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61 jsr166 1.8 * waits for completion and report results using {@code Future}
62 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
67 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
72 dl 1.2 * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
80 dl 1.2 * method that somehow uses the control methods supplied by this base
81 jsr166 1.8 * class. While these methods have {@code public} access (to allow
82 dl 1.2 * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
85     * #inForkJoinPool}). Attempts to invoke them in other contexts
86     * result in exceptions or errors possibly including
87     * ClassCastException.
88 dl 1.1 *
89 jsr166 1.8 * <p>Most base support methods are {@code final} because their
90 dl 1.1 * implementations are intrinsically tied to the underlying
91     * lightweight task scheduling framework, and so cannot be overridden.
92     * Developers creating new basic styles of fork/join processing should
93 jsr166 1.8 * minimally implement {@code protected} methods
94     * {@code exec}, {@code setRawResult}, and
95     * {@code getRawResult}, while also introducing an abstract
96 dl 1.2 * computational method that can be implemented in its subclasses,
97 jsr166 1.8 * possibly relying on other {@code protected} methods provided
98 dl 1.2 * by this class.
99 dl 1.1 *
100     * <p>ForkJoinTasks should perform relatively small amounts of
101 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
102 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
103     * than 10000 basic computational steps. If tasks are too big, then
104 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
105 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
106     *
107 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
108 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
109     * in general sensible to serialize tasks only before or after, but
110 dl 1.1 * not during execution. Serialization is not relied on during
111     * execution itself.
112 jsr166 1.12 *
113     * @since 1.7
114     * @author Doug Lea
115 dl 1.1 */
116     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
117 dl 1.2
118 dl 1.1 /**
119 dl 1.2 * Run control status bits packed into a single int to minimize
120     * footprint and to ensure atomicity (via CAS). Status is
121     * initially zero, and takes on nonnegative values until
122 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
123     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
124     * blocking waits by other threads have SIGNAL_MASK bits set --
125     * bit 15 for external (nonFJ) waits, and the rest a count of
126     * waiting FJ threads. (This representation relies on
127     * ForkJoinPool max thread limits). Completion of a stolen task
128     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
129     * though suboptimal for some purposes, we use basic builtin
130     * wait/notify to take advantage of "monitor inflation" in JVMs
131     * that we would otherwise need to emulate to avoid adding further
132     * per-task bookkeeping overhead. Note that bits 16-28 are
133     * currently unused. Also value 0x80000000 is available as spare
134     * completion value.
135     */
136 jsr166 1.9 volatile int status; // accessed directly by pool and workers
137 dl 1.1
138     static final int COMPLETION_MASK = 0xe0000000;
139     static final int NORMAL = 0xe0000000; // == mask
140     static final int CANCELLED = 0xc0000000;
141     static final int EXCEPTIONAL = 0xa0000000;
142     static final int SIGNAL_MASK = 0x0000ffff;
143     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
144     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
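As an illustration of the bit layout described in the comment above, the following minimal sketch decodes a status word using constants copied from the listing. The class name and the helpers `waitingWorkers` and `hasExternalWaiter` are hypothetical and do not appear in the original file.

```java
// Illustrative only: constants are copied from the listing; helper names are assumptions.
class StatusBitsSketch {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000;
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    // Every completion value sets the sign bit, so "done" is a sign test.
    static boolean isDone(int status) { return status < 0; }

    static boolean isCancelled(int status) {
        return (status & COMPLETION_MASK) == CANCELLED;
    }

    // CANCELLED and EXCEPTIONAL compare below NORMAL as signed ints.
    static boolean isCompletedAbnormally(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    // Low 15 bits: count of waiting fork/join worker threads.
    static int waitingWorkers(int status) { return status & INTERNAL_SIGNAL_MASK; }

    // Bit 15: a thread outside the pool is waiting.
    static boolean hasExternalWaiter(int status) { return (status & EXTERNAL_SIGNAL) != 0; }

    public static void main(String[] args) {
        int running = 3 | EXTERNAL_SIGNAL; // three waiting workers plus an external waiter
        System.out.println("done=" + isDone(running)
                + " workers=" + waitingWorkers(running)
                + " external=" + hasExternalWaiter(running));
        System.out.println("cancelled=" + isCancelled(CANCELLED | 2));
    }
}
```

Because a not-yet-completed status is nonnegative and every completion value is negative, the masked comparison against `NORMAL` is false for running tasks as well as for normally completed ones.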
145    
146     /**
147     * Table of exceptions thrown by tasks, to enable reporting by
148     * callers. Because exceptions are rare, we don't directly keep
149 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
150 dl 1.1 * that cancellation exceptions don't appear in the table, but are
151     * instead recorded as status values.
152 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
153 dl 1.1 */
154     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
155     Collections.synchronizedMap
156     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
157    
158     // within-package utilities
159    
160     /**
161 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
162 dl 1.1 */
163     static ForkJoinWorkerThread getWorker() {
164     Thread t = Thread.currentThread();
165     return ((t instanceof ForkJoinWorkerThread)?
166     (ForkJoinWorkerThread)t : null);
167     }
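The worker-thread check above can be tried out with the classes that later shipped in `java.util.concurrent`. That substitution is an assumption of this sketch, since the listing belongs to the `jsr166y` preview package. The class and method names below are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

// Sketch: uses java.util.concurrent in place of jsr166y (an assumption).
class WorkerCheckSketch {
    // Same shape as getWorker() in the listing, against the public JDK class.
    static ForkJoinWorkerThread currentWorkerOrNull() {
        Thread t = Thread.currentThread();
        return (t instanceof ForkJoinWorkerThread) ? (ForkJoinWorkerThread) t : null;
    }

    // Reports what a task observes when it runs inside a private pool.
    static boolean observedInsidePool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.submit(new RecursiveTask<Boolean>() {
                @Override protected Boolean compute() {
                    return currentWorkerOrNull() != null
                            && ForkJoinTask.inForkJoinPool();
                }
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller is a worker: " + (currentWorkerOrNull() != null));
        System.out.println("task ran on a worker: " + observedInsidePool());
    }
}
```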
168    
169     final boolean casStatus(int cmp, int val) {
170 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
171 dl 1.1 }
172    
173     /**
174     * Workaround for not being able to rethrow unchecked exceptions.
175     */
176     static void rethrowException(Throwable ex) {
177     if (ex != null)
178 jsr166 1.11 UNSAFE.throwException(ex);
179 dl 1.1 }
180    
181     // Setting completion status
182    
183     /**
184 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
185     *
186 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
187     */
188     final void setCompletion(int completion) {
189 dl 1.2 ForkJoinPool pool = getPool();
190 dl 1.1 if (pool != null) {
191     int s; // Clear signal bits while setting completion status
192     do;while ((s = status) >= 0 && !casStatus(s, completion));
193    
194     if ((s & SIGNAL_MASK) != 0) {
195     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
196     pool.updateRunningCount(s);
197     synchronized(this) { notifyAll(); }
198     }
199     }
200     else
201     externallySetCompletion(completion);
202     }
203    
204     /**
205     * Version of setCompletion for non-FJ threads. Leaves signal
206     * bits for unblocked threads to adjust, and always notifies.
207     */
208     private void externallySetCompletion(int completion) {
209     int s;
210     do;while ((s = status) >= 0 &&
211     !casStatus(s, (s & SIGNAL_MASK) | completion));
212     synchronized(this) { notifyAll(); }
213     }
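The two completion loops above follow the same retry-until-CAS-succeeds pattern and differ only in whether the signal bits survive. A minimal sketch of that pattern, assuming `AtomicInteger` in place of the `Unsafe`-based `casStatus` and omitting the pool bookkeeping and `notifyAll` calls, might look like this. The class name and the return values are additions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: AtomicInteger stands in for the Unsafe CAS used in the listing.
class CompletionCasSketch {
    static final int NORMAL      = 0xe0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int SIGNAL_MASK = 0x0000ffff;

    final AtomicInteger status = new AtomicInteger();

    // Pool-side variant: overwrites the whole word, which also clears signal bits.
    // Returns the last status read: the pre-completion value if this call won the
    // race, or the existing completion value if the task was already done.
    int setCompletion(int completion) {
        int s;
        do { } while ((s = status.get()) >= 0
                      && !status.compareAndSet(s, completion));
        return s;
    }

    // External variant: preserves signal bits so that woken waiters can adjust them.
    int externallySetCompletion(int completion) {
        int s;
        do { } while ((s = status.get()) >= 0
                      && !status.compareAndSet(s, (s & SIGNAL_MASK) | completion));
        return s;
    }

    public static void main(String[] args) {
        CompletionCasSketch task = new CompletionCasSketch();
        task.status.set(3); // pretend three workers are waiting
        System.out.println(Integer.toHexString(task.setCompletion(NORMAL)));
        System.out.println(Integer.toHexString(task.status.get()));
    }
}
```

Once the status is negative the loop exits without writing, so a later attempt to set a different completion value leaves the first one in place.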
214    
215     /**
216     * Sets status to indicate normal completion
217     */
218     final void setNormalCompletion() {
219     // Try typical fast case -- single CAS, no signal, not already done.
220     // Manually expand casStatus to improve chances of inlining it
221 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
222 dl 1.1 setCompletion(NORMAL);
223     }
224    
225     // internal waiting and notification
226    
227     /**
228     * Performs the actual monitor wait for awaitDone
229     */
230     private void doAwaitDone() {
231     // Minimize lock bias and in/de-flation effects by maximizing
232     // chances of waiting inside sync
233     try {
234     while (status >= 0)
235     synchronized(this) { if (status >= 0) wait(); }
236     } catch (InterruptedException ie) {
237     onInterruptedWait();
238     }
239     }
240    
241     /**
242     * Performs the actual timed monitor wait for awaitDone
243     */
244     private void doAwaitDone(long startTime, long nanos) {
245     synchronized(this) {
246     try {
247     while (status >= 0) {
248     long nt = nanos - (System.nanoTime() - startTime);
249     if (nt <= 0)
250     break;
251     wait(nt / 1000000, (int)(nt % 1000000));
252     }
253     } catch (InterruptedException ie) {
254     onInterruptedWait();
255     }
256     }
257     }
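The remaining-time arithmetic of a timed monitor wait can be isolated in a small sketch: the time left is the total budget minus the time elapsed since the start, and the result is split into milliseconds and nanoseconds for `Object.wait(long, int)`. The class, its `done` flag, and the helper names are hypothetical stand-ins for the task status used in the listing.

```java
// Sketch: a boolean flag stands in for "status < 0"; names are assumptions.
class TimedWaitSketch {
    private boolean done; // guarded by this

    // Nanoseconds left of the budget, given the start and current clock readings.
    static long remainingNanos(long budgetNanos, long startNanos, long nowNanos) {
        return budgetNanos - (nowNanos - startNanos);
    }

    synchronized void finish() {
        done = true;
        notifyAll();
    }

    // Waits until finish() is called or the budget elapses; returns whether done.
    synchronized boolean awaitDone(long budgetNanos) {
        long start = System.nanoTime();
        try {
            while (!done) {
                long nt = remainingNanos(budgetNanos, start, System.nanoTime());
                if (nt <= 0)
                    break;
                // nt > 0 here, so the two arguments are never both zero
                // (wait(0, 0) would block without a timeout).
                wait(nt / 1000000L, (int) (nt % 1000000L));
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(new TimedWaitSketch().awaitDone(5000000L));
    }
}
```

Recomputing the remaining time on every loop iteration keeps spurious wakeups from extending the total wait beyond the budget.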
258    
259     // Awaiting completion
260    
261     /**
262     * Sets status to indicate there is a joiner, then waits for join,
263     * surrounded with pool notifications.
264 jsr166 1.10 *
265 dl 1.1 * @return status upon exit
266     */
267 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
268 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
269     int s;
270     while ((s = status) >= 0) {
271     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
272     if (pool == null || !pool.preJoin(this, maintainParallelism))
273     doAwaitDone();
274     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
275     adjustPoolCountsOnUnblock(pool);
276     break;
277     }
278     }
279     return s;
280     }
281    
282     /**
283     * Timed version of awaitDone
284     * @return status upon exit
285     */
286 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
287 dl 1.1 ForkJoinPool pool = w == null? null : w.pool;
288     int s;
289     while ((s = status) >= 0) {
290     if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
291     long startTime = System.nanoTime();
292     if (pool == null || !pool.preJoin(this, false))
293     doAwaitDone(startTime, nanos);
294     if ((s = status) >= 0) {
295     adjustPoolCountsOnCancelledWait(pool);
296     s = status;
297     }
298     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
299     adjustPoolCountsOnUnblock(pool);
300     break;
301     }
302     }
303     return s;
304     }
305    
306     /**
307 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
308 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
309     */
310     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
311     int s;
312     do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
313     if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
314     pool.updateRunningCount(s);
315     }
316    
317     /**
318 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
319 dl 1.1 */
320     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
321     if (pool != null) {
322     int s;
323     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
324     if (casStatus(s, s - 1)) {
325     pool.updateRunningCount(1);
326     break;
327     }
328     }
329     }
330     }
331    
332 dl 1.2 /**
333 jsr166 1.10 * Handles interruptions during waits.
334 dl 1.2 */
335 dl 1.1 private void onInterruptedWait() {
336 dl 1.2 ForkJoinWorkerThread w = getWorker();
337     if (w == null)
338     Thread.currentThread().interrupt(); // re-interrupt
339     else if (w.isTerminating())
340 dl 1.3 cancelIgnoringExceptions();
341 dl 1.2 // else if FJworker, ignore interrupt
342 dl 1.1 }
343    
344     // Recording and reporting exceptions
345    
346     private void setDoneExceptionally(Throwable rex) {
347     exceptionMap.put(this, rex);
348     setCompletion(EXCEPTIONAL);
349     }
350    
351     /**
352 jsr166 1.10 * Throws the exception associated with status s.
353     *
354 dl 1.1 * @throws the exception
355     */
356     private void reportException(int s) {
357     if ((s &= COMPLETION_MASK) < NORMAL) {
358     if (s == CANCELLED)
359     throw new CancellationException();
360     else
361     rethrowException(exceptionMap.get(this));
362     }
363     }
364    
365     /**
366 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
367 dl 1.1 * Only call when isDone known to be true.
368     */
369     private V reportFutureResult()
370     throws ExecutionException, InterruptedException {
371     int s = status & COMPLETION_MASK;
372     if (s < NORMAL) {
373     Throwable ex;
374     if (s == CANCELLED)
375     throw new CancellationException();
376     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
377     throw new ExecutionException(ex);
378     if (Thread.interrupted())
379     throw new InterruptedException();
380     }
381     return getRawResult();
382     }
383    
384     /**
385     * Returns result or throws exception using j.u.c.Future conventions
386 jsr166 1.10 * with timeouts.
387 dl 1.1 */
388     private V reportTimedFutureResult()
389     throws InterruptedException, ExecutionException, TimeoutException {
390     Throwable ex;
391     int s = status & COMPLETION_MASK;
392     if (s == NORMAL)
393     return getRawResult();
394     if (s == CANCELLED)
395     throw new CancellationException();
396     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
397     throw new ExecutionException(ex);
398     if (Thread.interrupted())
399     throw new InterruptedException();
400     throw new TimeoutException();
401     }
402    
403     // internal execution methods
404    
405     /**
406     * Calls exec, recording completion, and rethrowing exception if
407 jsr166 1.10 * encountered. Caller should normally check status before calling.
408     *
409 dl 1.1 * @return true if completed normally
410     */
411     private boolean tryExec() {
412     try { // try block must contain only call to exec
413     if (!exec())
414     return false;
415     } catch (Throwable rex) {
416     setDoneExceptionally(rex);
417     rethrowException(rex);
418     return false; // not reached
419     }
420     setNormalCompletion();
421     return true;
422     }
423    
424     /**
425     * Main execution method used by worker threads. Invokes
426 jsr166 1.10 * base computation unless already complete.
427 dl 1.1 */
428     final void quietlyExec() {
429     if (status >= 0) {
430     try {
431     if (!exec())
432     return;
433     } catch(Throwable rex) {
434     setDoneExceptionally(rex);
435     return;
436     }
437     setNormalCompletion();
438     }
439     }
440    
441     /**
442 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
443     * Caller should normally check status before calling.
444     *
445 dl 1.1 * @return true if completed normally
446     */
447     private boolean tryQuietlyInvoke() {
448     try {
449     if (!exec())
450     return false;
451     } catch (Throwable rex) {
452     setDoneExceptionally(rex);
453     return false;
454     }
455     setNormalCompletion();
456     return true;
457     }
458    
459     /**
460 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
461 dl 1.1 */
462 dl 1.3 final void cancelIgnoringExceptions() {
463 dl 1.1 try {
464     cancel(false);
465     } catch(Throwable ignore) {
466     }
467     }
468    
469 dl 1.3 /**
470     * Main implementation of helpJoin
471     */
472     private int busyJoin(ForkJoinWorkerThread w) {
473     int s;
474     ForkJoinTask<?> t;
475     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
476     t.quietlyExec();
477     return (s >= 0)? awaitDone(w, false) : s; // block if no work
478     }
479    
480 dl 1.1 // public methods
481    
482     /**
483     * Arranges to asynchronously execute this task. While it is not
484     * necessarily enforced, it is a usage error to fork a task more
485     * than once unless it has completed and been reinitialized. This
486 dl 1.2 * method may be invoked only from within ForkJoinTask
487 dl 1.13 * computations (as may be determined using method {@link
488     * #inForkJoinPool}). Attempts to invoke in other contexts result
489     * in exceptions or errors possibly including ClassCastException.
490 dl 1.1 */
491     public final void fork() {
492     ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
493     }
494    
495     /**
496     * Returns the result of the computation when it is ready.
497 jsr166 1.8 * This method differs from {@code get} in that abnormal
498 dl 1.1 * completion results in RuntimeExceptions or Errors, not
499     * ExecutionExceptions.
500     *
501     * @return the computed result
502     */
503     public final V join() {
504     ForkJoinWorkerThread w = getWorker();
505     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
506     reportException(awaitDone(w, true));
507     return getRawResult();
508     }
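The `fork`/`join` pairing documented above is typically used in a divide-and-conquer `compute` method. The following sketch assumes the `java.util.concurrent` versions of these classes (`RecursiveTask`, `ForkJoinPool`) rather than the `jsr166y` preview package of the listing. The class name and the `THRESHOLD` value are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch: sums an array by splitting it in halves; names and cutoff are assumptions.
class SumTaskSketch extends RecursiveTask<Long> {
    static final int THRESHOLD = 1000; // illustrative sequential cutoff

    private final long[] data;
    private final int lo, hi;

    SumTaskSketch(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: compute directly
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTaskSketch left = new SumTaskSketch(data, lo, mid);
        SumTaskSketch right = new SumTaskSketch(data, mid, hi);
        left.fork();                          // schedule the left half asynchronously
        long rightSum = right.compute();      // work on the right half in this thread
        return left.join() + rightSum;        // wait for (or run) the left half
    }

    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTaskSketch(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10000];
        for (int i = 0; i < data.length; i++)
            data[i] = i;
        System.out.println(parallelSum(data));
    }
}
```

Computing one half directly and forking only the other keeps the current thread busy and halves the number of forked tasks.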
509    
510     /**
511 dl 1.2 * Commences performing this task, awaits its completion if
512     * necessary, and returns its result.
513 jsr166 1.10 *
514 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
515 jsr166 1.10 * exception) if the underlying computation did so
516 dl 1.1 * @return the computed result
517     */
518     public final V invoke() {
519     if (status >= 0 && tryExec())
520     return getRawResult();
521     else
522     return join();
523     }
524    
525     /**
526 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
527 dl 1.2 * both of them or an exception is encountered. This method may be
528 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
529     * determined using method {@link #inForkJoinPool}). Attempts to
530 dl 1.2 * invoke in other contexts result in exceptions or errors
531 dl 1.4 * possibly including ClassCastException.
532 jsr166 1.10 *
533 dl 1.2 * @param t1 one task
534     * @param t2 the other task
535     * @throws NullPointerException if t1 or t2 are null
536 jsr166 1.10 * @throws RuntimeException or Error if either task did so
537 dl 1.1 */
538 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
539     t2.fork();
540     t1.invoke();
541     t2.join();
542 dl 1.1 }
543    
544     /**
545 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
546 dl 1.2 * for all of them. If any task encounters an exception, others
547     * may be cancelled. This method may be invoked only from within
548 dl 1.13 * ForkJoinTask computations (as may be determined using method
549     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
550     * result in exceptions or errors possibly including
551     * ClassCastException.
552 dl 1.2 * @param tasks the array of tasks
553 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
554     * @throws RuntimeException or Error if any task did so
555 dl 1.1 */
556 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
557     Throwable ex = null;
558     int last = tasks.length - 1;
559     for (int i = last; i >= 0; --i) {
560     ForkJoinTask<?> t = tasks[i];
561     if (t == null) {
562     if (ex == null)
563     ex = new NullPointerException();
564     }
565     else if (i != 0)
566     t.fork();
567     else {
568     t.quietlyInvoke();
569     if (ex == null)
570     ex = t.getException();
571     }
572     }
573     for (int i = 1; i <= last; ++i) {
574     ForkJoinTask<?> t = tasks[i];
575     if (t != null) {
576     if (ex != null)
577     t.cancel(false);
578     else {
579     t.quietlyJoin();
580     if (ex == null)
581     ex = t.getException();
582     }
583     }
584 dl 1.1 }
585 dl 1.2 if (ex != null)
586     rethrowException(ex);
587 dl 1.1 }
588    
589     /**
590 dl 1.2 * Forks all tasks in the collection, returning when
591 jsr166 1.8 * {@code isDone} holds for all of them. If any task
592 dl 1.2 * encounters an exception, others may be cancelled. This method
593 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
594     * may be determined using method {@link
595     * #inForkJoinPool}). Attempts to invoke in other contexts result
596     * in exceptions or errors possibly including ClassCastException.
597 jsr166 1.10 *
598 dl 1.2 * @param tasks the collection of tasks
599 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
600     * @throws RuntimeException or Error if any task did so
601 dl 1.1 */
602 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
603     if (!(tasks instanceof List)) {
604     invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
605     return;
606     }
607     List<? extends ForkJoinTask<?>> ts =
608     (List<? extends ForkJoinTask<?>>)tasks;
609     Throwable ex = null;
610     int last = ts.size() - 1;
611     for (int i = last; i >= 0; --i) {
612     ForkJoinTask<?> t = ts.get(i);
613     if (t == null) {
614     if (ex == null)
615     ex = new NullPointerException();
616     }
617     else if (i != 0)
618     t.fork();
619     else {
620     t.quietlyInvoke();
621     if (ex == null)
622     ex = t.getException();
623     }
624     }
625     for (int i = 1; i <= last; ++i) {
626     ForkJoinTask<?> t = ts.get(i);
627     if (t != null) {
628     if (ex != null)
629     t.cancel(false);
630     else {
631     t.quietlyJoin();
632     if (ex == null)
633     ex = t.getException();
634     }
635     }
636     }
637     if (ex != null)
638     rethrowException(ex);
639 dl 1.1 }
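A typical caller of `invokeAll` is a `compute` method that splits its range and hands both halves over in one call. The sketch below assumes the `java.util.concurrent` equivalents of these classes. The class name, the `run` helper, and the `THRESHOLD` value are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch: increments every array element, splitting work with invokeAll.
class IncrementActionSketch extends RecursiveAction {
    static final int THRESHOLD = 512; // illustrative sequential cutoff

    private final int[] data;
    private final int lo, hi;

    IncrementActionSketch(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++)
                data[i]++;
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Returns once both halves are done; rethrows if either half failed.
        invokeAll(new IncrementActionSketch(data, lo, mid),
                  new IncrementActionSketch(data, mid, hi));
    }

    // Builds an array of n zeros, increments each element in parallel, returns it.
    static int[] run(int n) {
        int[] data = new int[n];
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new IncrementActionSketch(data, 0, n));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        int total = 0;
        for (int v : run(5000))
            total += v;
        System.out.println(total);
    }
}
```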
640    
641     /**
642     * Returns true if the computation performed by this task has
643     * completed (or has been cancelled).
644 jsr166 1.10 *
645 dl 1.1 * @return true if this computation has completed
646     */
647     public final boolean isDone() {
648     return status < 0;
649     }
650    
651     /**
652     * Returns true if this task was cancelled.
653 jsr166 1.10 *
654 dl 1.1 * @return true if this task was cancelled
655     */
656     public final boolean isCancelled() {
657     return (status & COMPLETION_MASK) == CANCELLED;
658     }
659    
660     /**
661     * Asserts that the results of this task's computation will not be
662 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
663 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
664     * will report true, and {@code join} will result in a
665     * {@code CancellationException} being thrown. Otherwise, when
666 dl 1.1 * cancellation races with completion, there are no guarantees
667 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
668     * {@code join} will return normally or via an exception, or
669 dl 1.1 * whether these behaviors will remain consistent upon repeated
670     * invocation.
671     *
672     * <p>This method may be overridden in subclasses, but if so, must
673     * still ensure that these minimal properties hold. In particular,
674     * the cancel method itself must not throw exceptions.
675     *
676     * <p> This method is designed to be invoked by <em>other</em>
677     * tasks. To terminate the current task, you can just return or
678     * throw an unchecked exception from its computation method, or
679 jsr166 1.8 * invoke {@code completeExceptionally}.
680 dl 1.1 *
681     * @param mayInterruptIfRunning this value is ignored in the
682     * default implementation because tasks are not in general
683     * cancelled via interruption.
684     *
685     * @return true if this task is now cancelled
686     */
687     public boolean cancel(boolean mayInterruptIfRunning) {
688     setCompletion(CANCELLED);
689     return (status & COMPLETION_MASK) == CANCELLED;
690     }
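The documented case of cancelling before execution starts can be observed with the `java.util.concurrent` version of the class (an assumption, since the listing is the `jsr166y` preview). The class and method names in this sketch are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

// Sketch: cancels a task that was never forked or submitted.
class CancelSketch {
    static RecursiveTask<Integer> newTask() {
        return new RecursiveTask<Integer>() {
            @Override protected Integer compute() {
                return 42;
            }
        };
    }

    // True if cancelling an unstarted task is visible through every status query
    // and join() reports it as a CancellationException.
    static boolean cancelBeforeRunIsObserved() {
        RecursiveTask<Integer> task = newTask();
        boolean cancelled = task.cancel(false);
        boolean joinThrew;
        try {
            task.join();
            joinThrew = false;
        } catch (CancellationException expected) {
            joinThrew = true;
        }
        return cancelled
                && task.isCancelled()
                && task.isDone()
                && task.isCompletedAbnormally()
                && joinThrew
                && task.getException() instanceof CancellationException;
    }

    public static void main(String[] args) {
        System.out.println(cancelBeforeRunIsObserved());
    }
}
```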
691    
692     /**
693 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
694     *
695 dl 1.3 * @return true if this task threw an exception or was cancelled
696     */
697     public final boolean isCompletedAbnormally() {
698     return (status & COMPLETION_MASK) < NORMAL;
699     }
700    
701     /**
702     * Returns the exception thrown by the base computation, or a
703     * CancellationException if cancelled, or null if none or if the
704     * method has not yet completed.
705 jsr166 1.10 *
706 dl 1.3 * @return the exception, or null if none
707     */
708     public final Throwable getException() {
709     int s = status & COMPLETION_MASK;
710     if (s >= NORMAL)
711     return null;
712     if (s == CANCELLED)
713     return new CancellationException();
714     return exceptionMap.get(this);
715     }
716    
717     /**
718 dl 1.1 * Completes this task abnormally, and if not already aborted or
719     * cancelled, causes it to throw the given exception upon
720 jsr166 1.8 * {@code join} and related operations. This method may be used
721 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
722 dl 1.2 * completion of tasks that would not otherwise complete. Its use
723     * in other situations is likely to be wrong. This method is
724 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
725 dl 1.2 * implementation to maintain guarantees.
726     *
727 dl 1.1 * @param ex the exception to throw. If this exception is
728     * not a RuntimeException or Error, the actual exception thrown
729     * will be a RuntimeException with cause ex.
730     */
731     public void completeExceptionally(Throwable ex) {
732     setDoneExceptionally((ex instanceof RuntimeException) ||
733     (ex instanceof Error)? ex :
734     new RuntimeException(ex));
735     }
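The wrapping rule used by `completeExceptionally` can be isolated as a small pure function: unchecked throwables pass through unchanged, and anything else becomes the cause of a new `RuntimeException`. The class and method names in this sketch are hypothetical.

```java
import java.io.IOException;

// Sketch of the wrapping rule only; not the task machinery around it.
class WrapSketch {
    static Throwable toUnchecked(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error)
                ? ex
                : new RuntimeException(ex);
    }

    public static void main(String[] args) {
        Throwable checked = new IOException("disk");
        Throwable wrapped = toUnchecked(checked);
        System.out.println(wrapped.getClass().getName());
        System.out.println(wrapped.getCause() == checked);
    }
}
```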
736    
737     /**
738     * Completes this task, and if not already aborted or cancelled,
739 jsr166 1.8 * returns the given value as the result of {@code join} and related
740 dl 1.1 * operations. This method may be used to provide results for
741     * asynchronous tasks, or to provide alternative handling for
742 dl 1.2 * tasks that would not otherwise complete normally. Its use in
743     * other situations is likely to be wrong. This method is
744 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
745 dl 1.2 * implementation to maintain guarantees.
746 dl 1.1 *
747 jsr166 1.10 * @param value the result value for this task
748 dl 1.1 */
749     public void complete(V value) {
750     try {
751     setRawResult(value);
752     } catch(Throwable rex) {
753     setDoneExceptionally(rex);
754     return;
755     }
756     setNormalCompletion();
757     }
758    
759 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
760     ForkJoinWorkerThread w = getWorker();
761     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
762     awaitDone(w, true);
763     return reportFutureResult();
764     }
765    
766     public final V get(long timeout, TimeUnit unit)
767     throws InterruptedException, ExecutionException, TimeoutException {
768     ForkJoinWorkerThread w = getWorker();
769     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
770     awaitDone(w, unit.toNanos(timeout));
771     return reportTimedFutureResult();
772     }
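The difference between the `Future`-style `get` methods and `join` shows up when a task fails: `get` reports the failure wrapped in an `ExecutionException`, while `join` rethrows an unchecked exception. The sketch below assumes the `java.util.concurrent` versions of the classes. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Sketch: compares how join() and get() report the same failure.
class FailureReportingSketch {
    static ForkJoinTask<Integer> submitFailingTask(ForkJoinPool pool) {
        return pool.submit(new RecursiveTask<Integer>() {
            @Override protected Integer compute() {
                throw new IllegalStateException("boom");
            }
        });
    }

    // join(): the failure surfaces as an unchecked exception of the thrown type.
    static boolean joinRethrowsUnchecked() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            submitFailingTask(pool).join();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // get(): the failure surfaces as the cause of an ExecutionException.
    static boolean getWrapsInExecutionException() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            submitFailingTask(pool).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinRethrowsUnchecked());
        System.out.println(getWrapsInExecutionException());
    }
}
```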
773    
774 dl 1.1 /**
775 dl 1.2 * Possibly executes other tasks until this task is ready, then
776     * returns the result of the computation. This method may be more
777 jsr166 1.8 * efficient than {@code join}, but is only applicable when
778 jsr166 1.9 * there are no potential dependencies between continuation of the
779 dl 1.2 * current task and that of any other task that might be executed
780     * while helping. (This usually holds for pure divide-and-conquer
781     * tasks). This method may be invoked only from within
782 dl 1.13 * ForkJoinTask computations (as may be determined using method
783     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
784     * result in exceptions or errors possibly including
785     * ClassCastException.
786 jsr166 1.10 *
787 dl 1.2 * @return the computed result
788     */
789     public final V helpJoin() {
790     ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
791     if (status < 0 || !w.unpushTask(this) || !tryExec())
792 dl 1.3 reportException(busyJoin(w));
793 dl 1.2 return getRawResult();
794     }
795    
796     /**
797     * Possibly executes other tasks until this task is ready. This
798     * method may be invoked only from within ForkJoinTask
799 dl 1.13 * computations (as may be determined using method {@link
800     * #inForkJoinPool}). Attempts to invoke in other contexts result
801     * in exceptions or errors possibly including ClassCastException.
802 dl 1.2 */
803     public final void quietlyHelpJoin() {
804     if (status >= 0) {
805     ForkJoinWorkerThread w =
806     (ForkJoinWorkerThread)(Thread.currentThread());
807     if (!w.unpushTask(this) || !tryQuietlyInvoke())
808 dl 1.3 busyJoin(w);
809 dl 1.2 }
810     }
811    
812     /**
813     * Joins this task, without returning its result or throwing an
814     * exception. This method may be useful when processing
815     * collections of tasks when some have been cancelled or are otherwise
816     * known to have aborted.
817     */
818     public final void quietlyJoin() {
819     if (status >= 0) {
820     ForkJoinWorkerThread w = getWorker();
821     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
822     awaitDone(w, true);
823     }
824     }
825    
826     /**
827     * Commences performing this task and awaits its completion if
828     * necessary, without returning its result or throwing an
829     * exception. This method may be useful when processing
830     * collections of tasks when some have been cancelled or are otherwise
831     * known to have aborted.
832     */
833     public final void quietlyInvoke() {
834     if (status >= 0 && !tryQuietlyInvoke())
835     quietlyJoin();
836     }
837    
838     /**
839 dl 1.3 * Possibly executes tasks until the pool hosting the current task
840     * {@link ForkJoinPool#isQuiescent is quiescent}. This method may be of use in
841     * designs in which many tasks are forked, but none are explicitly
842     * joined, instead executing them until all are processed.
843     */
844     public static void helpQuiesce() {
845     ((ForkJoinWorkerThread)(Thread.currentThread())).
846     helpQuiescePool();
847     }
848    
849     /**
850 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
851 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
852 dl 1.1 * this task, but only if reuse occurs when this task has either
853     * never been forked, or has been forked, then completed and all
854     * outstanding joins of this task have also completed. Effects
855     * under any other usage conditions are not guaranteed, and are
856     * almost surely wrong. This method may be useful when executing
857     * pre-constructed trees of subtasks in loops.
858     */
859     public void reinitialize() {
860     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
861     exceptionMap.remove(this);
862     status = 0;
863     }
864    
865     /**
866 dl 1.2 * Returns the pool hosting the current task execution, or null
867 dl 1.13 * if this task is executing outside of any ForkJoinPool.
868 jsr166 1.10 *
869 dl 1.13 * @return the pool, or null if none
870 dl 1.1 */
871 dl 1.2 public static ForkJoinPool getPool() {
872     Thread t = Thread.currentThread();
873     return ((t instanceof ForkJoinWorkerThread)?
874     ((ForkJoinWorkerThread)t).pool : null);
875 dl 1.1 }
876    
877     /**
878 dl 1.13 * Returns true if the current thread is executing as a
879     * ForkJoinPool computation.
880     * @return {@code true} if the current thread is executing as a
881     * ForkJoinPool computation, or false otherwise
882     */
883     public static boolean inForkJoinPool() {
884     return Thread.currentThread() instanceof ForkJoinWorkerThread;
885     }
886    
887     /**
888 dl 1.2 * Tries to unschedule this task for execution. This method will
889     * typically succeed if this task is the most recently forked task
890     * by the current thread, and has not commenced executing in
891     * another thread. This method may be useful when arranging
892     * alternative local processing of tasks that could have been, but
893     * were not, stolen. This method may be invoked only from within
894 dl 1.13 * ForkJoinTask computations (as may be determined using method
895     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
896     * result in exceptions or errors possibly including
897     * ClassCastException.
898 jsr166 1.10 *
899 dl 1.2 * @return true if unforked
900 dl 1.1 */
901 dl 1.2 public boolean tryUnfork() {
902     return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
903 dl 1.1 }
904    
905     /**
906 dl 1.2 * Returns an estimate of the number of tasks that have been
907     * forked by the current worker thread but not yet executed. This
908     * value may be useful for heuristic decisions about whether to
909     * fork other tasks.
910 jsr166 1.10 *
911 dl 1.2 * @return the number of tasks
912     */
913     public static int getQueuedTaskCount() {
914     return ((ForkJoinWorkerThread)(Thread.currentThread())).
915     getQueueSize();
916     }
917    
918     /**
919 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
920 dl 1.1 * held by the current worker thread than there are other worker
921 dl 1.2 * threads that might steal them. This value may be useful for
922     * heuristic decisions about whether to fork other tasks. In many
923     * usages of ForkJoinTasks, at steady state, each worker should
924     * aim to maintain a small constant surplus (for example, 3) of
925     * tasks, and to process computations locally if this threshold is
926     * exceeded.
927 jsr166 1.10 *
928 dl 1.1 * @return the surplus number of tasks, which may be negative
929     */
930 dl 1.2 public static int getSurplusQueuedTaskCount() {
931 dl 1.1 return ((ForkJoinWorkerThread)(Thread.currentThread()))
932     .getEstimatedSurplusTaskCount();
933     }
934    
935 dl 1.2 // Extension methods
936 dl 1.1
937     /**
938 jsr166 1.8 * Returns the result that would be returned by {@code join},
939 dl 1.2 * even if this task completed abnormally, or null if this task is
940     * not known to have been completed. This method is designed to
941     * aid debugging, as well as to support extensions. Its use in any
942     * other context is discouraged.
943 dl 1.1 *
944 jsr166 1.10 * @return the result, or null if not completed
945 dl 1.1 */
946     public abstract V getRawResult();
947    
948     /**
949     * Forces the given value to be returned as a result. This method
950     * is designed to support extensions, and should not in general be
951     * called otherwise.
952     *
953     * @param value the value
954     */
955     protected abstract void setRawResult(V value);
956    
957     /**
958     * Immediately performs the base action of this task. This method
959     * is designed to support extensions, and should not in general be
960     * called otherwise. The return value controls whether this task
961     * is considered to be done normally. It may return false in
962     * asynchronous actions that require explicit invocations of
963 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
964 dl 1.1 * to indicate abnormal exit.
965 jsr166 1.10 *
966 dl 1.1 * @return true if completed normally
967     * @throws Error or RuntimeException if encountered during computation
968     */
969     protected abstract boolean exec();
970    
971 dl 1.2 /**
972 dl 1.6 * Returns, but does not unschedule or execute, the next task queued by
973     * the current thread but not yet executed, if one is
974     * available. There is no guarantee that this task will actually
975     * be polled or executed next. This method is designed primarily
976     * to support extensions, and is unlikely to be useful otherwise.
977     * This method may be invoked only from within ForkJoinTask
978 dl 1.13 * computations (as may be determined using method {@link
979     * #inForkJoinPool}). Attempts to invoke in other contexts result
980     * in exceptions or errors possibly including ClassCastException.
981 dl 1.2 *
982     * @return the next task, or null if none are available
983     */
984     protected static ForkJoinTask<?> peekNextLocalTask() {
985     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
986     }
987    
988     /**
989 dl 1.6 * Unschedules and returns, without executing, the next task
990     * queued by the current thread but not yet executed. This method
991     * is designed primarily to support extensions, and is unlikely to
992     * be useful otherwise. This method may be invoked only from
993 dl 1.13 * within ForkJoinTask computations (as may be determined using
994     * method {@link #inForkJoinPool}). Attempts to invoke in other
995 dl 1.6 * contexts result in exceptions or errors possibly including
996     * ClassCastException.
997 dl 1.2 *
998     * @return the next task, or null if none are available
999     */
1000     protected static ForkJoinTask<?> pollNextLocalTask() {
1001 dl 1.6 return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
1002 dl 1.2 }
1003 jsr166 1.7
1004 dl 1.2 /**
1005 dl 1.6 * Unschedules and returns, without executing, the next task
1006     * queued by the current thread but not yet executed, if one is
1007     * available, or if not available, a task that was forked by some
1008     * other thread, if available. Availability may be transient, so a
1009 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1010 dl 1.6 * of the pool this task is operating in. This method is designed
1011     * primarily to support extensions, and is unlikely to be useful
1012     * otherwise. This method may be invoked only from within
1013 dl 1.13 * ForkJoinTask computations (as may be determined using method
1014     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1015 dl 1.6 * result in exceptions or errors possibly including
1016     * ClassCastException.
1017 dl 1.4 *
1018 dl 1.2 * @return a task, or null if none are available
1019     */
1020     protected static ForkJoinTask<?> pollTask() {
1021     return ((ForkJoinWorkerThread)(Thread.currentThread())).
1022 dl 1.4 pollTask();
1023 dl 1.2 }
1024    
1025 dl 1.1 // Serialization support
1026    
1027     private static final long serialVersionUID = -7721805057305804111L;
1028    
1029     /**
1030     * Saves the state to a stream.
1031     *
1032     * @serialData the current run status and the exception thrown
1033 jsr166 1.10 * during execution, or null if none
1034 dl 1.1 * @param s the stream
1035     */
1036     private void writeObject(java.io.ObjectOutputStream s)
1037     throws java.io.IOException {
1038     s.defaultWriteObject();
1039     s.writeObject(getException());
1040     }
1041    
1042     /**
1043     * Reconstitutes the instance from a stream.
1044 jsr166 1.10 *
1045 dl 1.1 * @param s the stream
1046     */
1047     private void readObject(java.io.ObjectInputStream s)
1048     throws java.io.IOException, ClassNotFoundException {
1049     s.defaultReadObject();
1050 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1051     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1052 dl 1.1 Object ex = s.readObject();
1053     if (ex != null)
1054     setDoneExceptionally((Throwable)ex);
1055     }
1056    
1057     // Temporary Unsafe mechanics for preliminary release
1058 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1059     try {
1060     return Unsafe.getUnsafe();
1061     } catch (SecurityException se) {
1062     try {
1063     return java.security.AccessController.doPrivileged
1064     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1065     public Unsafe run() throws Exception {
1066     return getUnsafePrivileged();
1067     }});
1068     } catch (java.security.PrivilegedActionException e) {
1069     throw e.getCause();
1070     }
1071     }
1072     }
1073    
1074     private static Unsafe getUnsafePrivileged()
1075     throws NoSuchFieldException, IllegalAccessException {
1076     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1077     f.setAccessible(true);
1078     return (Unsafe) f.get(null);
1079     }
1080    
1081     private static long fieldOffset(String fieldName)
1082     throws NoSuchFieldException {
1083 jsr166 1.11 return UNSAFE.objectFieldOffset
1084 jsr166 1.5 (ForkJoinTask.class.getDeclaredField(fieldName));
1085     }
1086 dl 1.1
1087 jsr166 1.11 static final Unsafe UNSAFE;
1088 dl 1.1 static final long statusOffset;
1089    
1090     static {
1091     try {
1092 jsr166 1.11 UNSAFE = getUnsafe();
1093 jsr166 1.5 statusOffset = fieldOffset("status");
1094     } catch (Throwable e) {
1095     throw new RuntimeException("Could not initialize intrinsics", e);
1096     }
1097 dl 1.1 }
1098    
1099     }