ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.15
Committed: Fri Jul 24 22:05:22 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.14: +4 -3 lines
Log Message:
warning suppression

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
26     * and {@code join}, or derivatives such as
27     * {@code invokeAll}. However, this class also provides a number
28 dl 1.2 * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, that arranges asynchronous execution,
38     * and {@link #join}, that doesn't proceed until the task's result has
39 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
40 dl 1.2 * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
52 dl 1.2 * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, that are rethrown to callers attempting join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61 jsr166 1.8 * waits for completion and report results using {@code Future}
62 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
67 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
72 dl 1.2 * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
80 dl 1.2 * method that somehow uses the control methods supplied by this base
81 jsr166 1.8 * class. While these methods have {@code public} access (to allow
82 dl 1.2 * instances of different task subclasses to call each others
83     * methods), some of them may only be called from within other
84 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
85     * #inForkJoinPool}). Attempts to invoke them in other contexts
86 jsr166 1.14 * result in exceptions or errors, possibly including
87 dl 1.13 * ClassCastException.
88 dl 1.1 *
89 jsr166 1.8 * <p>Most base support methods are {@code final} because their
90 dl 1.1 * implementations are intrinsically tied to the underlying
91     * lightweight task scheduling framework, and so cannot be overridden.
92     * Developers creating new basic styles of fork/join processing should
93 jsr166 1.8 * minimally implement {@code protected} methods
94     * {@code exec}, {@code setRawResult}, and
95     * {@code getRawResult}, while also introducing an abstract
96 dl 1.2 * computational method that can be implemented in its subclasses,
97 jsr166 1.8 * possibly relying on other {@code protected} methods provided
98 dl 1.2 * by this class.
99 dl 1.1 *
100     * <p>ForkJoinTasks should perform relatively small amounts of
101 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
102 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
103     * than 10000 basic computational steps. If tasks are too big, then
104 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
105 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
106     *
107 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
108 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
109     * in general sensible to serialize tasks only before or after, but
110 dl 1.1 * not during execution. Serialization is not relied on during
111     * execution itself.
112 jsr166 1.12 *
113     * @since 1.7
114     * @author Doug Lea
115 dl 1.1 */
116     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
117 dl 1.2
118 dl 1.1 /**
119 dl 1.2 * Run control status bits packed into a single int to minimize
120     * footprint and to ensure atomicity (via CAS). Status is
121     * initially zero, and takes on nonnegative values until
122 dl 1.1 * completed, upon which status holds COMPLETED. CANCELLED, or
123     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
124     * blocking waits by other threads have SIGNAL_MASK bits set --
125     * bit 15 for external (nonFJ) waits, and the rest a count of
126     * waiting FJ threads. (This representation relies on
127     * ForkJoinPool max thread limits). Completion of a stolen task
128     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
129     * though suboptimal for some purposes, we use basic builtin
130     * wait/notify to take advantage of "monitor inflation" in JVMs
131     * that we would otherwise need to emulate to avoid adding further
132     * per-task bookkeeping overhead. Note that bits 16-28 are
133     * currently unused. Also value 0x80000000 is available as spare
134     * completion value.
135     */
136 jsr166 1.9 volatile int status; // accessed directly by pool and workers
137 dl 1.1
138     static final int COMPLETION_MASK = 0xe0000000;
139     static final int NORMAL = 0xe0000000; // == mask
140     static final int CANCELLED = 0xc0000000;
141     static final int EXCEPTIONAL = 0xa0000000;
142     static final int SIGNAL_MASK = 0x0000ffff;
143     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
144     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
145    
146     /**
147     * Table of exceptions thrown by tasks, to enable reporting by
148     * callers. Because exceptions are rare, we don't directly keep
149 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
150 dl 1.1 * that cancellation exceptions don't appear in the table, but are
151     * instead recorded as status values.
152 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
153 dl 1.1 */
154     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
155     Collections.synchronizedMap
156     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
157    
158     // within-package utilities
159    
160     /**
161 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
162 dl 1.1 */
163     static ForkJoinWorkerThread getWorker() {
164     Thread t = Thread.currentThread();
165 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
166     (ForkJoinWorkerThread) t : null);
167 dl 1.1 }
168    
169     final boolean casStatus(int cmp, int val) {
170 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
171 dl 1.1 }
172    
173     /**
174     * Workaround for not being able to rethrow unchecked exceptions.
175     */
176     static void rethrowException(Throwable ex) {
177     if (ex != null)
178 jsr166 1.11 UNSAFE.throwException(ex);
179 dl 1.1 }
180    
181     // Setting completion status
182    
183     /**
184 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
185     *
186 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
187     */
188     final void setCompletion(int completion) {
189 dl 1.2 ForkJoinPool pool = getPool();
190 dl 1.1 if (pool != null) {
191     int s; // Clear signal bits while setting completion status
192 jsr166 1.14 do {} while ((s = status) >= 0 && !casStatus(s, completion));
193 dl 1.1
194     if ((s & SIGNAL_MASK) != 0) {
195     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
196     pool.updateRunningCount(s);
197 jsr166 1.14 synchronized (this) { notifyAll(); }
198 dl 1.1 }
199     }
200     else
201     externallySetCompletion(completion);
202     }
203    
204     /**
205     * Version of setCompletion for non-FJ threads. Leaves signal
206     * bits for unblocked threads to adjust, and always notifies.
207     */
208     private void externallySetCompletion(int completion) {
209     int s;
210 jsr166 1.14 do {} while ((s = status) >= 0 &&
211     !casStatus(s, (s & SIGNAL_MASK) | completion));
212     synchronized (this) { notifyAll(); }
213 dl 1.1 }
214    
215     /**
216 jsr166 1.14 * Sets status to indicate normal completion.
217 dl 1.1 */
218     final void setNormalCompletion() {
219     // Try typical fast case -- single CAS, no signal, not already done.
220     // Manually expand casStatus to improve chances of inlining it
221 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
222 dl 1.1 setCompletion(NORMAL);
223     }
224    
225     // internal waiting and notification
226    
227     /**
228 jsr166 1.14 * Performs the actual monitor wait for awaitDone.
229 dl 1.1 */
230     private void doAwaitDone() {
231     // Minimize lock bias and in/de-flation effects by maximizing
232     // chances of waiting inside sync
233     try {
234     while (status >= 0)
235 jsr166 1.14 synchronized (this) { if (status >= 0) wait(); }
236 dl 1.1 } catch (InterruptedException ie) {
237     onInterruptedWait();
238     }
239     }
240    
241     /**
242 jsr166 1.14 * Performs the actual timed monitor wait for awaitDone.
243 dl 1.1 */
244     private void doAwaitDone(long startTime, long nanos) {
245 jsr166 1.14 synchronized (this) {
246 dl 1.1 try {
247     while (status >= 0) {
248     long nt = nanos - System.nanoTime() - startTime;
249     if (nt <= 0)
250     break;
251 jsr166 1.14 wait(nt / 1000000, (int) (nt % 1000000));
252 dl 1.1 }
253     } catch (InterruptedException ie) {
254     onInterruptedWait();
255     }
256     }
257     }
258    
259     // Awaiting completion
260    
261     /**
262     * Sets status to indicate there is joiner, then waits for join,
263     * surrounded with pool notifications.
264 jsr166 1.10 *
265 dl 1.1 * @return status upon exit
266     */
267 jsr166 1.14 private int awaitDone(ForkJoinWorkerThread w,
268     boolean maintainParallelism) {
269     ForkJoinPool pool = (w == null) ? null : w.pool;
270 dl 1.1 int s;
271     while ((s = status) >= 0) {
272 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
273 dl 1.1 if (pool == null || !pool.preJoin(this, maintainParallelism))
274     doAwaitDone();
275     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
276     adjustPoolCountsOnUnblock(pool);
277     break;
278     }
279     }
280     return s;
281     }
282    
283     /**
284     * Timed version of awaitDone
285 jsr166 1.14 *
286 dl 1.1 * @return status upon exit
287     */
288 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
289 jsr166 1.14 ForkJoinPool pool = (w == null) ? null : w.pool;
290 dl 1.1 int s;
291     while ((s = status) >= 0) {
292 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
293 dl 1.1 long startTime = System.nanoTime();
294     if (pool == null || !pool.preJoin(this, false))
295     doAwaitDone(startTime, nanos);
296     if ((s = status) >= 0) {
297     adjustPoolCountsOnCancelledWait(pool);
298     s = status;
299     }
300     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
301     adjustPoolCountsOnUnblock(pool);
302     break;
303     }
304     }
305     return s;
306     }
307    
308     /**
309 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
310 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
311     */
312     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
313     int s;
314 jsr166 1.14 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
315 dl 1.1 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
316     pool.updateRunningCount(s);
317     }
318    
319     /**
320 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
321 dl 1.1 */
322     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
323     if (pool != null) {
324     int s;
325     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
326     if (casStatus(s, s - 1)) {
327     pool.updateRunningCount(1);
328     break;
329     }
330     }
331     }
332     }
333    
334 dl 1.2 /**
335 jsr166 1.10 * Handles interruptions during waits.
336 dl 1.2 */
337 dl 1.1 private void onInterruptedWait() {
338 dl 1.2 ForkJoinWorkerThread w = getWorker();
339     if (w == null)
340     Thread.currentThread().interrupt(); // re-interrupt
341     else if (w.isTerminating())
342 dl 1.3 cancelIgnoringExceptions();
343 dl 1.2 // else if FJworker, ignore interrupt
344 dl 1.1 }
345    
346     // Recording and reporting exceptions
347    
348     private void setDoneExceptionally(Throwable rex) {
349     exceptionMap.put(this, rex);
350     setCompletion(EXCEPTIONAL);
351     }
352    
353     /**
354 jsr166 1.10 * Throws the exception associated with status s.
355     *
356 dl 1.1 * @throws the exception
357     */
358     private void reportException(int s) {
359     if ((s &= COMPLETION_MASK) < NORMAL) {
360     if (s == CANCELLED)
361     throw new CancellationException();
362     else
363     rethrowException(exceptionMap.get(this));
364     }
365     }
366    
367     /**
368 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
369 jsr166 1.14 * Only call when {@code isDone} known to be true.
370 dl 1.1 */
371     private V reportFutureResult()
372     throws ExecutionException, InterruptedException {
373     int s = status & COMPLETION_MASK;
374     if (s < NORMAL) {
375     Throwable ex;
376     if (s == CANCELLED)
377     throw new CancellationException();
378     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
379     throw new ExecutionException(ex);
380     if (Thread.interrupted())
381     throw new InterruptedException();
382     }
383     return getRawResult();
384     }
385    
386     /**
387     * Returns result or throws exception using j.u.c.Future conventions
388 jsr166 1.10 * with timeouts.
389 dl 1.1 */
390     private V reportTimedFutureResult()
391     throws InterruptedException, ExecutionException, TimeoutException {
392     Throwable ex;
393     int s = status & COMPLETION_MASK;
394     if (s == NORMAL)
395     return getRawResult();
396     if (s == CANCELLED)
397     throw new CancellationException();
398     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
399     throw new ExecutionException(ex);
400     if (Thread.interrupted())
401     throw new InterruptedException();
402     throw new TimeoutException();
403     }
404    
405     // internal execution methods
406    
407     /**
408     * Calls exec, recording completion, and rethrowing exception if
409 jsr166 1.10 * encountered. Caller should normally check status before calling.
410     *
411 dl 1.1 * @return true if completed normally
412     */
413     private boolean tryExec() {
414     try { // try block must contain only call to exec
415     if (!exec())
416     return false;
417     } catch (Throwable rex) {
418     setDoneExceptionally(rex);
419     rethrowException(rex);
420     return false; // not reached
421     }
422     setNormalCompletion();
423     return true;
424     }
425    
426     /**
427     * Main execution method used by worker threads. Invokes
428 jsr166 1.10 * base computation unless already complete.
429 dl 1.1 */
430     final void quietlyExec() {
431     if (status >= 0) {
432     try {
433     if (!exec())
434     return;
435 jsr166 1.14 } catch (Throwable rex) {
436 dl 1.1 setDoneExceptionally(rex);
437     return;
438     }
439     setNormalCompletion();
440     }
441     }
442    
443     /**
444 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
445     * Caller should normally check status before calling.
446     *
447 dl 1.1 * @return true if completed normally
448     */
449     private boolean tryQuietlyInvoke() {
450     try {
451     if (!exec())
452     return false;
453     } catch (Throwable rex) {
454     setDoneExceptionally(rex);
455     return false;
456     }
457     setNormalCompletion();
458     return true;
459     }
460    
461     /**
462 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
463 dl 1.1 */
464 dl 1.3 final void cancelIgnoringExceptions() {
465 dl 1.1 try {
466     cancel(false);
467 jsr166 1.14 } catch (Throwable ignore) {
468 dl 1.1 }
469     }
470    
471 dl 1.3 /**
472     * Main implementation of helpJoin
473     */
474     private int busyJoin(ForkJoinWorkerThread w) {
475     int s;
476     ForkJoinTask<?> t;
477     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
478     t.quietlyExec();
479 jsr166 1.14 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
480 dl 1.3 }
481    
482 dl 1.1 // public methods
483    
484     /**
485     * Arranges to asynchronously execute this task. While it is not
486     * necessarily enforced, it is a usage error to fork a task more
487     * than once unless it has completed and been reinitialized. This
488 dl 1.2 * method may be invoked only from within ForkJoinTask
489 dl 1.13 * computations (as may be determined using method {@link
490     * #inForkJoinPool}). Attempts to invoke in other contexts result
491 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
492 dl 1.1 */
493     public final void fork() {
494 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
495     .pushTask(this);
496 dl 1.1 }
497    
498     /**
499     * Returns the result of the computation when it is ready.
500 jsr166 1.8 * This method differs from {@code get} in that abnormal
501 dl 1.1 * completion results in RuntimeExceptions or Errors, not
502     * ExecutionExceptions.
503     *
504     * @return the computed result
505     */
506     public final V join() {
507     ForkJoinWorkerThread w = getWorker();
508     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
509     reportException(awaitDone(w, true));
510     return getRawResult();
511     }
512    
513     /**
514 dl 1.2 * Commences performing this task, awaits its completion if
515     * necessary, and return its result.
516 jsr166 1.10 *
517 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
518 jsr166 1.10 * exception) if the underlying computation did so
519 dl 1.1 * @return the computed result
520     */
521     public final V invoke() {
522     if (status >= 0 && tryExec())
523     return getRawResult();
524     else
525     return join();
526     }
527    
528     /**
529 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
530 dl 1.2 * both of them or an exception is encountered. This method may be
531 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
532     * determined using method {@link #inForkJoinPool}). Attempts to
533 jsr166 1.14 * invoke in other contexts result in exceptions or errors,
534 dl 1.4 * possibly including ClassCastException.
535 jsr166 1.10 *
536 dl 1.2 * @param t1 one task
537     * @param t2 the other task
538     * @throws NullPointerException if t1 or t2 are null
539 jsr166 1.10 * @throws RuntimeException or Error if either task did so
540 dl 1.1 */
541 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
542     t2.fork();
543     t1.invoke();
544     t2.join();
545 dl 1.1 }
546    
547     /**
548 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
549 dl 1.2 * for all of them. If any task encounters an exception, others
550     * may be cancelled. This method may be invoked only from within
551 dl 1.13 * ForkJoinTask computations (as may be determined using method
552     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
553 jsr166 1.14 * result in exceptions or errors, possibly including
554 dl 1.13 * ClassCastException.
555 jsr166 1.14 *
556 dl 1.2 * @param tasks the array of tasks
557 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
558     * @throws RuntimeException or Error if any task did so
559 dl 1.1 */
560 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
561     Throwable ex = null;
562     int last = tasks.length - 1;
563     for (int i = last; i >= 0; --i) {
564     ForkJoinTask<?> t = tasks[i];
565     if (t == null) {
566     if (ex == null)
567     ex = new NullPointerException();
568     }
569     else if (i != 0)
570     t.fork();
571     else {
572     t.quietlyInvoke();
573     if (ex == null)
574     ex = t.getException();
575     }
576     }
577     for (int i = 1; i <= last; ++i) {
578     ForkJoinTask<?> t = tasks[i];
579     if (t != null) {
580     if (ex != null)
581     t.cancel(false);
582     else {
583     t.quietlyJoin();
584     if (ex == null)
585     ex = t.getException();
586     }
587     }
588 dl 1.1 }
589 dl 1.2 if (ex != null)
590     rethrowException(ex);
591 dl 1.1 }
592    
593     /**
594 dl 1.2 * Forks all tasks in the collection, returning when
595 jsr166 1.8 * {@code isDone} holds for all of them. If any task
596 dl 1.2 * encounters an exception, others may be cancelled. This method
597 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
598     * may be determined using method {@link
599 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
600     * in exceptions or errors, possibly including ClassCastException.
601 jsr166 1.10 *
602 dl 1.2 * @param tasks the collection of tasks
603 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
604     * @throws RuntimeException or Error if any task did so
605 dl 1.1 */
606 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
607 jsr166 1.15 if (!(tasks instanceof List<?>)) {
608 jsr166 1.14 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
609 dl 1.2 return;
610     }
611 jsr166 1.15 @SuppressWarnings("unchecked")
612 dl 1.2 List<? extends ForkJoinTask<?>> ts =
613 jsr166 1.14 (List<? extends ForkJoinTask<?>>) tasks;
614 dl 1.2 Throwable ex = null;
615     int last = ts.size() - 1;
616     for (int i = last; i >= 0; --i) {
617     ForkJoinTask<?> t = ts.get(i);
618     if (t == null) {
619     if (ex == null)
620     ex = new NullPointerException();
621     }
622     else if (i != 0)
623     t.fork();
624     else {
625     t.quietlyInvoke();
626     if (ex == null)
627     ex = t.getException();
628     }
629     }
630     for (int i = 1; i <= last; ++i) {
631     ForkJoinTask<?> t = ts.get(i);
632     if (t != null) {
633     if (ex != null)
634     t.cancel(false);
635     else {
636     t.quietlyJoin();
637     if (ex == null)
638     ex = t.getException();
639     }
640     }
641     }
642     if (ex != null)
643     rethrowException(ex);
644 dl 1.1 }
645    
646     /**
647     * Returns true if the computation performed by this task has
648     * completed (or has been cancelled).
649 jsr166 1.10 *
650 dl 1.1 * @return true if this computation has completed
651     */
652     public final boolean isDone() {
653     return status < 0;
654     }
655    
656     /**
657     * Returns true if this task was cancelled.
658 jsr166 1.10 *
659 dl 1.1 * @return true if this task was cancelled
660     */
661     public final boolean isCancelled() {
662     return (status & COMPLETION_MASK) == CANCELLED;
663     }
664    
665     /**
666     * Asserts that the results of this task's computation will not be
667 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
668 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
669     * will report true, and {@code join} will result in a
670     * {@code CancellationException} being thrown. Otherwise, when
671 dl 1.1 * cancellation races with completion, there are no guarantees
672 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
673     * {@code join} will return normally or via an exception, or
674 dl 1.1 * whether these behaviors will remain consistent upon repeated
675     * invocation.
676     *
677     * <p>This method may be overridden in subclasses, but if so, must
678     * still ensure that these minimal properties hold. In particular,
679     * the cancel method itself must not throw exceptions.
680     *
681     * <p> This method is designed to be invoked by <em>other</em>
682     * tasks. To terminate the current task, you can just return or
683     * throw an unchecked exception from its computation method, or
684 jsr166 1.8 * invoke {@code completeExceptionally}.
685 dl 1.1 *
686     * @param mayInterruptIfRunning this value is ignored in the
687     * default implementation because tasks are not in general
688 jsr166 1.14 * cancelled via interruption
689 dl 1.1 *
690     * @return true if this task is now cancelled
691     */
692     public boolean cancel(boolean mayInterruptIfRunning) {
693     setCompletion(CANCELLED);
694     return (status & COMPLETION_MASK) == CANCELLED;
695     }
696    
697     /**
698 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
699     *
700 dl 1.3 * @return true if this task threw an exception or was cancelled
701     */
702     public final boolean isCompletedAbnormally() {
703     return (status & COMPLETION_MASK) < NORMAL;
704     }
705    
706     /**
707     * Returns the exception thrown by the base computation, or a
708     * CancellationException if cancelled, or null if none or if the
709     * method has not yet completed.
710 jsr166 1.10 *
711 dl 1.3 * @return the exception, or null if none
712     */
713     public final Throwable getException() {
714     int s = status & COMPLETION_MASK;
715     if (s >= NORMAL)
716     return null;
717     if (s == CANCELLED)
718     return new CancellationException();
719     return exceptionMap.get(this);
720     }
721    
722     /**
723 dl 1.1 * Completes this task abnormally, and if not already aborted or
724     * cancelled, causes it to throw the given exception upon
725 jsr166 1.8 * {@code join} and related operations. This method may be used
726 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
727 dl 1.2 * completion of tasks that would not otherwise complete. Its use
728     * in other situations is likely to be wrong. This method is
729 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
730 dl 1.2 * implementation to maintain guarantees.
731     *
732 dl 1.1 * @param ex the exception to throw. If this exception is
733     * not a RuntimeException or Error, the actual exception thrown
734     * will be a RuntimeException with cause ex.
735     */
736     public void completeExceptionally(Throwable ex) {
737     setDoneExceptionally((ex instanceof RuntimeException) ||
738 jsr166 1.14 (ex instanceof Error) ? ex :
739 dl 1.1 new RuntimeException(ex));
740     }
741    
742     /**
743     * Completes this task, and if not already aborted or cancelled,
744 jsr166 1.8 * returning a {@code null} result upon {@code join} and related
745 dl 1.1 * operations. This method may be used to provide results for
746     * asynchronous tasks, or to provide alternative handling for
747 dl 1.2 * tasks that would not otherwise complete normally. Its use in
748     * other situations is likely to be wrong. This method is
749 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
750 dl 1.2 * implementation to maintain guarantees.
751 dl 1.1 *
752 jsr166 1.10 * @param value the result value for this task
753 dl 1.1 */
754     public void complete(V value) {
755     try {
756     setRawResult(value);
757 jsr166 1.14 } catch (Throwable rex) {
758 dl 1.1 setDoneExceptionally(rex);
759     return;
760     }
761     setNormalCompletion();
762     }
763    
764 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
765     ForkJoinWorkerThread w = getWorker();
766     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
767     awaitDone(w, true);
768     return reportFutureResult();
769     }
770    
771     public final V get(long timeout, TimeUnit unit)
772     throws InterruptedException, ExecutionException, TimeoutException {
773     ForkJoinWorkerThread w = getWorker();
774     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
775     awaitDone(w, unit.toNanos(timeout));
776     return reportTimedFutureResult();
777     }
778    
779 dl 1.1 /**
780 dl 1.2 * Possibly executes other tasks until this task is ready, then
781     * returns the result of the computation. This method may be more
782 jsr166 1.8 * efficient than {@code join}, but is only applicable when
783 jsr166 1.9 * there are no potential dependencies between continuation of the
784 dl 1.2 * current task and that of any other task that might be executed
785     * while helping. (This usually holds for pure divide-and-conquer
786     * tasks). This method may be invoked only from within
787 dl 1.13 * ForkJoinTask computations (as may be determined using method
788     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
789 jsr166 1.14 * result in exceptions or errors, possibly including
790 dl 1.13 * ClassCastException.
791 jsr166 1.10 *
792 dl 1.2 * @return the computed result
793     */
794     public final V helpJoin() {
795 jsr166 1.14 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
796 dl 1.2 if (status < 0 || !w.unpushTask(this) || !tryExec())
797 dl 1.3 reportException(busyJoin(w));
798 dl 1.2 return getRawResult();
799     }
800    
801     /**
802     * Possibly executes other tasks until this task is ready. This
803     * method may be invoked only from within ForkJoinTask
804 dl 1.13 * computations (as may be determined using method {@link
805 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
806     * in exceptions or errors, possibly including ClassCastException.
807 dl 1.2 */
808     public final void quietlyHelpJoin() {
809     if (status >= 0) {
810     ForkJoinWorkerThread w =
811 jsr166 1.14 (ForkJoinWorkerThread) Thread.currentThread();
812 dl 1.2 if (!w.unpushTask(this) || !tryQuietlyInvoke())
813 dl 1.3 busyJoin(w);
814 dl 1.2 }
815     }
816    
817     /**
818     * Joins this task, without returning its result or throwing an
819     * exception. This method may be useful when processing
820     * collections of tasks when some have been cancelled or otherwise
821     * known to have aborted.
822     */
823     public final void quietlyJoin() {
824     if (status >= 0) {
825     ForkJoinWorkerThread w = getWorker();
826     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
827     awaitDone(w, true);
828     }
829     }
830    
831     /**
832     * Commences performing this task and awaits its completion if
833     * necessary, without returning its result or throwing an
834     * exception. This method may be useful when processing
835     * collections of tasks when some have been cancelled or otherwise
836     * known to have aborted.
837     */
838     public final void quietlyInvoke() {
839     if (status >= 0 && !tryQuietlyInvoke())
840     quietlyJoin();
841     }
842    
843     /**
844 dl 1.3 * Possibly executes tasks until the pool hosting the current task
845     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
846     * designs in which many tasks are forked, but none are explicitly
847     * joined, instead executing them until all are processed.
848     */
849     public static void helpQuiesce() {
850 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
851     .helpQuiescePool();
852 dl 1.3 }
853    
854     /**
855 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
856 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
857 dl 1.1 * this task, but only if reuse occurs when this task has either
858     * never been forked, or has been forked, then completed and all
859     * outstanding joins of this task have also completed. Effects
860     * under any other usage conditions are not guaranteed, and are
861     * almost surely wrong. This method may be useful when executing
862     * pre-constructed trees of subtasks in loops.
863     */
864     public void reinitialize() {
865     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
866     exceptionMap.remove(this);
867     status = 0;
868     }
869    
870     /**
871 dl 1.2 * Returns the pool hosting the current task execution, or null
872 dl 1.13 * if this task is executing outside of any ForkJoinPool.
873 jsr166 1.10 *
874 jsr166 1.14 * @return the pool, or null if none
875 dl 1.1 */
876 dl 1.2 public static ForkJoinPool getPool() {
877     Thread t = Thread.currentThread();
878 jsr166 1.15 return (t instanceof ForkJoinWorkerThread) ?
879     ((ForkJoinWorkerThread) t).pool : null;
880 dl 1.1 }
881    
882     /**
883 jsr166 1.14 * Returns {@code true} if the current thread is executing as a
884 dl 1.13 * ForkJoinPool computation.
885 jsr166 1.14 *
886     * @return {@code true} if the current thread is executing as a
887 dl 1.13 * ForkJoinPool computation, or false otherwise
888     */
889     public static boolean inForkJoinPool() {
890     return Thread.currentThread() instanceof ForkJoinWorkerThread;
891     }
892    
893     /**
894 dl 1.2 * Tries to unschedule this task for execution. This method will
895     * typically succeed if this task is the most recently forked task
896     * by the current thread, and has not commenced executing in
897     * another thread. This method may be useful when arranging
898     * alternative local processing of tasks that could have been, but
899     * were not, stolen. This method may be invoked only from within
900 dl 1.13 * ForkJoinTask computations (as may be determined using method
901     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
902 jsr166 1.14 * result in exceptions or errors, possibly including
903 dl 1.13 * ClassCastException.
904 jsr166 1.10 *
905 dl 1.2 * @return true if unforked
906 dl 1.1 */
907 dl 1.2 public boolean tryUnfork() {
908 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
909     .unpushTask(this);
910 dl 1.1 }
911    
912     /**
913 dl 1.2 * Returns an estimate of the number of tasks that have been
914     * forked by the current worker thread but not yet executed. This
915     * value may be useful for heuristic decisions about whether to
916     * fork other tasks.
917 jsr166 1.10 *
918 dl 1.2 * @return the number of tasks
919     */
920     public static int getQueuedTaskCount() {
921 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
922     .getQueueSize();
923 dl 1.2 }
924    
925     /**
926 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
927 dl 1.1 * held by the current worker thread than there are other worker
928 dl 1.2 * threads that might steal them. This value may be useful for
929     * heuristic decisions about whether to fork other tasks. In many
930     * usages of ForkJoinTasks, at steady state, each worker should
931     * aim to maintain a small constant surplus (for example, 3) of
932     * tasks, and to process computations locally if this threshold is
933     * exceeded.
934 jsr166 1.10 *
935 dl 1.1 * @return the surplus number of tasks, which may be negative
936     */
937 dl 1.2 public static int getSurplusQueuedTaskCount() {
938 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
939 dl 1.1 .getEstimatedSurplusTaskCount();
940     }
941    
942 dl 1.2 // Extension methods
943 dl 1.1
944     /**
945 jsr166 1.8 * Returns the result that would be returned by {@code join},
946 dl 1.2 * even if this task completed abnormally, or null if this task is
947     * not known to have been completed. This method is designed to
948     * aid debugging, as well as to support extensions. Its use in any
949     * other context is discouraged.
950 dl 1.1 *
951 jsr166 1.10 * @return the result, or null if not completed
952 dl 1.1 */
953     public abstract V getRawResult();
954    
955     /**
956     * Forces the given value to be returned as a result. This method
957     * is designed to support extensions, and should not in general be
958     * called otherwise.
959     *
960     * @param value the value
961     */
962     protected abstract void setRawResult(V value);
963    
964     /**
965     * Immediately performs the base action of this task. This method
966     * is designed to support extensions, and should not in general be
967     * called otherwise. The return value controls whether this task
968     * is considered to be done normally. It may return false in
969     * asynchronous actions that require explicit invocations of
970 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
971 dl 1.1 * to indicate abnormal exit.
972 jsr166 1.10 *
973 dl 1.1 * @return true if completed normally
974     * @throws Error or RuntimeException if encountered during computation
975     */
976     protected abstract boolean exec();
977    
978 dl 1.2 /**
979 dl 1.6 * Returns, but does not unschedule or execute, the task queued by
980     * the current thread but not yet executed, if one is
981     * available. There is no guarantee that this task will actually
982     * be polled or executed next. This method is designed primarily
983     * to support extensions, and is unlikely to be useful otherwise.
984     * This method may be invoked only from within ForkJoinTask
985 dl 1.13 * computations (as may be determined using method {@link
986     * #inForkJoinPool}). Attempts to invoke in other contexts result
987 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
988 dl 1.2 *
989     * @return the next task, or null if none are available
990     */
991     protected static ForkJoinTask<?> peekNextLocalTask() {
992 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
993     .peekTask();
994 dl 1.2 }
995    
996     /**
997 dl 1.6 * Unschedules and returns, without executing, the next task
998     * queued by the current thread but not yet executed. This method
999     * is designed primarily to support extensions, and is unlikely to
1000     * be useful otherwise. This method may be invoked only from
1001 dl 1.13 * within ForkJoinTask computations (as may be determined using
1002     * method {@link #inForkJoinPool}). Attempts to invoke in other
1003 jsr166 1.14 * contexts result in exceptions or errors, possibly including
1004 dl 1.6 * ClassCastException.
1005 dl 1.2 *
1006     * @return the next task, or null if none are available
1007     */
1008     protected static ForkJoinTask<?> pollNextLocalTask() {
1009 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1010     .pollLocalTask();
1011 dl 1.2 }
1012 jsr166 1.7
1013 dl 1.2 /**
1014 dl 1.6 * Unschedules and returns, without executing, the next task
1015     * queued by the current thread but not yet executed, if one is
1016     * available, or if not available, a task that was forked by some
1017     * other thread, if available. Availability may be transient, so a
1018 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1019 dl 1.6 * of the pool this task is operating in. This method is designed
1020     * primarily to support extensions, and is unlikely to be useful
1021     * otherwise. This method may be invoked only from within
1022 dl 1.13 * ForkJoinTask computations (as may be determined using method
1023     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1024 jsr166 1.14 * result in exceptions or errors, possibly including
1025 dl 1.6 * ClassCastException.
1026 dl 1.4 *
1027 dl 1.2 * @return a task, or null if none are available
1028     */
1029     protected static ForkJoinTask<?> pollTask() {
1030 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1031     .pollTask();
1032 dl 1.2 }
1033    
1034 dl 1.1 // Serialization support
1035    
1036     private static final long serialVersionUID = -7721805057305804111L;
1037    
1038     /**
1039     * Save the state to a stream.
1040     *
1041     * @serialData the current run status and the exception thrown
1042 jsr166 1.10 * during execution, or null if none
1043 dl 1.1 * @param s the stream
1044     */
1045     private void writeObject(java.io.ObjectOutputStream s)
1046     throws java.io.IOException {
1047     s.defaultWriteObject();
1048     s.writeObject(getException());
1049     }
1050    
1051     /**
1052     * Reconstitute the instance from a stream.
1053 jsr166 1.10 *
1054 dl 1.1 * @param s the stream
1055     */
1056     private void readObject(java.io.ObjectInputStream s)
1057     throws java.io.IOException, ClassNotFoundException {
1058     s.defaultReadObject();
1059 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1060     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1061 dl 1.1 Object ex = s.readObject();
1062     if (ex != null)
1063 jsr166 1.14 setDoneExceptionally((Throwable) ex);
1064 dl 1.1 }
1065    
1066     // Temporary Unsafe mechanics for preliminary release
1067 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1068     try {
1069     return Unsafe.getUnsafe();
1070     } catch (SecurityException se) {
1071     try {
1072     return java.security.AccessController.doPrivileged
1073     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1074     public Unsafe run() throws Exception {
1075     return getUnsafePrivileged();
1076     }});
1077     } catch (java.security.PrivilegedActionException e) {
1078     throw e.getCause();
1079     }
1080     }
1081     }
1082    
1083     private static Unsafe getUnsafePrivileged()
1084     throws NoSuchFieldException, IllegalAccessException {
1085     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1086     f.setAccessible(true);
1087     return (Unsafe) f.get(null);
1088     }
1089    
1090     private static long fieldOffset(String fieldName)
1091     throws NoSuchFieldException {
1092 jsr166 1.11 return UNSAFE.objectFieldOffset
1093 jsr166 1.5 (ForkJoinTask.class.getDeclaredField(fieldName));
1094     }
1095 dl 1.1
1096 jsr166 1.11 static final Unsafe UNSAFE;
1097 dl 1.1 static final long statusOffset;
1098    
1099     static {
1100     try {
1101 jsr166 1.11 UNSAFE = getUnsafe();
1102 jsr166 1.5 statusOffset = fieldOffset("status");
1103     } catch (Throwable e) {
1104     throw new RuntimeException("Could not initialize intrinsics", e);
1105     }
1106 dl 1.1 }
1107    
1108     }