root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.23
Committed: Mon Jul 27 20:57:44 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.22: +23 -23 lines
Log Message:
{@code (true|null)}

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.17
9     import java.util.concurrent.*;
10    
11 dl 1.1 import java.io.Serializable;
12 jsr166 1.17 import java.util.Collection;
13     import java.util.Collections;
14     import java.util.List;
15     import java.util.Map;
16     import java.util.WeakHashMap;
17 dl 1.1
18     /**
19 dl 1.2 * Abstract base class for tasks that run within a {@link
20     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
21     * lighter weight than a normal thread. Huge numbers of tasks and
22     * subtasks may be hosted by a small number of actual threads in a
23     * ForkJoinPool, at the price of some usage limitations.
24 dl 1.4 *
25 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
26     * {@link ForkJoinPool}. Once started, it will usually in turn start
27     * other subtasks. As indicated by the name of this class, many
28 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
29     * and {@code join}, or derivatives such as
30     * {@code invokeAll}. However, this class also provides a number
31 dl 1.2 * of other methods that can come into play in advanced usages, as
32     * well as extension mechanics that allow support of new forms of
33     * fork/join processing.
34 dl 1.4 *
35 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
36     * efficiency of ForkJoinTasks stems from a set of restrictions (that
37     * are only partially statically enforceable) reflecting their
38     * intended use as computational tasks calculating pure functions or
39     * operating on purely isolated objects. The primary coordination
40     * mechanisms are {@link #fork}, which arranges asynchronous execution,
41     * and {@link #join}, which doesn't proceed until the task's result has
42 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
43 dl 1.2 * methods or blocks, and should minimize other blocking
44     * synchronization apart from joining other tasks or using
45 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
46     * fork/join scheduling. Tasks should also not perform blocking IO,
47     * and should ideally access variables that are completely independent
48     * of those accessed by other running tasks. Minor breaches of these
49     * restrictions, for example using shared output streams, may be
50     * tolerable in practice, but frequent use may result in poor
51     * performance, and the potential to indefinitely stall if the number
52 dl 1.2 * of threads not waiting for IO or other external synchronization
53     * becomes exhausted. This usage restriction is in part enforced by
54 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
55 dl 1.2 * to be thrown. However, computations may still encounter unchecked
56 dl 1.1 * exceptions, which are rethrown to callers attempting to join
57     * them. These exceptions may additionally include
58     * RejectedExecutionExceptions stemming from internal resource
59     * exhaustion such as failure to allocate internal task queues.
60     *
61 dl 1.2 * <p>The primary method for awaiting completion and extracting
62     * results of a task is {@link #join}, but there are several variants:
63     * The {@link Future#get} methods support interruptible and/or timed
64 jsr166 1.8 * waits for completion and report results using {@code Future}
65 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
66     * execute other tasks while awaiting joins, which is sometimes more
67     * efficient but only applies when all subtasks are known to be
68     * strictly tree-structured. Method {@link #invoke} is semantically
69 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
70 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
71     * of these methods do not extract results or report exceptions. These
72     * may be useful when a set of tasks are being executed, and you need
73     * to delay processing of results or exceptions until all complete.
74 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
75 dl 1.2 * performs the most common form of parallel invocation: forking a set
76     * of tasks and joining them all.
77     *
78     * <p> The ForkJoinTask class is not usually directly subclassed.
79     * Instead, you subclass one of the abstract classes that support a
80     * particular style of fork/join processing. Normally, a concrete
81     * ForkJoinTask subclass declares fields comprising its parameters,
82 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
83 dl 1.2 * method that somehow uses the control methods supplied by this base
84 jsr166 1.8 * class. While these methods have {@code public} access (to allow
85 dl 1.2 * instances of different task subclasses to call each other's
86     * methods), some of them may only be called from within other
87 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
88     * #inForkJoinPool}). Attempts to invoke them in other contexts
89 jsr166 1.14 * result in exceptions or errors, possibly including
90 dl 1.13 * ClassCastException.
91 dl 1.1 *
92 jsr166 1.8 * <p>Most base support methods are {@code final} because their
93 dl 1.1 * implementations are intrinsically tied to the underlying
94     * lightweight task scheduling framework, and so cannot be overridden.
95     * Developers creating new basic styles of fork/join processing should
96 jsr166 1.8 * minimally implement {@code protected} methods
97     * {@code exec}, {@code setRawResult}, and
98     * {@code getRawResult}, while also introducing an abstract
99 dl 1.2 * computational method that can be implemented in its subclasses,
100 jsr166 1.8 * possibly relying on other {@code protected} methods provided
101 dl 1.2 * by this class.
102 dl 1.1 *
103     * <p>ForkJoinTasks should perform relatively small amounts of
104 jsr166 1.9 * computation; larger work should be split into smaller tasks. As a very
105 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
106     * than 10000 basic computational steps. If tasks are too big, then
107 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
108 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
109     *
110 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
111 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
112     * in general sensible to serialize tasks only before or after, but
113 dl 1.1 * not during execution. Serialization is not relied on during
114     * execution itself.
115 jsr166 1.12 *
116     * @since 1.7
117     * @author Doug Lea
118 dl 1.1 */
119     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
120 dl 1.2
121 dl 1.1 /**
122 dl 1.2 * Run control status bits packed into a single int to minimize
123     * footprint and to ensure atomicity (via CAS). Status is
124     * initially zero, and takes on nonnegative values until
125 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
126     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
127     * blocking waits by other threads have SIGNAL_MASK bits set --
128     * bit 15 for external (nonFJ) waits, and the rest a count of
129     * waiting FJ threads. (This representation relies on
130     * ForkJoinPool max thread limits). Completion of a stolen task
131     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
132     * though suboptimal for some purposes, we use basic builtin
133     * wait/notify to take advantage of "monitor inflation" in JVMs
134     * that we would otherwise need to emulate to avoid adding further
135     * per-task bookkeeping overhead. Note that bits 16-28 are
136     * currently unused. Also value 0x80000000 is available as spare
137     * completion value.
138     */
139 jsr166 1.9 volatile int status; // accessed directly by pool and workers
140 dl 1.1
141     static final int COMPLETION_MASK = 0xe0000000;
142     static final int NORMAL = 0xe0000000; // == mask
143     static final int CANCELLED = 0xc0000000;
144     static final int EXCEPTIONAL = 0xa0000000;
145     static final int SIGNAL_MASK = 0x0000ffff;
146     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
147     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
148    
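The status layout described above can be illustrated with a small standalone sketch. The class name StatusBitsDemo and its helper methods are hypothetical and are not part of jsr166y; only the constant values are copied from the listing. The sketch assumes nothing beyond plain int arithmetic: all three completion values have the sign bit set, so "status < 0" means done, and because CANCELLED and EXCEPTIONAL are numerically smaller than NORMAL as signed ints, a single "< NORMAL" comparison on the masked value detects abnormal completion.

```java
// Hypothetical demo; only the constant values come from the listing above.
final class StatusBitsDemo {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000; // == mask
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    /** Names the completion state held in the top three bits. */
    static String completion(int status) {
        if (status >= 0)
            return "INCOMPLETE";              // sign bit clear: still running
        switch (status & COMPLETION_MASK) {
            case NORMAL:      return "NORMAL";
            case CANCELLED:   return "CANCELLED";
            case EXCEPTIONAL: return "EXCEPTIONAL";
            default:          return "UNKNOWN"; // e.g. the spare value 0x80000000
        }
    }

    /** True for CANCELLED or EXCEPTIONAL, false for NORMAL or incomplete. */
    static boolean isAbnormal(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    /** Count of fork/join worker threads blocked on the task (low 15 bits). */
    static int waitingWorkers(int status) {
        return status & INTERNAL_SIGNAL_MASK;
    }

    /** True if bit 15 records a waiting non-fork/join thread. */
    static boolean hasExternalWaiter(int status) {
        return (status & EXTERNAL_SIGNAL) != 0;
    }

    public static void main(String[] args) {
        int s = CANCELLED | EXTERNAL_SIGNAL | 3;
        System.out.println(completion(s)
            + ", workers=" + waitingWorkers(s)
            + ", external=" + hasExternalWaiter(s)
            + ", abnormal=" + isAbnormal(s));
    }
}
```

Packing everything into one int is what lets the listing change completion state and waiter counts together with a single compare-and-swap.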
149     /**
150     * Table of exceptions thrown by tasks, to enable reporting by
151     * callers. Because exceptions are rare, we don't directly keep
152 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
153 dl 1.1 * that cancellation exceptions don't appear in the table, but are
154     * instead recorded as status values.
155 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
156 dl 1.1 */
157     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
158     Collections.synchronizedMap
159     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
160    
161     // within-package utilities
162    
163     /**
164 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
165 dl 1.1 */
166     static ForkJoinWorkerThread getWorker() {
167     Thread t = Thread.currentThread();
168 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
169     (ForkJoinWorkerThread) t : null);
170 dl 1.1 }
171    
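As a usage-level illustration of the worker check above, the following sketch uses the public counterpart inForkJoinPool() that the class documentation refers to. It assumes the java.util.concurrent versions of ForkJoinPool and ForkJoinTask shipped with JDK 7 and later, not the jsr166y package of this listing; the class name WorkerCheckDemo and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo built on java.util.concurrent (an assumption, see above).
final class WorkerCheckDemo {
    /** True if the calling thread is executing as a fork/join worker. */
    static boolean onWorkerThread() {
        return ForkJoinTask.inForkJoinPool();
    }

    /** Runs the same check inside a pool and returns what the worker saw. */
    static boolean checkInsidePool() {
        final AtomicBoolean seen = new AtomicBoolean();
        final CountDownLatch done = new CountDownLatch(1);
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            pool.execute(new Runnable() {
                public void run() {
                    seen.set(onWorkerThread());
                    done.countDown();
                }
            });
            done.await();          // wait without joining the task ourselves
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + onWorkerThread());
        System.out.println("pool thread:   " + checkInsidePool());
    }
}
```

Guarding calls with such a check is one way to avoid the ClassCastException that the listing's fork() and helpJoin() can raise when invoked from an ordinary thread.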
172     final boolean casStatus(int cmp, int val) {
173 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
174 dl 1.1 }
175    
176     /**
177     * Workaround for not being able to rethrow an arbitrary Throwable without declaring it.
178     */
179     static void rethrowException(Throwable ex) {
180     if (ex != null)
181 jsr166 1.11 UNSAFE.throwException(ex);
182 dl 1.1 }
183    
184     // Setting completion status
185    
186     /**
187 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
188     *
189 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
190     */
191     final void setCompletion(int completion) {
192 dl 1.2 ForkJoinPool pool = getPool();
193 dl 1.1 if (pool != null) {
194     int s; // Clear signal bits while setting completion status
195 jsr166 1.14 do {} while ((s = status) >= 0 && !casStatus(s, completion));
196 dl 1.1
197     if ((s & SIGNAL_MASK) != 0) {
198     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
199     pool.updateRunningCount(s);
200 jsr166 1.14 synchronized (this) { notifyAll(); }
201 dl 1.1 }
202     }
203     else
204     externallySetCompletion(completion);
205     }
206    
207     /**
208     * Version of setCompletion for non-FJ threads. Leaves signal
209     * bits for unblocked threads to adjust, and always notifies.
210     */
211     private void externallySetCompletion(int completion) {
212     int s;
213 jsr166 1.14 do {} while ((s = status) >= 0 &&
214     !casStatus(s, (s & SIGNAL_MASK) | completion));
215     synchronized (this) { notifyAll(); }
216 dl 1.1 }
217    
218     /**
219 jsr166 1.14 * Sets status to indicate normal completion.
220 dl 1.1 */
221     final void setNormalCompletion() {
222     // Try typical fast case -- single CAS, no signal, not already done.
223     // Manually expand casStatus to improve chances of inlining it
224 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
225 dl 1.1 setCompletion(NORMAL);
226     }
227    
228     // internal waiting and notification
229    
230     /**
231 jsr166 1.14 * Performs the actual monitor wait for awaitDone.
232 dl 1.1 */
233     private void doAwaitDone() {
234     // Minimize lock bias and in/de-flation effects by maximizing
235     // chances of waiting inside sync
236     try {
237     while (status >= 0)
238 jsr166 1.14 synchronized (this) { if (status >= 0) wait(); }
239 dl 1.1 } catch (InterruptedException ie) {
240     onInterruptedWait();
241     }
242     }
243    
244     /**
245 jsr166 1.14 * Performs the actual timed monitor wait for awaitDone.
246 dl 1.1 */
247     private void doAwaitDone(long startTime, long nanos) {
248 jsr166 1.14 synchronized (this) {
249 dl 1.1 try {
250     while (status >= 0) {
251     long nt = nanos - (System.nanoTime() - startTime);
252     if (nt <= 0)
253     break;
254 jsr166 1.14 wait(nt / 1000000, (int) (nt % 1000000));
255 dl 1.1 }
256     } catch (InterruptedException ie) {
257     onInterruptedWait();
258     }
259     }
260     }
261    
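The arithmetic behind the timed wait can be checked in isolation. The sketch below is hypothetical (the class TimedWaitDemo and its methods do not appear in jsr166y): it computes the time remaining as the budget minus the elapsed time, and splits that value into the millisecond and nanosecond arguments expected by Object.wait(long, int). Note the parentheses around the elapsed time; without them the start time would be subtracted rather than added back.

```java
// Hypothetical helper illustrating the remaining-time computation.
final class TimedWaitDemo {
    /** Nanoseconds left of a budget of nanos that started at startTime. */
    static long remainingNanos(long startTime, long nanos, long now) {
        return nanos - (now - startTime);
    }

    /** Whole milliseconds in nt, for the first argument of wait. */
    static long millisPart(long nt) {
        return nt / 1000000L;
    }

    /** Leftover nanoseconds (0 to 999999), for the second argument of wait. */
    static int nanosPart(long nt) {
        return (int) (nt % 1000000L);
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        long start = System.nanoTime();
        long budget = 5000000L;                 // 5 ms
        synchronized (lock) {
            long nt;
            // Loop guards against spurious wakeups; exits once the budget is spent.
            while ((nt = remainingNanos(start, budget, System.nanoTime())) > 0)
                lock.wait(millisPart(nt), nanosPart(nt));
        }
        System.out.println("elapsed ns: " + (System.nanoTime() - start));
    }
}
```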
262     // Awaiting completion
263    
264     /**
265     * Sets status to indicate there is a joiner, then waits for join,
266     * surrounded with pool notifications.
267 jsr166 1.10 *
268 dl 1.1 * @return status upon exit
269     */
270 jsr166 1.14 private int awaitDone(ForkJoinWorkerThread w,
271     boolean maintainParallelism) {
272     ForkJoinPool pool = (w == null) ? null : w.pool;
273 dl 1.1 int s;
274     while ((s = status) >= 0) {
275 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
276 dl 1.1 if (pool == null || !pool.preJoin(this, maintainParallelism))
277     doAwaitDone();
278     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
279     adjustPoolCountsOnUnblock(pool);
280     break;
281     }
282     }
283     return s;
284     }
285    
286     /**
287     * Timed version of awaitDone.
288 jsr166 1.14 *
289 dl 1.1 * @return status upon exit
290     */
291 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
292 jsr166 1.14 ForkJoinPool pool = (w == null) ? null : w.pool;
293 dl 1.1 int s;
294     while ((s = status) >= 0) {
295 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
296 dl 1.1 long startTime = System.nanoTime();
297     if (pool == null || !pool.preJoin(this, false))
298     doAwaitDone(startTime, nanos);
299     if ((s = status) >= 0) {
300     adjustPoolCountsOnCancelledWait(pool);
301     s = status;
302     }
303     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
304     adjustPoolCountsOnUnblock(pool);
305     break;
306     }
307     }
308     return s;
309     }
310    
311     /**
312 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
313 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
314     */
315     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
316     int s;
317 jsr166 1.14 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
318 dl 1.1 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
319     pool.updateRunningCount(s);
320     }
321    
322     /**
323 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
324 dl 1.1 */
325     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
326     if (pool != null) {
327     int s;
328     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
329     if (casStatus(s, s - 1)) {
330     pool.updateRunningCount(1);
331     break;
332     }
333     }
334     }
335     }
336    
337 dl 1.2 /**
338 jsr166 1.10 * Handles interruptions during waits.
339 dl 1.2 */
340 dl 1.1 private void onInterruptedWait() {
341 dl 1.2 ForkJoinWorkerThread w = getWorker();
342     if (w == null)
343     Thread.currentThread().interrupt(); // re-interrupt
344     else if (w.isTerminating())
345 dl 1.3 cancelIgnoringExceptions();
346 dl 1.2 // else if FJworker, ignore interrupt
347 dl 1.1 }
348    
349     // Recording and reporting exceptions
350    
351     private void setDoneExceptionally(Throwable rex) {
352     exceptionMap.put(this, rex);
353     setCompletion(EXCEPTIONAL);
354     }
355    
356     /**
357 jsr166 1.10 * Throws the exception associated with status s.
358     *
359 dl 1.1 * @throws the exception
360     */
361     private void reportException(int s) {
362     if ((s &= COMPLETION_MASK) < NORMAL) {
363     if (s == CANCELLED)
364     throw new CancellationException();
365     else
366     rethrowException(exceptionMap.get(this));
367     }
368     }
369    
370     /**
371 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
372 jsr166 1.14 * Only call when {@code isDone} known to be true.
373 dl 1.1 */
374     private V reportFutureResult()
375     throws ExecutionException, InterruptedException {
376     int s = status & COMPLETION_MASK;
377     if (s < NORMAL) {
378     Throwable ex;
379     if (s == CANCELLED)
380     throw new CancellationException();
381     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
382     throw new ExecutionException(ex);
383     if (Thread.interrupted())
384     throw new InterruptedException();
385     }
386     return getRawResult();
387     }
388    
389     /**
390     * Returns result or throws exception using j.u.c.Future conventions
391 jsr166 1.10 * with timeouts.
392 dl 1.1 */
393     private V reportTimedFutureResult()
394     throws InterruptedException, ExecutionException, TimeoutException {
395     Throwable ex;
396     int s = status & COMPLETION_MASK;
397     if (s == NORMAL)
398     return getRawResult();
399     if (s == CANCELLED)
400     throw new CancellationException();
401     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
402     throw new ExecutionException(ex);
403     if (Thread.interrupted())
404     throw new InterruptedException();
405     throw new TimeoutException();
406     }
407    
408     // internal execution methods
409    
410     /**
411     * Calls exec, recording completion, and rethrowing exception if
412 jsr166 1.10 * encountered. Caller should normally check status before calling.
413     *
414 dl 1.1 * @return true if completed normally
415     */
416     private boolean tryExec() {
417     try { // try block must contain only call to exec
418     if (!exec())
419     return false;
420     } catch (Throwable rex) {
421     setDoneExceptionally(rex);
422     rethrowException(rex);
423     return false; // not reached
424     }
425     setNormalCompletion();
426     return true;
427     }
428    
429     /**
430     * Main execution method used by worker threads. Invokes
431 jsr166 1.10 * base computation unless already complete.
432 dl 1.1 */
433     final void quietlyExec() {
434     if (status >= 0) {
435     try {
436     if (!exec())
437     return;
438 jsr166 1.14 } catch (Throwable rex) {
439 dl 1.1 setDoneExceptionally(rex);
440     return;
441     }
442     setNormalCompletion();
443     }
444     }
445    
446     /**
447 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
448     * Caller should normally check status before calling.
449     *
450 dl 1.1 * @return true if completed normally
451     */
452     private boolean tryQuietlyInvoke() {
453     try {
454     if (!exec())
455     return false;
456     } catch (Throwable rex) {
457     setDoneExceptionally(rex);
458     return false;
459     }
460     setNormalCompletion();
461     return true;
462     }
463    
464     /**
465 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
466 dl 1.1 */
467 dl 1.3 final void cancelIgnoringExceptions() {
468 dl 1.1 try {
469     cancel(false);
470 jsr166 1.14 } catch (Throwable ignore) {
471 dl 1.1 }
472     }
473    
474 dl 1.3 /**
475     * Main implementation of helpJoin
476     */
477     private int busyJoin(ForkJoinWorkerThread w) {
478     int s;
479     ForkJoinTask<?> t;
480     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
481     t.quietlyExec();
482 jsr166 1.14 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
483 dl 1.3 }
484    
485 dl 1.1 // public methods
486    
487     /**
488     * Arranges to asynchronously execute this task. While it is not
489     * necessarily enforced, it is a usage error to fork a task more
490     * than once unless it has completed and been reinitialized. This
491 dl 1.2 * method may be invoked only from within ForkJoinTask
492 dl 1.13 * computations (as may be determined using method {@link
493     * #inForkJoinPool}). Attempts to invoke in other contexts result
494 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
495 dl 1.18 *
496 jsr166 1.21 * @return {@code this}, to simplify usage.
497 dl 1.1 */
498 dl 1.18 public final ForkJoinTask<V> fork() {
499 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
500     .pushTask(this);
501 dl 1.18 return this;
502 dl 1.1 }
503    
504     /**
505     * Returns the result of the computation when it is ready.
506 jsr166 1.8 * This method differs from {@code get} in that abnormal
507 dl 1.1 * completion results in RuntimeExceptions or Errors, not
508     * ExecutionExceptions.
509     *
510     * @return the computed result
511     */
512     public final V join() {
513     ForkJoinWorkerThread w = getWorker();
514     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
515     reportException(awaitDone(w, true));
516     return getRawResult();
517     }
518    
519     /**
520 dl 1.2 * Commences performing this task, awaits its completion if
521     * necessary, and returns its result.
522 jsr166 1.10 *
523 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
524 jsr166 1.10 * exception) if the underlying computation did so
525 dl 1.1 * @return the computed result
526     */
527     public final V invoke() {
528     if (status >= 0 && tryExec())
529     return getRawResult();
530     else
531     return join();
532     }
533    
534     /**
535 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
536 dl 1.2 * both of them or an exception is encountered. This method may be
537 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
538     * determined using method {@link #inForkJoinPool}). Attempts to
539 jsr166 1.14 * invoke in other contexts result in exceptions or errors,
540 dl 1.4 * possibly including ClassCastException.
541 jsr166 1.10 *
542 dl 1.2 * @param t1 one task
543     * @param t2 the other task
544     * @throws NullPointerException if t1 or t2 is null
545 jsr166 1.10 * @throws RuntimeException or Error if either task did so
546 dl 1.1 */
547 dl 1.2 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
548     t2.fork();
549     t1.invoke();
550     t2.join();
551 dl 1.1 }
552    
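The fork-one, run-one, join pattern used by the two-task invokeAll above is also the usual shape of a divide-and-conquer compute method. The following is a minimal sketch under stated assumptions: it uses java.util.concurrent.RecursiveTask and ForkJoinPool from JDK 7 and later (neither class appears in this listing), and the class RangeSum, its fields, and the THRESHOLD value are hypothetical, chosen to sit inside the rough 100 to 10000 step range suggested in the class documentation.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums data[lo..hi) by recursive splitting.
final class RangeSum extends RecursiveTask<Long> {
    static final int THRESHOLD = 1000;   // assumed cutoff for sequential work
    private final long[] data;
    private final int lo, hi;

    RangeSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: sum directly
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        RangeSum left = new RangeSum(data, lo, mid);
        RangeSum right = new RangeSum(data, mid, hi);
        right.fork();                        // schedule one half asynchronously
        long leftSum = left.invoke();        // run the other half in this thread
        return leftSum + right.join();       // then wait for the forked half
    }

    /** Submits a root task to a fresh pool and returns the total. */
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data));
    }
}
```

Each subtask reads a disjoint slice of the array and writes nothing shared, which matches the advice above that tasks access variables independent of those used by other running tasks.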
553     /**
554 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
555 dl 1.2 * for all of them. If any task encounters an exception, others
556     * may be cancelled. This method may be invoked only from within
557 dl 1.13 * ForkJoinTask computations (as may be determined using method
558     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
559 jsr166 1.14 * result in exceptions or errors, possibly including
560 dl 1.13 * ClassCastException.
561 jsr166 1.14 *
562 dl 1.2 * @param tasks the array of tasks
563 jsr166 1.10 * @throws NullPointerException if tasks or any element is null
564     * @throws RuntimeException or Error if any task did so
565 dl 1.1 */
566 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
567     Throwable ex = null;
568     int last = tasks.length - 1;
569     for (int i = last; i >= 0; --i) {
570     ForkJoinTask<?> t = tasks[i];
571     if (t == null) {
572     if (ex == null)
573     ex = new NullPointerException();
574     }
575     else if (i != 0)
576     t.fork();
577     else {
578     t.quietlyInvoke();
579     if (ex == null)
580     ex = t.getException();
581     }
582     }
583     for (int i = 1; i <= last; ++i) {
584     ForkJoinTask<?> t = tasks[i];
585     if (t != null) {
586     if (ex != null)
587     t.cancel(false);
588     else {
589     t.quietlyJoin();
590     if (ex == null)
591     ex = t.getException();
592     }
593     }
594 dl 1.1 }
595 dl 1.2 if (ex != null)
596     rethrowException(ex);
597 dl 1.1 }
598    
599     /**
600 dl 1.2 * Forks all tasks in the collection, returning when
601 jsr166 1.8 * {@code isDone} holds for all of them. If any task
602 dl 1.2 * encounters an exception, others may be cancelled. This method
603 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
604     * may be determined using method {@link
605 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
606     * in exceptions or errors, possibly including ClassCastException.
607 jsr166 1.10 *
608 dl 1.2 * @param tasks the collection of tasks
609 dl 1.19 * @return the tasks argument, to simplify usage
610 jsr166 1.10 * @throws NullPointerException if tasks or any element is null
611     * @throws RuntimeException or Error if any task did so
612 dl 1.1 */
613 dl 1.19 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
614 jsr166 1.15 if (!(tasks instanceof List<?>)) {
615 jsr166 1.14 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
616 dl 1.19 return tasks;
617 dl 1.2 }
618 jsr166 1.15 @SuppressWarnings("unchecked")
619 dl 1.2 List<? extends ForkJoinTask<?>> ts =
620 jsr166 1.14 (List<? extends ForkJoinTask<?>>) tasks;
621 dl 1.2 Throwable ex = null;
622     int last = ts.size() - 1;
623     for (int i = last; i >= 0; --i) {
624     ForkJoinTask<?> t = ts.get(i);
625     if (t == null) {
626     if (ex == null)
627     ex = new NullPointerException();
628     }
629     else if (i != 0)
630     t.fork();
631     else {
632     t.quietlyInvoke();
633     if (ex == null)
634     ex = t.getException();
635     }
636     }
637     for (int i = 1; i <= last; ++i) {
638     ForkJoinTask<?> t = ts.get(i);
639     if (t != null) {
640     if (ex != null)
641     t.cancel(false);
642     else {
643     t.quietlyJoin();
644     if (ex == null)
645     ex = t.getException();
646     }
647     }
648     }
649     if (ex != null)
650     rethrowException(ex);
651 dl 1.19 return tasks;
652 dl 1.1 }
653    
654     /**
655 jsr166 1.23 * Returns {@code true} if the computation performed by this task
656     * has completed (or has been cancelled).
657 jsr166 1.10 *
658 jsr166 1.23 * @return {@code true} if this computation has completed
659 dl 1.1 */
660     public final boolean isDone() {
661     return status < 0;
662     }
663    
664     /**
665 jsr166 1.23 * Returns {@code true} if this task was cancelled.
666 jsr166 1.10 *
667 jsr166 1.23 * @return {@code true} if this task was cancelled
668 dl 1.1 */
669     public final boolean isCancelled() {
670     return (status & COMPLETION_MASK) == CANCELLED;
671     }
672    
673     /**
674     * Asserts that the results of this task's computation will not be
675 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
676 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
677     * will report true, and {@code join} will result in a
678     * {@code CancellationException} being thrown. Otherwise, when
679 dl 1.1 * cancellation races with completion, there are no guarantees
680 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
681     * {@code join} will return normally or via an exception, or
682 dl 1.1 * whether these behaviors will remain consistent upon repeated
683     * invocation.
684     *
685     * <p>This method may be overridden in subclasses, but if so, must
686     * still ensure that these minimal properties hold. In particular,
687     * the cancel method itself must not throw exceptions.
688     *
689     * <p> This method is designed to be invoked by <em>other</em>
690     * tasks. To terminate the current task, you can just return or
691     * throw an unchecked exception from its computation method, or
692 jsr166 1.8 * invoke {@code completeExceptionally}.
693 dl 1.1 *
694     * @param mayInterruptIfRunning this value is ignored in the
695     * default implementation because tasks are not in general
696 jsr166 1.14 * cancelled via interruption
697 dl 1.1 *
698 jsr166 1.23 * @return {@code true} if this task is now cancelled
699 dl 1.1 */
700     public boolean cancel(boolean mayInterruptIfRunning) {
701     setCompletion(CANCELLED);
702     return (status & COMPLETION_MASK) == CANCELLED;
703     }
704    
705     /**
706 jsr166 1.23 * Returns {@code true} if this task threw an exception or was cancelled.
707 jsr166 1.10 *
708 jsr166 1.23 * @return {@code true} if this task threw an exception or was cancelled
709 dl 1.3 */
710     public final boolean isCompletedAbnormally() {
711     return (status & COMPLETION_MASK) < NORMAL;
712     }
713    
714     /**
715     * Returns the exception thrown by the base computation, or a
716     * CancellationException if cancelled, or null if none or if the
717     * method has not yet completed.
718 jsr166 1.10 *
719 jsr166 1.23 * @return the exception, or {@code null} if none
720 dl 1.3 */
721     public final Throwable getException() {
722     int s = status & COMPLETION_MASK;
723     if (s >= NORMAL)
724     return null;
725     if (s == CANCELLED)
726     return new CancellationException();
727     return exceptionMap.get(this);
728     }
729    
730     /**
731 dl 1.1 * Completes this task abnormally, and if not already aborted or
732     * cancelled, causes it to throw the given exception upon
733 jsr166 1.8 * {@code join} and related operations. This method may be used
734 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
735 dl 1.2 * completion of tasks that would not otherwise complete. Its use
736     * in other situations is likely to be wrong. This method is
737 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
738 dl 1.2 * implementation to maintain guarantees.
739     *
740 dl 1.1 * @param ex the exception to throw. If this exception is
741     * not a RuntimeException or Error, the actual exception thrown
742     * will be a RuntimeException with cause ex.
743     */
744     public void completeExceptionally(Throwable ex) {
745     setDoneExceptionally((ex instanceof RuntimeException) ||
746 jsr166 1.14 (ex instanceof Error) ? ex :
747 dl 1.1 new RuntimeException(ex));
748     }
749    
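The wrapping rule stated in the @param text of completeExceptionally can be shown on its own. This sketch is hypothetical (the class UncheckedWrapDemo and method toUnchecked are not part of jsr166y) and simply restates the conditional expression from the method body: RuntimeExceptions and Errors pass through unchanged, anything else becomes the cause of a new RuntimeException.

```java
import java.io.IOException;

// Hypothetical helper restating the wrapping rule from the method above.
final class UncheckedWrapDemo {
    /** Returns ex itself if unchecked, else a RuntimeException with cause ex. */
    static Throwable toUnchecked(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error)
            ? ex
            : new RuntimeException(ex);
    }

    public static void main(String[] args) {
        Throwable unchecked = new IllegalStateException("already unchecked");
        Throwable checked = new IOException("checked");
        System.out.println(toUnchecked(unchecked) == unchecked);
        System.out.println(toUnchecked(checked).getCause() == checked);
    }
}
```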
750     /**
751     * Completes this task, and if not already aborted or cancelled,
752 jsr166 1.8 * returns the given value as the result of {@code join} and related
753 dl 1.1 * operations. This method may be used to provide results for
754     * asynchronous tasks, or to provide alternative handling for
755 dl 1.2 * tasks that would not otherwise complete normally. Its use in
756     * other situations is likely to be wrong. This method is
757 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
758 dl 1.2 * implementation to maintain guarantees.
759 dl 1.1 *
760 jsr166 1.10 * @param value the result value for this task
761 dl 1.1 */
762     public void complete(V value) {
763     try {
764     setRawResult(value);
765 jsr166 1.14 } catch (Throwable rex) {
766 dl 1.1 setDoneExceptionally(rex);
767     return;
768     }
769     setNormalCompletion();
770     }
771    
772 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
773     ForkJoinWorkerThread w = getWorker();
774     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
775     awaitDone(w, true);
776     return reportFutureResult();
777     }
778    
779     public final V get(long timeout, TimeUnit unit)
780     throws InterruptedException, ExecutionException, TimeoutException {
781     ForkJoinWorkerThread w = getWorker();
782     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
783     awaitDone(w, unit.toNanos(timeout));
784     return reportTimedFutureResult();
785     }
786    
787 dl 1.1 /**
788 dl 1.2 * Possibly executes other tasks until this task is ready, then
789     * returns the result of the computation. This method may be more
790 jsr166 1.8 * efficient than {@code join}, but is only applicable when
791 jsr166 1.9 * there are no potential dependencies between continuation of the
792 dl 1.2 * current task and that of any other task that might be executed
793     * while helping. (This usually holds for pure divide-and-conquer
794     * tasks.) This method may be invoked only from within
795 dl 1.13 * ForkJoinTask computations (as may be determined using method
796     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
797 jsr166 1.14 * result in exceptions or errors, possibly including
798 dl 1.13 * ClassCastException.
799 jsr166 1.10 *
800 dl 1.2 * @return the computed result
801     */
802     public final V helpJoin() {
803 jsr166 1.14 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
804 dl 1.2 if (status < 0 || !w.unpushTask(this) || !tryExec())
805 dl 1.3 reportException(busyJoin(w));
806 dl 1.2 return getRawResult();
807     }
808    
809     /**
810     * Possibly executes other tasks until this task is ready. This
811     * method may be invoked only from within ForkJoinTask
812 dl 1.13 * computations (as may be determined using method {@link
813 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
814     * in exceptions or errors, possibly including ClassCastException.
815 dl 1.2 */
816     public final void quietlyHelpJoin() {
817     if (status >= 0) {
818     ForkJoinWorkerThread w =
819 jsr166 1.14 (ForkJoinWorkerThread) Thread.currentThread();
820 dl 1.2 if (!w.unpushTask(this) || !tryQuietlyInvoke())
821 dl 1.3 busyJoin(w);
822 dl 1.2 }
823     }
824    
825     /**
826     * Joins this task, without returning its result or throwing an
827     * exception. This method may be useful when processing
828     * collections of tasks when some have been cancelled or are otherwise
829     * known to have aborted.
830     */
831     public final void quietlyJoin() {
832     if (status >= 0) {
833     ForkJoinWorkerThread w = getWorker();
834     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
835     awaitDone(w, true);
836     }
837     }
838    
839     /**
840     * Commences performing this task and awaits its completion if
841     * necessary, without returning its result or throwing an
842     * exception. This method may be useful when processing
843     * collections of tasks when some have been cancelled or are otherwise
844     * known to have aborted.
845     */
846     public final void quietlyInvoke() {
847     if (status >= 0 && !tryQuietlyInvoke())
848     quietlyJoin();
849     }
850    
851     /**
852 dl 1.3 * Possibly executes tasks until the pool hosting the current task
853     * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This method may be of use in
854     * designs in which many tasks are forked, but none are explicitly
855     * joined, instead executing them until all are processed.
856     */
857     public static void helpQuiesce() {
858 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
859     .helpQuiescePool();
860 dl 1.3 }
861    
862     /**
863 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
864 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
865 dl 1.1 * this task, but only if reuse occurs when this task has either
866     * never been forked, or has been forked, then completed and all
867     * outstanding joins of this task have also completed. Effects
868     * under any other usage conditions are not guaranteed, and are
869     * almost surely wrong. This method may be useful when executing
870     * pre-constructed trees of subtasks in loops.
871     */
872     public void reinitialize() {
873     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
874     exceptionMap.remove(this);
875     status = 0;
876     }
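As a rough illustration of reuse in a loop, the sketch below invokes the same task object twice, calling `reinitialize` only after the first run has fully completed. It assumes the JDK's `java.util.concurrent` classes rather than `jsr166y`; `SumTask` and `runTwice` are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ReinitializeDemo {
    /** Hypothetical task that sums whatever the shared array currently holds. */
    static final class SumTask extends RecursiveTask<Long> {
        final long[] data;
        SumTask(long[] data) { this.data = data; }
        @Override protected Long compute() {
            long sum = 0;
            for (long v : data) sum += v;
            return sum;
        }
    }

    /** Runs one task object twice, resetting its bookkeeping between the runs. */
    static long[] runTwice() {
        long[] data = {1, 2, 3};
        SumTask task = new SumTask(data);
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            long first = pool.invoke(task);   // 1 + 2 + 3
            data[0] = 10;                     // change the input between runs
            task.reinitialize();              // safe: the task is complete and no joins are pending
            long second = pool.invoke(task);  // 10 + 2 + 3
            return new long[] {first, second};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] r = runTwice();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

Without the `reinitialize` call, the second `invoke` would find the task already completed and simply report the earlier result.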
877    
878     /**
879 dl 1.2 * Returns the pool hosting the current task execution, or {@code null}
880 dl 1.13 * if this task is executing outside of any ForkJoinPool.
881 jsr166 1.10 *
882 jsr166 1.23 * @return the pool, or {@code null} if none
883 dl 1.1 */
884 dl 1.2 public static ForkJoinPool getPool() {
885     Thread t = Thread.currentThread();
886 jsr166 1.15 return (t instanceof ForkJoinWorkerThread) ?
887     ((ForkJoinWorkerThread) t).pool : null;
888 dl 1.1 }
889    
890     /**
891 jsr166 1.14 * Returns {@code true} if the current thread is executing as a
892 dl 1.13 * ForkJoinPool computation.
893 jsr166 1.14 *
894     * @return {@code true} if the current thread is executing as a
895 dl 1.13 * ForkJoinPool computation, or {@code false} otherwise
896     */
897     public static boolean inForkJoinPool() {
898     return Thread.currentThread() instanceof ForkJoinWorkerThread;
899     }
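The two context queries above can be exercised with a small probe, sketched here against the JDK's `java.util.concurrent` classes (an assumption; this file lives in `jsr166y`). `Probe` and the helper methods are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class PoolContextDemo {
    /** Hypothetical task reporting whether it runs on a worker of the expected pool. */
    static final class Probe extends RecursiveTask<Boolean> {
        final ForkJoinPool expected;
        Probe(ForkJoinPool expected) { this.expected = expected; }
        @Override protected Boolean compute() {
            return ForkJoinTask.inForkJoinPool()
                && ForkJoinTask.getPool() == expected;
        }
    }

    /** Runs the probe inside a private single-worker pool. */
    static boolean insidePool() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.invoke(new Probe(pool));
        } finally {
            pool.shutdown();
        }
    }

    /** Evaluates the same queries on the calling thread. */
    static boolean callerIsOutsideAnyPool() {
        return !ForkJoinTask.inForkJoinPool() && ForkJoinTask.getPool() == null;
    }

    public static void main(String[] args) {
        System.out.println(insidePool() + " " + callerIsOutsideAnyPool());
    }
}
```

Checking `inForkJoinPool()` first is the documented way to guard calls to the methods in this class that cast the current thread to `ForkJoinWorkerThread`.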
900    
901     /**
902 dl 1.2 * Tries to unschedule this task for execution. This method will
903     * typically succeed if this task is the most recently forked task
904     * by the current thread, and has not commenced executing in
905     * another thread. This method may be useful when arranging
906     * alternative local processing of tasks that could have been, but
907     * were not, stolen. This method may be invoked only from within
908 dl 1.13 * ForkJoinTask computations (as may be determined using method
909     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
910 jsr166 1.14 * result in exceptions or errors, possibly including
911 dl 1.13 * ClassCastException.
912 jsr166 1.10 *
913 jsr166 1.23 * @return {@code true} if unforked
914 dl 1.1 */
915 dl 1.2 public boolean tryUnfork() {
916 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
917     .unpushTask(this);
918 dl 1.1 }
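One way to use `tryUnfork` for the "alternative local processing" mentioned above is sketched below: after finishing the left half, the task tries to reclaim the right half it forked and runs it directly if nobody stole it. This assumes the JDK's `java.util.concurrent` classes rather than `jsr166y`; `RangeSum` and the threshold value are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class TryUnforkDemo {
    /** Hypothetical task summing the integers in [lo, hi). */
    static final class RangeSum extends RecursiveTask<Long> {
        static final int THRESHOLD = 1_000; // arbitrary cutoff for this sketch
        final int lo, hi;
        RangeSum(int lo, int hi) { this.lo = lo; this.hi = hi; }

        @Override protected Long compute() {
            if (hi - lo <= THRESHOLD) {
                long sum = 0;
                for (int i = lo; i < hi; i++) sum += i;
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            RangeSum right = new RangeSum(mid, hi);
            right.fork();                                 // make the right half stealable
            long leftSum = new RangeSum(lo, mid).compute();
            // If the right half is still on our queue, take it back and run it
            // directly; otherwise another worker has it, so wait for its result.
            long rightSum = right.tryUnfork() ? right.compute() : right.join();
            return leftSum + rightSum;
        }
    }

    static long sumBelow(int n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumBelow(100_000));
    }
}
```

Either branch yields the same sum; `tryUnfork` only changes which thread does the work and whether a join is needed.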
919    
920     /**
921 dl 1.2 * Returns an estimate of the number of tasks that have been
922     * forked by the current worker thread but not yet executed. This
923     * value may be useful for heuristic decisions about whether to
924     * fork other tasks.
925 jsr166 1.10 *
926 dl 1.2 * @return the number of tasks
927     */
928     public static int getQueuedTaskCount() {
929 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
930     .getQueueSize();
931 dl 1.2 }
932    
933     /**
934 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
935 dl 1.1 * held by the current worker thread than there are other worker
936 dl 1.2 * threads that might steal them. This value may be useful for
937     * heuristic decisions about whether to fork other tasks. In many
938     * usages of ForkJoinTasks, at steady state, each worker should
939     * aim to maintain a small constant surplus (for example, 3) of
940     * tasks, and to process computations locally if this threshold is
941     * exceeded.
942 jsr166 1.10 *
943 dl 1.1 * @return the surplus number of tasks, which may be negative
944     */
945 dl 1.2 public static int getSurplusQueuedTaskCount() {
946 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
947 dl 1.1 .getEstimatedSurplusTaskCount();
948     }
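The surplus heuristic described above could be applied as in the following sketch, which stops forking once the current worker already holds more than a small surplus of queued tasks. It assumes the JDK's `java.util.concurrent` classes rather than `jsr166y`; `Fib`, `seqFib`, and the threshold constant are hypothetical, with 3 taken from the example value in the comment.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SurplusDemo {
    static final int SURPLUS_THRESHOLD = 3; // the example value from the Javadoc

    /** Plain iterative Fibonacci used once enough tasks are already queued. */
    static int seqFib(int n) {
        int a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            int t = a + b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Hypothetical task that forks only while the local surplus is small. */
    static final class Fib extends RecursiveTask<Integer> {
        final int n;
        Fib(int n) { this.n = n; }

        @Override protected Integer compute() {
            if (n <= 1) return n;
            // Enough stealable work is queued already: compute locally instead.
            if (getSurplusQueuedTaskCount() > SURPLUS_THRESHOLD) return seqFib(n);
            Fib f1 = new Fib(n - 1);
            f1.fork();
            int r2 = new Fib(n - 2).compute();
            return r2 + f1.join();
        }
    }

    static int fib(int n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new Fib(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(20));
    }
}
```

The result does not depend on how often the threshold is exceeded; the heuristic only trades task-creation overhead against available parallelism.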
949    
950 dl 1.2 // Extension methods
951 dl 1.1
952     /**
953 jsr166 1.23 * Returns the result that would be returned by {@link #join}, even
954     * if this task completed abnormally, or {@code null} if this task
955     * is not known to have been completed. This method is designed
956     * to aid debugging, as well as to support extensions. Its use in
957     * any other context is discouraged.
958 dl 1.1 *
959 jsr166 1.23 * @return the result, or {@code null} if not completed
960 dl 1.1 */
961     public abstract V getRawResult();
962    
963     /**
964     * Forces the given value to be returned as a result. This method
965     * is designed to support extensions, and should not in general be
966     * called otherwise.
967     *
968     * @param value the value
969     */
970     protected abstract void setRawResult(V value);
971    
972     /**
973     * Immediately performs the base action of this task. This method
974     * is designed to support extensions, and should not in general be
975     * called otherwise. The return value controls whether this task
976     * is considered to be done normally. It may return false in
977     * asynchronous actions that require explicit invocations of
978 jsr166 1.23 * {@link #complete} to become joinable. It may throw exceptions
979 dl 1.1 * to indicate abnormal exit.
980 jsr166 1.10 *
981 jsr166 1.23 * @return {@code true} if completed normally
982 dl 1.1 * @throws Error or RuntimeException if encountered during computation
983     */
984     protected abstract boolean exec();
985    
986 dl 1.2 /**
987 dl 1.6 * Returns, but does not unschedule or execute, the next task queued by
988     * the current thread but not yet executed, if one is
989     * available. There is no guarantee that this task will actually
990     * be polled or executed next. This method is designed primarily
991     * to support extensions, and is unlikely to be useful otherwise.
992     * This method may be invoked only from within ForkJoinTask
993 dl 1.13 * computations (as may be determined using method {@link
994     * #inForkJoinPool}). Attempts to invoke in other contexts result
995 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
996 dl 1.2 *
997 jsr166 1.23 * @return the next task, or {@code null} if none are available
998 dl 1.2 */
999     protected static ForkJoinTask<?> peekNextLocalTask() {
1000 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1001     .peekTask();
1002 dl 1.2 }
1003    
1004     /**
1005 dl 1.6 * Unschedules and returns, without executing, the next task
1006     * queued by the current thread but not yet executed. This method
1007     * is designed primarily to support extensions, and is unlikely to
1008     * be useful otherwise. This method may be invoked only from
1009 dl 1.13 * within ForkJoinTask computations (as may be determined using
1010     * method {@link #inForkJoinPool}). Attempts to invoke in other
1011 jsr166 1.14 * contexts result in exceptions or errors, possibly including
1012 dl 1.6 * ClassCastException.
1013 dl 1.2 *
1014 jsr166 1.23 * @return the next task, or {@code null} if none are available
1015 dl 1.2 */
1016     protected static ForkJoinTask<?> pollNextLocalTask() {
1017 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1018     .pollLocalTask();
1019 dl 1.2 }
1020 jsr166 1.7
1021 dl 1.2 /**
1022 dl 1.6 * Unschedules and returns, without executing, the next task
1023     * queued by the current thread but not yet executed, if one is
1024     * available, or if not available, a task that was forked by some
1025     * other thread, if available. Availability may be transient, so a
1026 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1027 dl 1.6 * of the pool this task is operating in. This method is designed
1028     * primarily to support extensions, and is unlikely to be useful
1029     * otherwise. This method may be invoked only from within
1030 dl 1.13 * ForkJoinTask computations (as may be determined using method
1031     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1032 jsr166 1.14 * result in exceptions or errors, possibly including
1033 dl 1.6 * ClassCastException.
1034 dl 1.4 *
1035 jsr166 1.23 * @return a task, or {@code null} if none are available
1036 dl 1.2 */
1037     protected static ForkJoinTask<?> pollTask() {
1038 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1039     .pollTask();
1040 dl 1.2 }
1041    
1042 dl 1.18 // Adaptors
1043    
1044     /**
1045 jsr166 1.21 * Returns a new ForkJoinTask that performs the {@code run}
1046 dl 1.18 * method of the given Runnable as its action, and returns a null
1047 jsr166 1.21 * result upon {@code join}.
1048 dl 1.18 *
1049     * @param runnable the runnable action
1050     * @return the task
1051     */
1052     public static ForkJoinTask<Void> adapt(Runnable runnable) {
1053     return new ForkJoinPool.AdaptedRunnable<Void>(runnable, null);
1054     }
1055    
1056     /**
1057 jsr166 1.21 * Returns a new ForkJoinTask that performs the {@code run}
1058 dl 1.18 * method of the given Runnable as its action, and returns the
1059 jsr166 1.21 * given result upon {@code join}.
1060 dl 1.18 *
1061     * @param runnable the runnable action
1062     * @param result the result upon completion
1063     * @return the task
1064     */
1065     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1066     return new ForkJoinPool.AdaptedRunnable<T>(runnable, result);
1067     }
1068    
1069     /**
1070 jsr166 1.21 * Returns a new ForkJoinTask that performs the {@code call}
1071 dl 1.18 * method of the given Callable as its action, and returns its
1072 jsr166 1.21 * result upon {@code join}, translating any checked
1073     * exceptions encountered into {@code RuntimeException}.
1074 dl 1.18 *
1075     * @param callable the callable action
1076     * @return the task
1077     */
1078     public static <T> ForkJoinTask<T> adapt(Callable<T> callable) {
1079     return new ForkJoinPool.AdaptedCallable<T>(callable);
1080     }
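The three `adapt` overloads might be used as in the sketch below. It relies on the JDK's `java.util.concurrent.ForkJoinTask`, whose `adapt` signatures differ slightly from this revision (for example, `adapt(Runnable)` returns `ForkJoinTask<?>` there); the class and variable names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;

class AdaptDemo {
    /** Runs one task per adapt overload and summarizes the results as a string. */
    static String runAdapted() {
        AtomicInteger counter = new AtomicInteger();
        // Explicitly typed targets avoid overload ambiguity between Runnable and Callable.
        Runnable bump = counter::incrementAndGet;
        Callable<Integer> answer = () -> 6 * 7;

        ForkJoinTask<?> plain = ForkJoinTask.adapt(bump);                 // null result
        ForkJoinTask<String> tagged = ForkJoinTask.adapt(bump, "tagged"); // fixed result
        ForkJoinTask<Integer> called = ForkJoinTask.adapt(answer);        // callable's result

        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Object a = pool.invoke(plain);
            String b = pool.invoke(tagged);
            Integer c = pool.invoke(called);
            return a + "," + b + "," + c + "," + counter.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAdapted());
    }
}
```

The counter ends at 2 because the same `Runnable` backs both the plain and the tagged task.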
1081    
1082 dl 1.1 // Serialization support
1083    
1084     private static final long serialVersionUID = -7721805057305804111L;
1085    
1086     /**
1087     * Saves the state to a stream.
1088     *
1089     * @serialData the current run status and the exception thrown
1090 jsr166 1.23 * during execution, or {@code null} if none
1091 dl 1.1 * @param s the stream
1092     */
1093     private void writeObject(java.io.ObjectOutputStream s)
1094     throws java.io.IOException {
1095     s.defaultWriteObject();
1096     s.writeObject(getException());
1097     }
1098    
1099     /**
1100     * Reconstitutes the instance from a stream.
1101 jsr166 1.10 *
1102 dl 1.1 * @param s the stream
1103     */
1104     private void readObject(java.io.ObjectInputStream s)
1105     throws java.io.IOException, ClassNotFoundException {
1106     s.defaultReadObject();
1107 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1108     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1109 dl 1.1 Object ex = s.readObject();
1110     if (ex != null)
1111 jsr166 1.14 setDoneExceptionally((Throwable) ex);
1112 dl 1.1 }
1113    
1114 jsr166 1.22 // Unsafe mechanics
1115    
1116     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1117     private static final long statusOffset =
1118     objectFieldOffset("status", ForkJoinTask.class);
1119    
1120     private static long objectFieldOffset(String field, Class<?> klazz) {
1121     try {
1122     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1123     } catch (NoSuchFieldException e) {
1124     // Convert Exception to corresponding Error
1125     NoSuchFieldError error = new NoSuchFieldError(field);
1126     error.initCause(e);
1127     throw error;
1128     }
1129     }
1130    
1131     /**
1132     * Returns a sun.misc.Unsafe. Suitable for use in a third-party package.
1133     * Replace with a simple call to Unsafe.getUnsafe when integrating
1134     * into a JDK.
1135     *
1136     * @return a sun.misc.Unsafe
1137     */
1138 jsr166 1.16 private static sun.misc.Unsafe getUnsafe() {
1139 jsr166 1.5 try {
1140 jsr166 1.16 return sun.misc.Unsafe.getUnsafe();
1141 jsr166 1.5 } catch (SecurityException se) {
1142     try {
1143     return java.security.AccessController.doPrivileged
1144 jsr166 1.22 (new java.security
1145     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1146 jsr166 1.16 public sun.misc.Unsafe run() throws Exception {
1147 jsr166 1.22 java.lang.reflect.Field f = sun.misc
1148     .Unsafe.class.getDeclaredField("theUnsafe");
1149     f.setAccessible(true);
1150     return (sun.misc.Unsafe) f.get(null);
1151 jsr166 1.5 }});
1152     } catch (java.security.PrivilegedActionException e) {
1153 jsr166 1.16 throw new RuntimeException("Could not initialize intrinsics",
1154     e.getCause());
1155 jsr166 1.5 }
1156     }
1157     }
1158 dl 1.1 }