ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.18
Committed: Sat Jul 25 15:50:57 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.17: +44 -1 lines
Log Message:
Export adaptors; change some signatures to simplify usage

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.17
9     import java.util.concurrent.*;
10    
11 dl 1.1 import java.io.Serializable;
12 jsr166 1.17 import java.util.Collection;
13     import java.util.Collections;
14     import java.util.List;
15     import java.util.Map;
16     import java.util.WeakHashMap;
17 dl 1.1
18     /**
19 dl 1.2 * Abstract base class for tasks that run within a {@link
20     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
21     * lighter weight than a normal thread. Huge numbers of tasks and
22     * subtasks may be hosted by a small number of actual threads in a
23     * ForkJoinPool, at the price of some usage limitations.
24 dl 1.4 *
25 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
26     * {@link ForkJoinPool}. Once started, it will usually in turn start
27     * other subtasks. As indicated by the name of this class, many
28 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
29     * and {@code join}, or derivatives such as
30     * {@code invokeAll}. However, this class also provides a number
31 dl 1.2 * of other methods that can come into play in advanced usages, as
32     * well as extension mechanics that allow support of new forms of
33     * fork/join processing.
34 dl 1.4 *
35 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
36     * efficiency of ForkJoinTasks stems from a set of restrictions (that
37     * are only partially statically enforceable) reflecting their
38     * intended use as computational tasks calculating pure functions or
39     * operating on purely isolated objects. The primary coordination
40     * mechanisms are {@link #fork}, that arranges asynchronous execution,
41     * and {@link #join}, that doesn't proceed until the task's result has
42 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
43 dl 1.2 * methods or blocks, and should minimize other blocking
44     * synchronization apart from joining other tasks or using
45 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
46     * fork/join scheduling. Tasks should also not perform blocking IO,
47     * and should ideally access variables that are completely independent
48     * of those accessed by other running tasks. Minor breaches of these
49     * restrictions, for example using shared output streams, may be
50     * tolerable in practice, but frequent use may result in poor
51     * performance, and the potential to indefinitely stall if the number
52 dl 1.2 * of threads not waiting for IO or other external synchronization
53     * becomes exhausted. This usage restriction is in part enforced by
54 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
55 dl 1.2 * to be thrown. However, computations may still encounter unchecked
56 dl 1.1 * exceptions, that are rethrown to callers attempting join
57     * them. These exceptions may additionally include
58     * RejectedExecutionExceptions stemming from internal resource
59     * exhaustion such as failure to allocate internal task queues.
60     *
61 dl 1.2 * <p>The primary method for awaiting completion and extracting
62     * results of a task is {@link #join}, but there are several variants:
63     * The {@link Future#get} methods support interruptible and/or timed
64 jsr166 1.8 * waits for completion and report results using {@code Future}
65 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
66     * execute other tasks while awaiting joins, which is sometimes more
67     * efficient but only applies when all subtasks are known to be
68     * strictly tree-structured. Method {@link #invoke} is semantically
69 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
70 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
71     * of these methods do not extract results or report exceptions. These
72     * may be useful when a set of tasks are being executed, and you need
73     * to delay processing of results or exceptions until all complete.
74 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
75 dl 1.2 * performs the most common form of parallel invocation: forking a set
76     * of tasks and joining them all.
77     *
78     * <p> The ForkJoinTask class is not usually directly subclassed.
79     * Instead, you subclass one of the abstract classes that support a
80     * particular style of fork/join processing. Normally, a concrete
81     * ForkJoinTask subclass declares fields comprising its parameters,
82 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
83 dl 1.2 * method that somehow uses the control methods supplied by this base
84 jsr166 1.8 * class. While these methods have {@code public} access (to allow
85 dl 1.2 * instances of different task subclasses to call each others
86     * methods), some of them may only be called from within other
87 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
88     * #inForkJoinPool}). Attempts to invoke them in other contexts
89 jsr166 1.14 * result in exceptions or errors, possibly including
90 dl 1.13 * ClassCastException.
91 dl 1.1 *
92 jsr166 1.8 * <p>Most base support methods are {@code final} because their
93 dl 1.1 * implementations are intrinsically tied to the underlying
94     * lightweight task scheduling framework, and so cannot be overridden.
95     * Developers creating new basic styles of fork/join processing should
96 jsr166 1.8 * minimally implement {@code protected} methods
97     * {@code exec}, {@code setRawResult}, and
98     * {@code getRawResult}, while also introducing an abstract
99 dl 1.2 * computational method that can be implemented in its subclasses,
100 jsr166 1.8 * possibly relying on other {@code protected} methods provided
101 dl 1.2 * by this class.
102 dl 1.1 *
103     * <p>ForkJoinTasks should perform relatively small amounts of
104 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
105 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
106     * than 10000 basic computational steps. If tasks are too big, then
107 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
108 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
109     *
110 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
111 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
112     * in general sensible to serialize tasks only before or after, but
113 dl 1.1 * not during execution. Serialization is not relied on during
114     * execution itself.
115 jsr166 1.12 *
116     * @since 1.7
117     * @author Doug Lea
118 dl 1.1 */
119     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
120 dl 1.2
121 dl 1.1 /**
122 dl 1.2 * Run control status bits packed into a single int to minimize
123     * footprint and to ensure atomicity (via CAS). Status is
124     * initially zero, and takes on nonnegative values until
125 dl 1.1 * completed, upon which status holds COMPLETED. CANCELLED, or
126     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
127     * blocking waits by other threads have SIGNAL_MASK bits set --
128     * bit 15 for external (nonFJ) waits, and the rest a count of
129     * waiting FJ threads. (This representation relies on
130     * ForkJoinPool max thread limits). Completion of a stolen task
131     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
132     * though suboptimal for some purposes, we use basic builtin
133     * wait/notify to take advantage of "monitor inflation" in JVMs
134     * that we would otherwise need to emulate to avoid adding further
135     * per-task bookkeeping overhead. Note that bits 16-28 are
136     * currently unused. Also value 0x80000000 is available as spare
137     * completion value.
138     */
139 jsr166 1.9 volatile int status; // accessed directly by pool and workers
140 dl 1.1
141     static final int COMPLETION_MASK = 0xe0000000;
142     static final int NORMAL = 0xe0000000; // == mask
143     static final int CANCELLED = 0xc0000000;
144     static final int EXCEPTIONAL = 0xa0000000;
145     static final int SIGNAL_MASK = 0x0000ffff;
146     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
147     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
148    
149     /**
150     * Table of exceptions thrown by tasks, to enable reporting by
151     * callers. Because exceptions are rare, we don't directly keep
152 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
153 dl 1.1 * that cancellation exceptions don't appear in the table, but are
154     * instead recorded as status values.
155 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
156 dl 1.1 */
157     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
158     Collections.synchronizedMap
159     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
160    
161     // within-package utilities
162    
163     /**
164 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
165 dl 1.1 */
166     static ForkJoinWorkerThread getWorker() {
167     Thread t = Thread.currentThread();
168 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
169     (ForkJoinWorkerThread) t : null);
170 dl 1.1 }
171    
172     final boolean casStatus(int cmp, int val) {
173 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
174 dl 1.1 }
175    
176     /**
177     * Workaround for not being able to rethrow unchecked exceptions.
178     */
179     static void rethrowException(Throwable ex) {
180     if (ex != null)
181 jsr166 1.11 UNSAFE.throwException(ex);
182 dl 1.1 }
183    
184     // Setting completion status
185    
186     /**
187 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
188     *
189 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
190     */
191     final void setCompletion(int completion) {
192 dl 1.2 ForkJoinPool pool = getPool();
193 dl 1.1 if (pool != null) {
194     int s; // Clear signal bits while setting completion status
195 jsr166 1.14 do {} while ((s = status) >= 0 && !casStatus(s, completion));
196 dl 1.1
197     if ((s & SIGNAL_MASK) != 0) {
198     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
199     pool.updateRunningCount(s);
200 jsr166 1.14 synchronized (this) { notifyAll(); }
201 dl 1.1 }
202     }
203     else
204     externallySetCompletion(completion);
205     }
206    
207     /**
208     * Version of setCompletion for non-FJ threads. Leaves signal
209     * bits for unblocked threads to adjust, and always notifies.
210     */
211     private void externallySetCompletion(int completion) {
212     int s;
213 jsr166 1.14 do {} while ((s = status) >= 0 &&
214     !casStatus(s, (s & SIGNAL_MASK) | completion));
215     synchronized (this) { notifyAll(); }
216 dl 1.1 }
217    
218     /**
219 jsr166 1.14 * Sets status to indicate normal completion.
220 dl 1.1 */
221     final void setNormalCompletion() {
222     // Try typical fast case -- single CAS, no signal, not already done.
223     // Manually expand casStatus to improve chances of inlining it
224 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
225 dl 1.1 setCompletion(NORMAL);
226     }
227    
228     // internal waiting and notification
229    
230     /**
231 jsr166 1.14 * Performs the actual monitor wait for awaitDone.
232 dl 1.1 */
233     private void doAwaitDone() {
234     // Minimize lock bias and in/de-flation effects by maximizing
235     // chances of waiting inside sync
236     try {
237     while (status >= 0)
238 jsr166 1.14 synchronized (this) { if (status >= 0) wait(); }
239 dl 1.1 } catch (InterruptedException ie) {
240     onInterruptedWait();
241     }
242     }
243    
244     /**
245 jsr166 1.14 * Performs the actual timed monitor wait for awaitDone.
246 dl 1.1 */
247     private void doAwaitDone(long startTime, long nanos) {
248 jsr166 1.14 synchronized (this) {
249 dl 1.1 try {
250     while (status >= 0) {
251     long nt = nanos - System.nanoTime() - startTime;
252     if (nt <= 0)
253     break;
254 jsr166 1.14 wait(nt / 1000000, (int) (nt % 1000000));
255 dl 1.1 }
256     } catch (InterruptedException ie) {
257     onInterruptedWait();
258     }
259     }
260     }
261    
262     // Awaiting completion
263    
264     /**
265     * Sets status to indicate there is joiner, then waits for join,
266     * surrounded with pool notifications.
267 jsr166 1.10 *
268 dl 1.1 * @return status upon exit
269     */
270 jsr166 1.14 private int awaitDone(ForkJoinWorkerThread w,
271     boolean maintainParallelism) {
272     ForkJoinPool pool = (w == null) ? null : w.pool;
273 dl 1.1 int s;
274     while ((s = status) >= 0) {
275 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
276 dl 1.1 if (pool == null || !pool.preJoin(this, maintainParallelism))
277     doAwaitDone();
278     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
279     adjustPoolCountsOnUnblock(pool);
280     break;
281     }
282     }
283     return s;
284     }
285    
286     /**
287     * Timed version of awaitDone
288 jsr166 1.14 *
289 dl 1.1 * @return status upon exit
290     */
291 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
292 jsr166 1.14 ForkJoinPool pool = (w == null) ? null : w.pool;
293 dl 1.1 int s;
294     while ((s = status) >= 0) {
295 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
296 dl 1.1 long startTime = System.nanoTime();
297     if (pool == null || !pool.preJoin(this, false))
298     doAwaitDone(startTime, nanos);
299     if ((s = status) >= 0) {
300     adjustPoolCountsOnCancelledWait(pool);
301     s = status;
302     }
303     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
304     adjustPoolCountsOnUnblock(pool);
305     break;
306     }
307     }
308     return s;
309     }
310    
311     /**
312 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
313 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
314     */
315     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
316     int s;
317 jsr166 1.14 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
318 dl 1.1 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
319     pool.updateRunningCount(s);
320     }
321    
322     /**
323 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
324 dl 1.1 */
325     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
326     if (pool != null) {
327     int s;
328     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
329     if (casStatus(s, s - 1)) {
330     pool.updateRunningCount(1);
331     break;
332     }
333     }
334     }
335     }
336    
337 dl 1.2 /**
338 jsr166 1.10 * Handles interruptions during waits.
339 dl 1.2 */
340 dl 1.1 private void onInterruptedWait() {
341 dl 1.2 ForkJoinWorkerThread w = getWorker();
342     if (w == null)
343     Thread.currentThread().interrupt(); // re-interrupt
344     else if (w.isTerminating())
345 dl 1.3 cancelIgnoringExceptions();
346 dl 1.2 // else if FJworker, ignore interrupt
347 dl 1.1 }
348    
349     // Recording and reporting exceptions
350    
351     private void setDoneExceptionally(Throwable rex) {
352     exceptionMap.put(this, rex);
353     setCompletion(EXCEPTIONAL);
354     }
355    
356     /**
357 jsr166 1.10 * Throws the exception associated with status s.
358     *
359 dl 1.1 * @throws the exception
360     */
361     private void reportException(int s) {
362     if ((s &= COMPLETION_MASK) < NORMAL) {
363     if (s == CANCELLED)
364     throw new CancellationException();
365     else
366     rethrowException(exceptionMap.get(this));
367     }
368     }
369    
370     /**
371 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
372 jsr166 1.14 * Only call when {@code isDone} known to be true.
373 dl 1.1 */
374     private V reportFutureResult()
375     throws ExecutionException, InterruptedException {
376     int s = status & COMPLETION_MASK;
377     if (s < NORMAL) {
378     Throwable ex;
379     if (s == CANCELLED)
380     throw new CancellationException();
381     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
382     throw new ExecutionException(ex);
383     if (Thread.interrupted())
384     throw new InterruptedException();
385     }
386     return getRawResult();
387     }
388    
389     /**
390     * Returns result or throws exception using j.u.c.Future conventions
391 jsr166 1.10 * with timeouts.
392 dl 1.1 */
393     private V reportTimedFutureResult()
394     throws InterruptedException, ExecutionException, TimeoutException {
395     Throwable ex;
396     int s = status & COMPLETION_MASK;
397     if (s == NORMAL)
398     return getRawResult();
399     if (s == CANCELLED)
400     throw new CancellationException();
401     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
402     throw new ExecutionException(ex);
403     if (Thread.interrupted())
404     throw new InterruptedException();
405     throw new TimeoutException();
406     }
407    
408     // internal execution methods
409    
410     /**
411     * Calls exec, recording completion, and rethrowing exception if
412 jsr166 1.10 * encountered. Caller should normally check status before calling.
413     *
414 dl 1.1 * @return true if completed normally
415     */
416     private boolean tryExec() {
417     try { // try block must contain only call to exec
418     if (!exec())
419     return false;
420     } catch (Throwable rex) {
421     setDoneExceptionally(rex);
422     rethrowException(rex);
423     return false; // not reached
424     }
425     setNormalCompletion();
426     return true;
427     }
428    
429     /**
430     * Main execution method used by worker threads. Invokes
431 jsr166 1.10 * base computation unless already complete.
432 dl 1.1 */
433     final void quietlyExec() {
434     if (status >= 0) {
435     try {
436     if (!exec())
437     return;
438 jsr166 1.14 } catch (Throwable rex) {
439 dl 1.1 setDoneExceptionally(rex);
440     return;
441     }
442     setNormalCompletion();
443     }
444     }
445    
446     /**
447 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
448     * Caller should normally check status before calling.
449     *
450 dl 1.1 * @return true if completed normally
451     */
452     private boolean tryQuietlyInvoke() {
453     try {
454     if (!exec())
455     return false;
456     } catch (Throwable rex) {
457     setDoneExceptionally(rex);
458     return false;
459     }
460     setNormalCompletion();
461     return true;
462     }
463    
464     /**
465 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
466 dl 1.1 */
467 dl 1.3 final void cancelIgnoringExceptions() {
468 dl 1.1 try {
469     cancel(false);
470 jsr166 1.14 } catch (Throwable ignore) {
471 dl 1.1 }
472     }
473    
474 dl 1.3 /**
475     * Main implementation of helpJoin
476     */
477     private int busyJoin(ForkJoinWorkerThread w) {
478     int s;
479     ForkJoinTask<?> t;
480     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
481     t.quietlyExec();
482 jsr166 1.14 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
483 dl 1.3 }
484    
485 dl 1.1 // public methods
486    
487     /**
488     * Arranges to asynchronously execute this task. While it is not
489     * necessarily enforced, it is a usage error to fork a task more
490     * than once unless it has completed and been reinitialized. This
491 dl 1.2 * method may be invoked only from within ForkJoinTask
492 dl 1.13 * computations (as may be determined using method {@link
493     * #inForkJoinPool}). Attempts to invoke in other contexts result
494 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
495 dl 1.18 *
496     * @return <code>this</code>, to simplify usage.
497 dl 1.1 */
498 dl 1.18 public final ForkJoinTask<V> fork() {
499 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
500     .pushTask(this);
501 dl 1.18 return this;
502 dl 1.1 }
503    
504     /**
505     * Returns the result of the computation when it is ready.
506 jsr166 1.8 * This method differs from {@code get} in that abnormal
507 dl 1.1 * completion results in RuntimeExceptions or Errors, not
508     * ExecutionExceptions.
509     *
510     * @return the computed result
511     */
512     public final V join() {
513     ForkJoinWorkerThread w = getWorker();
514     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
515     reportException(awaitDone(w, true));
516     return getRawResult();
517     }
518    
519     /**
520 dl 1.2 * Commences performing this task, awaits its completion if
521     * necessary, and return its result.
522 jsr166 1.10 *
523 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
524 jsr166 1.10 * exception) if the underlying computation did so
525 dl 1.1 * @return the computed result
526     */
527     public final V invoke() {
528     if (status >= 0 && tryExec())
529     return getRawResult();
530     else
531     return join();
532     }
533    
534     /**
535 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
536 dl 1.2 * both of them or an exception is encountered. This method may be
537 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
538     * determined using method {@link #inForkJoinPool}). Attempts to
539 jsr166 1.14 * invoke in other contexts result in exceptions or errors,
540 dl 1.4 * possibly including ClassCastException.
541 jsr166 1.10 *
542 dl 1.2 * @param t1 one task
543     * @param t2 the other task
544     * @throws NullPointerException if t1 or t2 are null
545 jsr166 1.10 * @throws RuntimeException or Error if either task did so
546 dl 1.1 */
547 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
548     t2.fork();
549     t1.invoke();
550     t2.join();
551 dl 1.1 }
552    
553     /**
554 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
555 dl 1.2 * for all of them. If any task encounters an exception, others
556     * may be cancelled. This method may be invoked only from within
557 dl 1.13 * ForkJoinTask computations (as may be determined using method
558     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
559 jsr166 1.14 * result in exceptions or errors, possibly including
560 dl 1.13 * ClassCastException.
561 jsr166 1.14 *
562 dl 1.2 * @param tasks the array of tasks
563 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
564     * @throws RuntimeException or Error if any task did so
565 dl 1.1 */
566 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
567     Throwable ex = null;
568     int last = tasks.length - 1;
569     for (int i = last; i >= 0; --i) {
570     ForkJoinTask<?> t = tasks[i];
571     if (t == null) {
572     if (ex == null)
573     ex = new NullPointerException();
574     }
575     else if (i != 0)
576     t.fork();
577     else {
578     t.quietlyInvoke();
579     if (ex == null)
580     ex = t.getException();
581     }
582     }
583     for (int i = 1; i <= last; ++i) {
584     ForkJoinTask<?> t = tasks[i];
585     if (t != null) {
586     if (ex != null)
587     t.cancel(false);
588     else {
589     t.quietlyJoin();
590     if (ex == null)
591     ex = t.getException();
592     }
593     }
594 dl 1.1 }
595 dl 1.2 if (ex != null)
596     rethrowException(ex);
597 dl 1.1 }
598    
599     /**
600 dl 1.2 * Forks all tasks in the collection, returning when
601 jsr166 1.8 * {@code isDone} holds for all of them. If any task
602 dl 1.2 * encounters an exception, others may be cancelled. This method
603 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
604     * may be determined using method {@link
605 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
606     * in exceptions or errors, possibly including ClassCastException.
607 jsr166 1.10 *
608 dl 1.2 * @param tasks the collection of tasks
609 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
610     * @throws RuntimeException or Error if any task did so
611 dl 1.1 */
612 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
613 jsr166 1.15 if (!(tasks instanceof List<?>)) {
614 jsr166 1.14 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
615 dl 1.2 return;
616     }
617 jsr166 1.15 @SuppressWarnings("unchecked")
618 dl 1.2 List<? extends ForkJoinTask<?>> ts =
619 jsr166 1.14 (List<? extends ForkJoinTask<?>>) tasks;
620 dl 1.2 Throwable ex = null;
621     int last = ts.size() - 1;
622     for (int i = last; i >= 0; --i) {
623     ForkJoinTask<?> t = ts.get(i);
624     if (t == null) {
625     if (ex == null)
626     ex = new NullPointerException();
627     }
628     else if (i != 0)
629     t.fork();
630     else {
631     t.quietlyInvoke();
632     if (ex == null)
633     ex = t.getException();
634     }
635     }
636     for (int i = 1; i <= last; ++i) {
637     ForkJoinTask<?> t = ts.get(i);
638     if (t != null) {
639     if (ex != null)
640     t.cancel(false);
641     else {
642     t.quietlyJoin();
643     if (ex == null)
644     ex = t.getException();
645     }
646     }
647     }
648     if (ex != null)
649     rethrowException(ex);
650 dl 1.1 }
651    
652     /**
653     * Returns true if the computation performed by this task has
654     * completed (or has been cancelled).
655 jsr166 1.10 *
656 dl 1.1 * @return true if this computation has completed
657     */
658     public final boolean isDone() {
659     return status < 0;
660     }
661    
662     /**
663     * Returns true if this task was cancelled.
664 jsr166 1.10 *
665 dl 1.1 * @return true if this task was cancelled
666     */
667     public final boolean isCancelled() {
668     return (status & COMPLETION_MASK) == CANCELLED;
669     }
670    
671     /**
672     * Asserts that the results of this task's computation will not be
673 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
674 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
675     * will report true, and {@code join} will result in a
676     * {@code CancellationException} being thrown. Otherwise, when
677 dl 1.1 * cancellation races with completion, there are no guarantees
678 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
679     * {@code join} will return normally or via an exception, or
680 dl 1.1 * whether these behaviors will remain consistent upon repeated
681     * invocation.
682     *
683     * <p>This method may be overridden in subclasses, but if so, must
684     * still ensure that these minimal properties hold. In particular,
685     * the cancel method itself must not throw exceptions.
686     *
687     * <p> This method is designed to be invoked by <em>other</em>
688     * tasks. To terminate the current task, you can just return or
689     * throw an unchecked exception from its computation method, or
690 jsr166 1.8 * invoke {@code completeExceptionally}.
691 dl 1.1 *
692     * @param mayInterruptIfRunning this value is ignored in the
693     * default implementation because tasks are not in general
694 jsr166 1.14 * cancelled via interruption
695 dl 1.1 *
696     * @return true if this task is now cancelled
697     */
698     public boolean cancel(boolean mayInterruptIfRunning) {
699     setCompletion(CANCELLED);
700     return (status & COMPLETION_MASK) == CANCELLED;
701     }
702    
703     /**
704 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
705     *
706 dl 1.3 * @return true if this task threw an exception or was cancelled
707     */
708     public final boolean isCompletedAbnormally() {
709     return (status & COMPLETION_MASK) < NORMAL;
710     }
711    
712     /**
713     * Returns the exception thrown by the base computation, or a
714     * CancellationException if cancelled, or null if none or if the
715     * method has not yet completed.
716 jsr166 1.10 *
717 dl 1.3 * @return the exception, or null if none
718     */
719     public final Throwable getException() {
720     int s = status & COMPLETION_MASK;
721     if (s >= NORMAL)
722     return null;
723     if (s == CANCELLED)
724     return new CancellationException();
725     return exceptionMap.get(this);
726     }
727    
728     /**
729 dl 1.1 * Completes this task abnormally, and if not already aborted or
730     * cancelled, causes it to throw the given exception upon
731 jsr166 1.8 * {@code join} and related operations. This method may be used
732 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
733 dl 1.2 * completion of tasks that would not otherwise complete. Its use
734     * in other situations is likely to be wrong. This method is
735 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
736 dl 1.2 * implementation to maintain guarantees.
737     *
738 dl 1.1 * @param ex the exception to throw. If this exception is
739     * not a RuntimeException or Error, the actual exception thrown
740     * will be a RuntimeException with cause ex.
741     */
742     public void completeExceptionally(Throwable ex) {
743     setDoneExceptionally((ex instanceof RuntimeException) ||
744 jsr166 1.14 (ex instanceof Error) ? ex :
745 dl 1.1 new RuntimeException(ex));
746     }
747    
748     /**
749     * Completes this task, and if not already aborted or cancelled,
750 jsr166 1.8 * returning a {@code null} result upon {@code join} and related
751 dl 1.1 * operations. This method may be used to provide results for
752     * asynchronous tasks, or to provide alternative handling for
753 dl 1.2 * tasks that would not otherwise complete normally. Its use in
754     * other situations is likely to be wrong. This method is
755 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
756 dl 1.2 * implementation to maintain guarantees.
757 dl 1.1 *
758 jsr166 1.10 * @param value the result value for this task
759 dl 1.1 */
760     public void complete(V value) {
761     try {
762     setRawResult(value);
763 jsr166 1.14 } catch (Throwable rex) {
764 dl 1.1 setDoneExceptionally(rex);
765     return;
766     }
767     setNormalCompletion();
768     }
769    
770 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
771     ForkJoinWorkerThread w = getWorker();
772     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
773     awaitDone(w, true);
774     return reportFutureResult();
775     }
776    
777     public final V get(long timeout, TimeUnit unit)
778     throws InterruptedException, ExecutionException, TimeoutException {
779     ForkJoinWorkerThread w = getWorker();
780     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
781     awaitDone(w, unit.toNanos(timeout));
782     return reportTimedFutureResult();
783     }
784    
785 dl 1.1 /**
786 dl 1.2 * Possibly executes other tasks until this task is ready, then
787     * returns the result of the computation. This method may be more
788 jsr166 1.8 * efficient than {@code join}, but is only applicable when
789 jsr166 1.9 * there are no potential dependencies between continuation of the
790 dl 1.2 * current task and that of any other task that might be executed
791     * while helping. (This usually holds for pure divide-and-conquer
792     * tasks). This method may be invoked only from within
793 dl 1.13 * ForkJoinTask computations (as may be determined using method
794     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
795 jsr166 1.14 * result in exceptions or errors, possibly including
796 dl 1.13 * ClassCastException.
797 jsr166 1.10 *
798 dl 1.2 * @return the computed result
799     */
800     public final V helpJoin() {
801 jsr166 1.14 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
802 dl 1.2 if (status < 0 || !w.unpushTask(this) || !tryExec())
803 dl 1.3 reportException(busyJoin(w));
804 dl 1.2 return getRawResult();
805     }
806    
807     /**
808     * Possibly executes other tasks until this task is ready. This
809     * method may be invoked only from within ForkJoinTask
810 dl 1.13 * computations (as may be determined using method {@link
811 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
812     * in exceptions or errors, possibly including ClassCastException.
813 dl 1.2 */
814     public final void quietlyHelpJoin() {
815     if (status >= 0) {
816     ForkJoinWorkerThread w =
817 jsr166 1.14 (ForkJoinWorkerThread) Thread.currentThread();
818 dl 1.2 if (!w.unpushTask(this) || !tryQuietlyInvoke())
819 dl 1.3 busyJoin(w);
820 dl 1.2 }
821     }
822    
823     /**
824     * Joins this task, without returning its result or throwing an
825     * exception. This method may be useful when processing
826     * collections of tasks when some have been cancelled or otherwise
827     * known to have aborted.
828     */
829     public final void quietlyJoin() {
830     if (status >= 0) {
831     ForkJoinWorkerThread w = getWorker();
832     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
833     awaitDone(w, true);
834     }
835     }
836    
837     /**
838     * Commences performing this task and awaits its completion if
839     * necessary, without returning its result or throwing an
840     * exception. This method may be useful when processing
841     * collections of tasks when some have been cancelled or otherwise
842     * known to have aborted.
843     */
844     public final void quietlyInvoke() {
845     if (status >= 0 && !tryQuietlyInvoke())
846     quietlyJoin();
847     }
848    
849     /**
850 dl 1.3 * Possibly executes tasks until the pool hosting the current task
851     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
852     * designs in which many tasks are forked, but none are explicitly
853     * joined, instead executing them until all are processed.
854     */
855     public static void helpQuiesce() {
856 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
857     .helpQuiescePool();
858 dl 1.3 }
859    
860     /**
861 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
862 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
863 dl 1.1 * this task, but only if reuse occurs when this task has either
864     * never been forked, or has been forked, then completed and all
865     * outstanding joins of this task have also completed. Effects
866     * under any other usage conditions are not guaranteed, and are
867     * almost surely wrong. This method may be useful when executing
868     * pre-constructed trees of subtasks in loops.
869     */
870     public void reinitialize() {
871     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
872     exceptionMap.remove(this);
873     status = 0;
874     }
875    
876     /**
877 dl 1.2 * Returns the pool hosting the current task execution, or null
878 dl 1.13 * if this task is executing outside of any ForkJoinPool.
879 jsr166 1.10 *
880 jsr166 1.14 * @return the pool, or null if none
881 dl 1.1 */
882 dl 1.2 public static ForkJoinPool getPool() {
883     Thread t = Thread.currentThread();
884 jsr166 1.15 return (t instanceof ForkJoinWorkerThread) ?
885     ((ForkJoinWorkerThread) t).pool : null;
886 dl 1.1 }
887    
888     /**
889 jsr166 1.14 * Returns {@code true} if the current thread is executing as a
890 dl 1.13 * ForkJoinPool computation.
891 jsr166 1.14 *
892     * @return {@code true} if the current thread is executing as a
893 dl 1.13 * ForkJoinPool computation, or false otherwise
894     */
895     public static boolean inForkJoinPool() {
896     return Thread.currentThread() instanceof ForkJoinWorkerThread;
897     }
898    
899     /**
900 dl 1.2 * Tries to unschedule this task for execution. This method will
901     * typically succeed if this task is the most recently forked task
902     * by the current thread, and has not commenced executing in
903     * another thread. This method may be useful when arranging
904     * alternative local processing of tasks that could have been, but
905     * were not, stolen. This method may be invoked only from within
906 dl 1.13 * ForkJoinTask computations (as may be determined using method
907     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
908 jsr166 1.14 * result in exceptions or errors, possibly including
909 dl 1.13 * ClassCastException.
910 jsr166 1.10 *
911 dl 1.2 * @return true if unforked
912 dl 1.1 */
913 dl 1.2 public boolean tryUnfork() {
914 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
915     .unpushTask(this);
916 dl 1.1 }
917    
918     /**
919 dl 1.2 * Returns an estimate of the number of tasks that have been
920     * forked by the current worker thread but not yet executed. This
921     * value may be useful for heuristic decisions about whether to
922     * fork other tasks.
923 jsr166 1.10 *
924 dl 1.2 * @return the number of tasks
925     */
926     public static int getQueuedTaskCount() {
927 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
928     .getQueueSize();
929 dl 1.2 }
930    
931     /**
932 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
933 dl 1.1 * held by the current worker thread than there are other worker
934 dl 1.2 * threads that might steal them. This value may be useful for
935     * heuristic decisions about whether to fork other tasks. In many
936     * usages of ForkJoinTasks, at steady state, each worker should
937     * aim to maintain a small constant surplus (for example, 3) of
938     * tasks, and to process computations locally if this threshold is
939     * exceeded.
940 jsr166 1.10 *
941 dl 1.1 * @return the surplus number of tasks, which may be negative
942     */
943 dl 1.2 public static int getSurplusQueuedTaskCount() {
944 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
945 dl 1.1 .getEstimatedSurplusTaskCount();
946     }
947    
948 dl 1.2 // Extension methods
949 dl 1.1
950     /**
951 jsr166 1.8 * Returns the result that would be returned by {@code join},
952 dl 1.2 * even if this task completed abnormally, or null if this task is
953     * not known to have been completed. This method is designed to
954     * aid debugging, as well as to support extensions. Its use in any
955     * other context is discouraged.
956 dl 1.1 *
957 jsr166 1.10 * @return the result, or null if not completed
958 dl 1.1 */
959     public abstract V getRawResult();
960    
961     /**
962     * Forces the given value to be returned as a result. This method
963     * is designed to support extensions, and should not in general be
964     * called otherwise.
965     *
966     * @param value the value
967     */
968     protected abstract void setRawResult(V value);
969    
970     /**
971     * Immediately performs the base action of this task. This method
972     * is designed to support extensions, and should not in general be
973     * called otherwise. The return value controls whether this task
974     * is considered to be done normally. It may return false in
975     * asynchronous actions that require explicit invocations of
976 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
977 dl 1.1 * to indicate abnormal exit.
978 jsr166 1.10 *
979 dl 1.1 * @return true if completed normally
980     * @throws Error or RuntimeException if encountered during computation
981     */
982     protected abstract boolean exec();
983    
984 dl 1.2 /**
985 dl 1.6 * Returns, but does not unschedule or execute, the task queued by
986     * the current thread but not yet executed, if one is
987     * available. There is no guarantee that this task will actually
988     * be polled or executed next. This method is designed primarily
989     * to support extensions, and is unlikely to be useful otherwise.
990     * This method may be invoked only from within ForkJoinTask
991 dl 1.13 * computations (as may be determined using method {@link
992     * #inForkJoinPool}). Attempts to invoke in other contexts result
993 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
994 dl 1.2 *
995     * @return the next task, or null if none are available
996     */
997     protected static ForkJoinTask<?> peekNextLocalTask() {
998 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
999     .peekTask();
1000 dl 1.2 }
1001    
1002     /**
1003 dl 1.6 * Unschedules and returns, without executing, the next task
1004     * queued by the current thread but not yet executed. This method
1005     * is designed primarily to support extensions, and is unlikely to
1006     * be useful otherwise. This method may be invoked only from
1007 dl 1.13 * within ForkJoinTask computations (as may be determined using
1008     * method {@link #inForkJoinPool}). Attempts to invoke in other
1009 jsr166 1.14 * contexts result in exceptions or errors, possibly including
1010 dl 1.6 * ClassCastException.
1011 dl 1.2 *
1012     * @return the next task, or null if none are available
1013     */
1014     protected static ForkJoinTask<?> pollNextLocalTask() {
1015 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1016     .pollLocalTask();
1017 dl 1.2 }
1018 jsr166 1.7
1019 dl 1.2 /**
1020 dl 1.6 * Unschedules and returns, without executing, the next task
1021     * queued by the current thread but not yet executed, if one is
1022     * available, or if not available, a task that was forked by some
1023     * other thread, if available. Availability may be transient, so a
1024 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1025 dl 1.6 * of the pool this task is operating in. This method is designed
1026     * primarily to support extensions, and is unlikely to be useful
1027     * otherwise. This method may be invoked only from within
1028 dl 1.13 * ForkJoinTask computations (as may be determined using method
1029     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1030 jsr166 1.14 * result in exceptions or errors, possibly including
1031 dl 1.6 * ClassCastException.
1032 dl 1.4 *
1033 dl 1.2 * @return a task, or null if none are available
1034     */
1035     protected static ForkJoinTask<?> pollTask() {
1036 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1037     .pollTask();
1038 dl 1.2 }
1039    
1040 dl 1.18 // adaptors
1041    
1042     /**
1043     * Returns a new ForkJoinTask that performs the <code>run</code>
1044     * method of the given Runnable as its action, and returns a null
1045     * result upon <code>join</code>.
1046     *
1047     * @param runnable the runnable action
1048     * @return the task
1049     */
1050     public static ForkJoinTask<Void> adapt(Runnable runnable) {
1051     return new ForkJoinPool.AdaptedRunnable<Void>(runnable, null);
1052     }
1053    
1054     /**
1055     * Returns a new ForkJoinTask that performs the <code>run</code>
1056     * method of the given Runnable as its action, and returns the
1057     * given result upon <code>join</code>.
1058     *
1059     * @param runnable the runnable action
1060     * @param result the result upon completion
1061     * @return the task
1062     */
1063     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1064     return new ForkJoinPool.AdaptedRunnable<T>(runnable, result);
1065     }
1066    
1067     /**
1068     * Returns a new ForkJoinTask that performs the <code>call</code>
1069     * method of the given Callable as its action, and returns its
1070     * result upon <code>join</code>, translating any checked
1071     * exceptions encountered into <code>RuntimeException<code>.
1072     *
1073     * @param callable the callable action
1074     * @return the task
1075     */
1076     public static <T> ForkJoinTask<T> adapt(Callable<T> callable) {
1077     return new ForkJoinPool.AdaptedCallable<T>(callable);
1078     }
1079    
1080 dl 1.1 // Serialization support
1081    
1082     private static final long serialVersionUID = -7721805057305804111L;
1083    
1084     /**
1085     * Save the state to a stream.
1086     *
1087     * @serialData the current run status and the exception thrown
1088 jsr166 1.10 * during execution, or null if none
1089 dl 1.1 * @param s the stream
1090     */
1091     private void writeObject(java.io.ObjectOutputStream s)
1092     throws java.io.IOException {
1093     s.defaultWriteObject();
1094     s.writeObject(getException());
1095     }
1096    
1097     /**
1098     * Reconstitute the instance from a stream.
1099 jsr166 1.10 *
1100 dl 1.1 * @param s the stream
1101     */
1102     private void readObject(java.io.ObjectInputStream s)
1103     throws java.io.IOException, ClassNotFoundException {
1104     s.defaultReadObject();
1105 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1106     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1107 dl 1.1 Object ex = s.readObject();
1108     if (ex != null)
1109 jsr166 1.14 setDoneExceptionally((Throwable) ex);
1110 dl 1.1 }
1111    
1112 jsr166 1.16 // Unsafe mechanics for jsr166y 3rd party package.
1113     private static sun.misc.Unsafe getUnsafe() {
1114 jsr166 1.5 try {
1115 jsr166 1.16 return sun.misc.Unsafe.getUnsafe();
1116 jsr166 1.5 } catch (SecurityException se) {
1117     try {
1118     return java.security.AccessController.doPrivileged
1119 jsr166 1.16 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
1120     public sun.misc.Unsafe run() throws Exception {
1121     return getUnsafeByReflection();
1122 jsr166 1.5 }});
1123     } catch (java.security.PrivilegedActionException e) {
1124 jsr166 1.16 throw new RuntimeException("Could not initialize intrinsics",
1125     e.getCause());
1126 jsr166 1.5 }
1127     }
1128     }
1129    
1130 jsr166 1.16 private static sun.misc.Unsafe getUnsafeByReflection()
1131 jsr166 1.5 throws NoSuchFieldException, IllegalAccessException {
1132 jsr166 1.16 java.lang.reflect.Field f =
1133     sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
1134 jsr166 1.5 f.setAccessible(true);
1135 jsr166 1.16 return (sun.misc.Unsafe) f.get(null);
1136 jsr166 1.5 }
1137    
1138 jsr166 1.16 private static long fieldOffset(String fieldName, Class<?> klazz) {
1139 dl 1.1 try {
1140 jsr166 1.16 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
1141     } catch (NoSuchFieldException e) {
1142     // Convert Exception to Error
1143     NoSuchFieldError error = new NoSuchFieldError(fieldName);
1144     error.initCause(e);
1145     throw error;
1146 jsr166 1.5 }
1147 dl 1.1 }
1148    
1149 jsr166 1.16 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1150     static final long statusOffset =
1151     fieldOffset("status", ForkJoinTask.class);
1152    
1153 dl 1.1 }