root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.14
Committed: Thu Jul 23 23:07:57 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +68 -60 lines
Log Message:
j.u.c. coding standards

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * Abstract base class for tasks that run within a {@link
17     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18     * lighter weight than a normal thread. Huge numbers of tasks and
19     * subtasks may be hosted by a small number of actual threads in a
20     * ForkJoinPool, at the price of some usage limitations.
21 dl 1.4 *
22 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
23     * {@link ForkJoinPool}. Once started, it will usually in turn start
24     * other subtasks. As indicated by the name of this class, many
25 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
26     * and {@code join}, or derivatives such as
27     * {@code invokeAll}. However, this class also provides a number
28 dl 1.2 * of other methods that can come into play in advanced usages, as
29     * well as extension mechanics that allow support of new forms of
30     * fork/join processing.
31 dl 1.4 *
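The paragraph above can be illustrated with a minimal sketch. It assumes the `java.util.concurrent` descendants of these classes (JDK 7 and later) rather than the `jsr166y` package of this file, and it assumes `RecursiveTask` as the abstract subclass. The names `Fib` and `FibDemo` are hypothetical and do not appear in the source.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task, not part of ForkJoinTask.java.
class Fib extends RecursiveTask<Integer> {
    private final int n; // parameter established in the constructor

    Fib(int n) { this.n = n; }

    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        Fib left = new Fib(n - 1);
        left.fork();                          // arrange asynchronous execution
        int right = new Fib(n - 2).compute(); // run the other half in this thread
        return right + left.join();           // wait for the forked subtask
    }
}

final class FibDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        // The "main" task begins execution when submitted to the pool.
        int result = pool.invoke(new Fib(20));
        System.out.println(result); // prints 6765
        pool.shutdown();
    }
}
```

Forking one half and computing the other directly keeps the current thread busy. This is a teaching sketch only: the tasks here are far smaller than the granularity this class recommends.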
32 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33     * efficiency of ForkJoinTasks stems from a set of restrictions (that
34     * are only partially statically enforceable) reflecting their
35     * intended use as computational tasks calculating pure functions or
36     * operating on purely isolated objects. The primary coordination
37     * mechanisms are {@link #fork}, which arranges asynchronous execution,
38     * and {@link #join}, which doesn't proceed until the task's result has
39 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
40 dl 1.2 * methods or blocks, and should minimize other blocking
41     * synchronization apart from joining other tasks or using
42 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
43     * fork/join scheduling. Tasks should also not perform blocking IO,
44     * and should ideally access variables that are completely independent
45     * of those accessed by other running tasks. Minor breaches of these
46     * restrictions, for example using shared output streams, may be
47     * tolerable in practice, but frequent use may result in poor
48     * performance, and the potential to indefinitely stall if the number
49 dl 1.2 * of threads not waiting for IO or other external synchronization
50     * becomes exhausted. This usage restriction is in part enforced by
51 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
52 dl 1.2 * to be thrown. However, computations may still encounter unchecked
53 dl 1.1 * exceptions, which are rethrown to callers attempting to join
54     * them. These exceptions may additionally include
55     * RejectedExecutionExceptions stemming from internal resource
56     * exhaustion such as failure to allocate internal task queues.
57     *
58 dl 1.2 * <p>The primary method for awaiting completion and extracting
59     * results of a task is {@link #join}, but there are several variants:
60     * The {@link Future#get} methods support interruptible and/or timed
61 jsr166 1.8 * waits for completion and report results using {@code Future}
62 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
63     * execute other tasks while awaiting joins, which is sometimes more
64     * efficient but only applies when all subtasks are known to be
65     * strictly tree-structured. Method {@link #invoke} is semantically
66 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
67 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
68     * of these methods do not extract results or report exceptions. These
69     * may be useful when a set of tasks are being executed, and you need
70     * to delay processing of results or exceptions until all complete.
71 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
72 dl 1.2 * performs the most common form of parallel invocation: forking a set
73     * of tasks and joining them all.
74     *
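As a rough sketch of the "quiet" forms described above, the following submits a set of tasks, waits for all of them with `quietlyJoin`, and only then inspects results and failures. It assumes the `java.util.concurrent` versions of these classes (JDK 7 and later), not the `jsr166y` package of this file. `Square`, `QuietJoinDemo`, and `runAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: squares its input, failing on negative values.
class Square extends RecursiveTask<Integer> {
    private final int x;

    Square(int x) { this.x = x; }

    @Override
    protected Integer compute() {
        if (x < 0)
            throw new IllegalArgumentException("negative input: " + x);
        return x * x;
    }
}

final class QuietJoinDemo {
    // Returns {sum of successful results, number of failed tasks}.
    static int[] runAll(ForkJoinPool pool, int... inputs) {
        List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
        for (int x : inputs)
            tasks.add(pool.submit(new Square(x)));
        for (ForkJoinTask<Integer> t : tasks)
            t.quietlyJoin(); // waits, but neither returns a result nor throws
        int sum = 0, failed = 0;
        for (ForkJoinTask<Integer> t : tasks) {
            if (t.isCompletedAbnormally())
                failed++;        // t.getException() would give the cause
            else
                sum += t.join(); // already complete, so this returns at once
        }
        return new int[] { sum, failed };
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int[] r = runAll(pool, 1, 2, -3, 4);
        System.out.println("sum=" + r[0] + " failed=" + r[1]); // sum=21 failed=1
        pool.shutdown();
    }
}
```

Calling `join` on every task instead would rethrow the first failure and leave the remaining tasks unexamined, which is the case the quiet forms are meant to avoid.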
75     * <p> The ForkJoinTask class is not usually directly subclassed.
76     * Instead, you subclass one of the abstract classes that support a
77     * particular style of fork/join processing. Normally, a concrete
78     * ForkJoinTask subclass declares fields comprising its parameters,
79 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
80 dl 1.2 * method that somehow uses the control methods supplied by this base
81 jsr166 1.8 * class. While these methods have {@code public} access (to allow
82 dl 1.2 * instances of different task subclasses to call each other's
83     * methods), some of them may only be called from within other
84 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
85     * #inForkJoinPool}). Attempts to invoke them in other contexts
86 jsr166 1.14 * result in exceptions or errors, possibly including
87 dl 1.13 * ClassCastException.
88 dl 1.1 *
89 jsr166 1.8 * <p>Most base support methods are {@code final} because their
90 dl 1.1 * implementations are intrinsically tied to the underlying
91     * lightweight task scheduling framework, and so cannot be overridden.
92     * Developers creating new basic styles of fork/join processing should
93 jsr166 1.8 * minimally implement {@code protected} methods
94     * {@code exec}, {@code setRawResult}, and
95     * {@code getRawResult}, while also introducing an abstract
96 dl 1.2 * computational method that can be implemented in its subclasses,
97 jsr166 1.8 * possibly relying on other {@code protected} methods provided
98 dl 1.2 * by this class.
99 dl 1.1 *
100     * <p>ForkJoinTasks should perform relatively small amounts of
101 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
102 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
103     * than 10000 basic computational steps. If tasks are too big, then
104 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
105 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
106     *
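One common way to follow the rule of thumb above is a sequential cutoff: below some size the task computes directly, otherwise it splits. The sketch below assumes the `java.util.concurrent` classes of JDK 7 and later rather than the `jsr166y` package of this file. The names `RangeSum` and `RangeSumDemo` are hypothetical, and the `THRESHOLD` value of 1000 is an assumed figure chosen to fall inside the 100 to 10000 step range, not a tuned one.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums data[lo..hi) with a sequential cutoff.
class RangeSum extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000; // assumed cutoff, not a tuned value

    private final long[] data;
    private final int lo, hi;

    RangeSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: do the work directly rather than splitting.
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        RangeSum left = new RangeSum(data, lo, mid);
        RangeSum right = new RangeSum(data, mid, hi);
        invokeAll(left, right);            // fork both halves and join them
        return left.join() + right.join(); // both are complete at this point
    }
}

final class RangeSumDemo {
    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i;
        ForkJoinPool pool = new ForkJoinPool();
        long total = pool.invoke(new RangeSum(data, 0, data.length));
        System.out.println(total); // prints 4999950000
        pool.shutdown();
    }
}
```

Each task reads a disjoint slice of the array and writes nothing shared, which also matches the advice that tasks access variables independent of those used by other running tasks.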
107 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
108 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
109     * in general sensible to serialize tasks only before or after, but
110 dl 1.1 * not during execution. Serialization is not relied on during
111     * execution itself.
112 jsr166 1.12 *
113     * @since 1.7
114     * @author Doug Lea
115 dl 1.1 */
116     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
117 dl 1.2
118 dl 1.1 /**
119 dl 1.2 * Run control status bits packed into a single int to minimize
120     * footprint and to ensure atomicity (via CAS). Status is
121     * initially zero, and takes on nonnegative values until
122 dl 1.1 * completed, upon which status holds NORMAL, CANCELLED, or
123     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
124     * blocking waits by other threads have SIGNAL_MASK bits set --
125     * bit 15 for external (nonFJ) waits, and the rest a count of
126     * waiting FJ threads. (This representation relies on
127     * ForkJoinPool max thread limits). Completion of a stolen task
128     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
129     * though suboptimal for some purposes, we use basic builtin
130     * wait/notify to take advantage of "monitor inflation" in JVMs
131     * that we would otherwise need to emulate to avoid adding further
132     * per-task bookkeeping overhead. Note that bits 16-28 are
133     * currently unused. Also value 0x80000000 is available as spare
134     * completion value.
135     */
136 jsr166 1.9 volatile int status; // accessed directly by pool and workers
137 dl 1.1
138     static final int COMPLETION_MASK = 0xe0000000;
139     static final int NORMAL = 0xe0000000; // == mask
140     static final int CANCELLED = 0xc0000000;
141     static final int EXCEPTIONAL = 0xa0000000;
142     static final int SIGNAL_MASK = 0x0000ffff;
143     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
144     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
145    
146     /**
147     * Table of exceptions thrown by tasks, to enable reporting by
148     * callers. Because exceptions are rare, we don't directly keep
149 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
150 dl 1.1 * that cancellation exceptions don't appear in the table, but are
151     * instead recorded as status values.
152 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
153 dl 1.1 */
154     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
155     Collections.synchronizedMap
156     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
157    
158     // within-package utilities
159    
160     /**
161 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
162 dl 1.1 */
163     static ForkJoinWorkerThread getWorker() {
164     Thread t = Thread.currentThread();
165 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
166     (ForkJoinWorkerThread) t : null);
167 dl 1.1 }
168    
169     final boolean casStatus(int cmp, int val) {
170 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
171 dl 1.1 }
172    
173     /**
174     * Workaround for not being able to rethrow a caught Throwable without declaring it.
175     */
176     static void rethrowException(Throwable ex) {
177     if (ex != null)
178 jsr166 1.11 UNSAFE.throwException(ex);
179 dl 1.1 }
180    
181     // Setting completion status
182    
183     /**
184 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
185     *
186 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
187     */
188     final void setCompletion(int completion) {
189 dl 1.2 ForkJoinPool pool = getPool();
190 dl 1.1 if (pool != null) {
191     int s; // Clear signal bits while setting completion status
192 jsr166 1.14 do {} while ((s = status) >= 0 && !casStatus(s, completion));
193 dl 1.1
194     if ((s & SIGNAL_MASK) != 0) {
195     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
196     pool.updateRunningCount(s);
197 jsr166 1.14 synchronized (this) { notifyAll(); }
198 dl 1.1 }
199     }
200     else
201     externallySetCompletion(completion);
202     }
203    
204     /**
205     * Version of setCompletion for non-FJ threads. Leaves signal
206     * bits for unblocked threads to adjust, and always notifies.
207     */
208     private void externallySetCompletion(int completion) {
209     int s;
210 jsr166 1.14 do {} while ((s = status) >= 0 &&
211     !casStatus(s, (s & SIGNAL_MASK) | completion));
212     synchronized (this) { notifyAll(); }
213 dl 1.1 }
214    
215     /**
216 jsr166 1.14 * Sets status to indicate normal completion.
217 dl 1.1 */
218     final void setNormalCompletion() {
219     // Try typical fast case -- single CAS, no signal, not already done.
220     // Manually expand casStatus to improve chances of inlining it
221 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
222 dl 1.1 setCompletion(NORMAL);
223     }
224    
225     // internal waiting and notification
226    
227     /**
228 jsr166 1.14 * Performs the actual monitor wait for awaitDone.
229 dl 1.1 */
230     private void doAwaitDone() {
231     // Minimize lock bias and in/de-flation effects by maximizing
232     // chances of waiting inside sync
233     try {
234     while (status >= 0)
235 jsr166 1.14 synchronized (this) { if (status >= 0) wait(); }
236 dl 1.1 } catch (InterruptedException ie) {
237     onInterruptedWait();
238     }
239     }
240    
241     /**
242 jsr166 1.14 * Performs the actual timed monitor wait for awaitDone.
243 dl 1.1 */
244     private void doAwaitDone(long startTime, long nanos) {
245 jsr166 1.14 synchronized (this) {
246 dl 1.1 try {
247     while (status >= 0) {
248     long nt = nanos - (System.nanoTime() - startTime); // remaining = timeout - elapsed
249     if (nt <= 0)
250     break;
251 jsr166 1.14 wait(nt / 1000000, (int) (nt % 1000000));
252 dl 1.1 }
253     } catch (InterruptedException ie) {
254     onInterruptedWait();
255     }
256     }
257     }
258    
259     // Awaiting completion
260    
261     /**
262     * Sets status to indicate there is a joiner, then waits for join,
263     * surrounded with pool notifications.
264 jsr166 1.10 *
265 dl 1.1 * @return status upon exit
266     */
267 jsr166 1.14 private int awaitDone(ForkJoinWorkerThread w,
268     boolean maintainParallelism) {
269     ForkJoinPool pool = (w == null) ? null : w.pool;
270 dl 1.1 int s;
271     while ((s = status) >= 0) {
272 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
273 dl 1.1 if (pool == null || !pool.preJoin(this, maintainParallelism))
274     doAwaitDone();
275     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
276     adjustPoolCountsOnUnblock(pool);
277     break;
278     }
279     }
280     return s;
281     }
282    
283     /**
284     * Timed version of awaitDone.
285 jsr166 1.14 *
286 dl 1.1 * @return status upon exit
287     */
288 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
289 jsr166 1.14 ForkJoinPool pool = (w == null) ? null : w.pool;
290 dl 1.1 int s;
291     while ((s = status) >= 0) {
292 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
293 dl 1.1 long startTime = System.nanoTime();
294     if (pool == null || !pool.preJoin(this, false))
295     doAwaitDone(startTime, nanos);
296     if ((s = status) >= 0) {
297     adjustPoolCountsOnCancelledWait(pool);
298     s = status;
299     }
300     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
301     adjustPoolCountsOnUnblock(pool);
302     break;
303     }
304     }
305     return s;
306     }
307    
308     /**
309 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
310 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
311     */
312     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
313     int s;
314 jsr166 1.14 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
315 dl 1.1 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
316     pool.updateRunningCount(s);
317     }
318    
319     /**
320 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
321 dl 1.1 */
322     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
323     if (pool != null) {
324     int s;
325     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
326     if (casStatus(s, s - 1)) {
327     pool.updateRunningCount(1);
328     break;
329     }
330     }
331     }
332     }
333    
334 dl 1.2 /**
335 jsr166 1.10 * Handles interruptions during waits.
336 dl 1.2 */
337 dl 1.1 private void onInterruptedWait() {
338 dl 1.2 ForkJoinWorkerThread w = getWorker();
339     if (w == null)
340     Thread.currentThread().interrupt(); // re-interrupt
341     else if (w.isTerminating())
342 dl 1.3 cancelIgnoringExceptions();
343 dl 1.2 // else if FJworker, ignore interrupt
344 dl 1.1 }
345    
346     // Recording and reporting exceptions
347    
348     private void setDoneExceptionally(Throwable rex) {
349     exceptionMap.put(this, rex);
350     setCompletion(EXCEPTIONAL);
351     }
352    
353     /**
354 jsr166 1.10 * Throws the exception associated with status s.
355     *
356 dl 1.1 * @throws the exception
357     */
358     private void reportException(int s) {
359     if ((s &= COMPLETION_MASK) < NORMAL) {
360     if (s == CANCELLED)
361     throw new CancellationException();
362     else
363     rethrowException(exceptionMap.get(this));
364     }
365     }
366    
367     /**
368 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
369 jsr166 1.14 * Only call when {@code isDone} known to be true.
370 dl 1.1 */
371     private V reportFutureResult()
372     throws ExecutionException, InterruptedException {
373     int s = status & COMPLETION_MASK;
374     if (s < NORMAL) {
375     Throwable ex;
376     if (s == CANCELLED)
377     throw new CancellationException();
378     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
379     throw new ExecutionException(ex);
380     if (Thread.interrupted())
381     throw new InterruptedException();
382     }
383     return getRawResult();
384     }
385    
386     /**
387     * Returns result or throws exception using j.u.c.Future conventions
388 jsr166 1.10 * with timeouts.
389 dl 1.1 */
390     private V reportTimedFutureResult()
391     throws InterruptedException, ExecutionException, TimeoutException {
392     Throwable ex;
393     int s = status & COMPLETION_MASK;
394     if (s == NORMAL)
395     return getRawResult();
396     if (s == CANCELLED)
397     throw new CancellationException();
398     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
399     throw new ExecutionException(ex);
400     if (Thread.interrupted())
401     throw new InterruptedException();
402     throw new TimeoutException();
403     }
404    
405     // internal execution methods
406    
407     /**
408     * Calls exec, recording completion, and rethrowing exception if
409 jsr166 1.10 * encountered. Caller should normally check status before calling.
410     *
411 dl 1.1 * @return true if completed normally
412     */
413     private boolean tryExec() {
414     try { // try block must contain only call to exec
415     if (!exec())
416     return false;
417     } catch (Throwable rex) {
418     setDoneExceptionally(rex);
419     rethrowException(rex);
420     return false; // not reached
421     }
422     setNormalCompletion();
423     return true;
424     }
425    
426     /**
427     * Main execution method used by worker threads. Invokes
428 jsr166 1.10 * base computation unless already complete.
429 dl 1.1 */
430     final void quietlyExec() {
431     if (status >= 0) {
432     try {
433     if (!exec())
434     return;
435 jsr166 1.14 } catch (Throwable rex) {
436 dl 1.1 setDoneExceptionally(rex);
437     return;
438     }
439     setNormalCompletion();
440     }
441     }
442    
443     /**
444 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
445     * Caller should normally check status before calling.
446     *
447 dl 1.1 * @return true if completed normally
448     */
449     private boolean tryQuietlyInvoke() {
450     try {
451     if (!exec())
452     return false;
453     } catch (Throwable rex) {
454     setDoneExceptionally(rex);
455     return false;
456     }
457     setNormalCompletion();
458     return true;
459     }
460    
461     /**
462 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
463 dl 1.1 */
464 dl 1.3 final void cancelIgnoringExceptions() {
465 dl 1.1 try {
466     cancel(false);
467 jsr166 1.14 } catch (Throwable ignore) {
468 dl 1.1 }
469     }
470    
471 dl 1.3 /**
472     * Main implementation of helpJoin.
473     */
474     private int busyJoin(ForkJoinWorkerThread w) {
475     int s;
476     ForkJoinTask<?> t;
477     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
478     t.quietlyExec();
479 jsr166 1.14 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
480 dl 1.3 }
481    
482 dl 1.1 // public methods
483    
484     /**
485     * Arranges to asynchronously execute this task. While it is not
486     * necessarily enforced, it is a usage error to fork a task more
487     * than once unless it has completed and been reinitialized. This
488 dl 1.2 * method may be invoked only from within ForkJoinTask
489 dl 1.13 * computations (as may be determined using method {@link
490     * #inForkJoinPool}). Attempts to invoke in other contexts result
491 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
492 dl 1.1 */
493     public final void fork() {
494 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
495     .pushTask(this);
496 dl 1.1 }
497    
498     /**
499     * Returns the result of the computation when it is ready.
500 jsr166 1.8 * This method differs from {@code get} in that abnormal
501 dl 1.1 * completion results in RuntimeExceptions or Errors, not
502     * ExecutionExceptions.
503     *
504     * @return the computed result
505     */
506     public final V join() {
507     ForkJoinWorkerThread w = getWorker();
508     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
509     reportException(awaitDone(w, true));
510     return getRawResult();
511     }
512    
513     /**
514 dl 1.2 * Commences performing this task, awaits its completion if
515     * necessary, and returns its result.
516 jsr166 1.10 *
517 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
518 jsr166 1.10 * exception) if the underlying computation did so
519 dl 1.1 * @return the computed result
520     */
521     public final V invoke() {
522     if (status >= 0 && tryExec())
523     return getRawResult();
524     else
525     return join();
526     }
527    
528     /**
529 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
530 dl 1.2 * both of them or an exception is encountered. This method may be
531 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
532     * determined using method {@link #inForkJoinPool}). Attempts to
533 jsr166 1.14 * invoke in other contexts result in exceptions or errors,
534 dl 1.4 * possibly including ClassCastException.
535 jsr166 1.10 *
536 dl 1.2 * @param t1 one task
537     * @param t2 the other task
538     * @throws NullPointerException if t1 or t2 are null
539 jsr166 1.10 * @throws RuntimeException or Error if either task did so
540 dl 1.1 */
541 dl 1.2 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
542     t2.fork();
543     t1.invoke();
544     t2.join();
545 dl 1.1 }
546    
547     /**
548 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
549 dl 1.2 * for all of them. If any task encounters an exception, others
550     * may be cancelled. This method may be invoked only from within
551 dl 1.13 * ForkJoinTask computations (as may be determined using method
552     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
553 jsr166 1.14 * result in exceptions or errors, possibly including
554 dl 1.13 * ClassCastException.
555 jsr166 1.14 *
556 dl 1.2 * @param tasks the array of tasks
557 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
558     * @throws RuntimeException or Error if any task did so
559 dl 1.1 */
560 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
561     Throwable ex = null;
562     int last = tasks.length - 1;
563     for (int i = last; i >= 0; --i) {
564     ForkJoinTask<?> t = tasks[i];
565     if (t == null) {
566     if (ex == null)
567     ex = new NullPointerException();
568     }
569     else if (i != 0)
570     t.fork();
571     else {
572     t.quietlyInvoke();
573     if (ex == null)
574     ex = t.getException();
575     }
576     }
577     for (int i = 1; i <= last; ++i) {
578     ForkJoinTask<?> t = tasks[i];
579     if (t != null) {
580     if (ex != null)
581     t.cancel(false);
582     else {
583     t.quietlyJoin();
584     if (ex == null)
585     ex = t.getException();
586     }
587     }
588 dl 1.1 }
589 dl 1.2 if (ex != null)
590     rethrowException(ex);
591 dl 1.1 }
592    
593     /**
594 dl 1.2 * Forks all tasks in the collection, returning when
595 jsr166 1.8 * {@code isDone} holds for all of them. If any task
596 dl 1.2 * encounters an exception, others may be cancelled. This method
597 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
598     * may be determined using method {@link
599 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
600     * in exceptions or errors, possibly including ClassCastException.
601 jsr166 1.10 *
602 dl 1.2 * @param tasks the collection of tasks
603 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
604     * @throws RuntimeException or Error if any task did so
605 dl 1.1 */
606 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
607     if (!(tasks instanceof List)) {
608 jsr166 1.14 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
609 dl 1.2 return;
610     }
611     List<? extends ForkJoinTask<?>> ts =
612 jsr166 1.14 (List<? extends ForkJoinTask<?>>) tasks;
613 dl 1.2 Throwable ex = null;
614     int last = ts.size() - 1;
615     for (int i = last; i >= 0; --i) {
616     ForkJoinTask<?> t = ts.get(i);
617     if (t == null) {
618     if (ex == null)
619     ex = new NullPointerException();
620     }
621     else if (i != 0)
622     t.fork();
623     else {
624     t.quietlyInvoke();
625     if (ex == null)
626     ex = t.getException();
627     }
628     }
629     for (int i = 1; i <= last; ++i) {
630     ForkJoinTask<?> t = ts.get(i);
631     if (t != null) {
632     if (ex != null)
633     t.cancel(false);
634     else {
635     t.quietlyJoin();
636     if (ex == null)
637     ex = t.getException();
638     }
639     }
640     }
641     if (ex != null)
642     rethrowException(ex);
643 dl 1.1 }
644    
645     /**
646     * Returns true if the computation performed by this task has
647     * completed (or has been cancelled).
648 jsr166 1.10 *
649 dl 1.1 * @return true if this computation has completed
650     */
651     public final boolean isDone() {
652     return status < 0;
653     }
654    
655     /**
656     * Returns true if this task was cancelled.
657 jsr166 1.10 *
658 dl 1.1 * @return true if this task was cancelled
659     */
660     public final boolean isCancelled() {
661     return (status & COMPLETION_MASK) == CANCELLED;
662     }
663    
664     /**
665     * Asserts that the results of this task's computation will not be
666 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
667 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
668     * will report true, and {@code join} will result in a
669     * {@code CancellationException} being thrown. Otherwise, when
670 dl 1.1 * cancellation races with completion, there are no guarantees
671 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
672     * {@code join} will return normally or via an exception, or
673 dl 1.1 * whether these behaviors will remain consistent upon repeated
674     * invocation.
675     *
676     * <p>This method may be overridden in subclasses, but if so, must
677     * still ensure that these minimal properties hold. In particular,
678     * the cancel method itself must not throw exceptions.
679     *
680     * <p> This method is designed to be invoked by <em>other</em>
681     * tasks. To terminate the current task, you can just return or
682     * throw an unchecked exception from its computation method, or
683 jsr166 1.8 * invoke {@code completeExceptionally}.
684 dl 1.1 *
685     * @param mayInterruptIfRunning this value is ignored in the
686     * default implementation because tasks are not in general
687 jsr166 1.14 * cancelled via interruption
688 dl 1.1 *
689     * @return true if this task is now cancelled
690     */
691     public boolean cancel(boolean mayInterruptIfRunning) {
692     setCompletion(CANCELLED);
693     return (status & COMPLETION_MASK) == CANCELLED;
694     }
695    
696     /**
697 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
698     *
699 dl 1.3 * @return true if this task threw an exception or was cancelled
700     */
701     public final boolean isCompletedAbnormally() {
702     return (status & COMPLETION_MASK) < NORMAL;
703     }
704    
705     /**
706     * Returns the exception thrown by the base computation, or a
707     * CancellationException if cancelled, or null if none or if the
708     * method has not yet completed.
709 jsr166 1.10 *
710 dl 1.3 * @return the exception, or null if none
711     */
712     public final Throwable getException() {
713     int s = status & COMPLETION_MASK;
714     if (s >= NORMAL)
715     return null;
716     if (s == CANCELLED)
717     return new CancellationException();
718     return exceptionMap.get(this);
719     }
720    
721     /**
722 dl 1.1 * Completes this task abnormally, and if not already aborted or
723     * cancelled, causes it to throw the given exception upon
724 jsr166 1.8 * {@code join} and related operations. This method may be used
725 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
726 dl 1.2 * completion of tasks that would not otherwise complete. Its use
727     * in other situations is likely to be wrong. This method is
728 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
729 dl 1.2 * implementation to maintain guarantees.
730     *
731 dl 1.1 * @param ex the exception to throw. If this exception is
732     * not a RuntimeException or Error, the actual exception thrown
733     * will be a RuntimeException with cause ex.
734     */
735     public void completeExceptionally(Throwable ex) {
736     setDoneExceptionally((ex instanceof RuntimeException) ||
737 jsr166 1.14 (ex instanceof Error) ? ex :
738 dl 1.1 new RuntimeException(ex));
739     }
740    
741     /**
742     * Completes this task, and if not already aborted or cancelled,
743 jsr166 1.8 * returning the given value upon {@code join} and related
744 dl 1.1 * operations. This method may be used to provide results for
745     * asynchronous tasks, or to provide alternative handling for
746 dl 1.2 * tasks that would not otherwise complete normally. Its use in
747     * other situations is likely to be wrong. This method is
748 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
749 dl 1.2 * implementation to maintain guarantees.
750 dl 1.1 *
751 jsr166 1.10 * @param value the result value for this task
752 dl 1.1 */
753     public void complete(V value) {
754     try {
755     setRawResult(value);
756 jsr166 1.14 } catch (Throwable rex) {
757 dl 1.1 setDoneExceptionally(rex);
758     return;
759     }
760     setNormalCompletion();
761     }
762    
763 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
764     ForkJoinWorkerThread w = getWorker();
765     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
766     awaitDone(w, true);
767     return reportFutureResult();
768     }
769    
770     public final V get(long timeout, TimeUnit unit)
771     throws InterruptedException, ExecutionException, TimeoutException {
772     ForkJoinWorkerThread w = getWorker();
773     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
774     awaitDone(w, unit.toNanos(timeout));
775     return reportTimedFutureResult();
776     }
777    
778 dl 1.1 /**
779 dl 1.2 * Possibly executes other tasks until this task is ready, then
780     * returns the result of the computation. This method may be more
781 jsr166 1.8 * efficient than {@code join}, but is only applicable when
782 jsr166 1.9 * there are no potential dependencies between continuation of the
783 dl 1.2 * current task and that of any other task that might be executed
784     * while helping. (This usually holds for pure divide-and-conquer
785     * tasks.) This method may be invoked only from within
786 dl 1.13 * ForkJoinTask computations (as may be determined using method
787     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
788 jsr166 1.14 * result in exceptions or errors, possibly including
789 dl 1.13 * ClassCastException.
790 jsr166 1.10 *
791 dl 1.2 * @return the computed result
792     */
793     public final V helpJoin() {
794 jsr166 1.14 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
795 dl 1.2 if (status < 0 || !w.unpushTask(this) || !tryExec())
796 dl 1.3 reportException(busyJoin(w));
797 dl 1.2 return getRawResult();
798     }
799    
800     /**
801     * Possibly executes other tasks until this task is ready. This
802     * method may be invoked only from within ForkJoinTask
803 dl 1.13 * computations (as may be determined using method {@link
804 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
805     * in exceptions or errors, possibly including ClassCastException.
806 dl 1.2 */
807     public final void quietlyHelpJoin() {
808     if (status >= 0) {
809     ForkJoinWorkerThread w =
810 jsr166 1.14 (ForkJoinWorkerThread) Thread.currentThread();
811 dl 1.2 if (!w.unpushTask(this) || !tryQuietlyInvoke())
812 dl 1.3 busyJoin(w);
813 dl 1.2 }
814     }
815    
816     /**
817     * Joins this task, without returning its result or throwing an
818     * exception. This method may be useful when processing
819     * collections of tasks when some have been cancelled or are
820     * otherwise known to have aborted.
821     */
822     public final void quietlyJoin() {
823     if (status >= 0) {
824     ForkJoinWorkerThread w = getWorker();
825     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
826     awaitDone(w, true);
827     }
828     }
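
A possible way to process a collection in which some tasks were cancelled is sketched below. It assumes the java.util.concurrent classes from JDK 7 and later instead of the jsr166y package; QuietJoinDemo, Square and sumOfCompleted are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class QuietJoinDemo {
    // Hypothetical leaf task: squares its argument.
    static final class Square extends RecursiveTask<Integer> {
        final int n;
        Square(int n) { this.n = n; }
        @Override protected Integer compute() { return n * n; }
    }

    // Sums the squares of 0..count-1, skipping the one task that was cancelled.
    static int sumOfCompleted(int count, int cancelIndex) {
        final List<Square> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++)
            tasks.add(new Square(i));
        // Cancel before forking so the outcome is deterministic.
        tasks.get(cancelIndex).cancel(false);

        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override protected Integer compute() {
                    for (Square t : tasks)
                        t.fork();
                    int sum = 0;
                    // Join in reverse order of forking (most recent first).
                    for (int i = tasks.size() - 1; i >= 0; i--) {
                        Square t = tasks.get(i);
                        t.quietlyJoin();             // does not throw if cancelled
                        if (t.isCompletedNormally()) // inspect the outcome afterwards
                            sum += t.getRawResult();
                    }
                    return sum;
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling join instead of quietlyJoin on the cancelled task would throw a CancellationException, which is why the quiet variant followed by a status check is convenient here.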
829    
830     /**
831     * Commences performing this task and awaits its completion if
832     * necessary, without returning its result or throwing an
833     * exception. This method may be useful when processing
834     * collections of tasks when some have been cancelled or are
835     * otherwise known to have aborted.
836     */
837     public final void quietlyInvoke() {
838     if (status >= 0 && !tryQuietlyInvoke())
839     quietlyJoin();
840     }
841    
842     /**
843 dl 1.3 * Possibly executes tasks until the pool hosting the current task
844     * {@link ForkJoinPool#isQuiescent is quiescent}. This method may be of use in
845     * designs in which many tasks are forked, but none are explicitly
846     * joined, instead executing them until all are processed.
847     */
848     public static void helpQuiesce() {
849 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
850     .helpQuiescePool();
851 dl 1.3 }
852    
853     /**
854 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
855 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
856 dl 1.1 * this task, but only if reuse occurs when this task has either
857     * never been forked, or has been forked, then completed and all
858     * outstanding joins of this task have also completed. Effects
859     * under any other usage conditions are not guaranteed, and are
860     * almost surely wrong. This method may be useful when executing
861     * pre-constructed trees of subtasks in loops.
862     */
863     public void reinitialize() {
864     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
865     exceptionMap.remove(this);
866     status = 0;
867     }
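
Reusing one task object across loop iterations might look like the sketch below. It assumes the java.util.concurrent classes from JDK 7 and later rather than the jsr166y package; ReuseDemo, Counter and runRepeatedly are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class ReuseDemo {
    // Hypothetical action that records how many times it has been run.
    static final class Counter extends RecursiveAction {
        int runs;
        @Override protected void compute() { runs++; }
    }

    // Runs the same task object 'times' times, resetting it between runs.
    static int runRepeatedly(int times) {
        ForkJoinPool pool = new ForkJoinPool(1);
        Counter task = new Counter();
        try {
            for (int i = 0; i < times; i++) {
                pool.invoke(task);   // returns only after the task has completed
                task.reinitialize(); // safe here: completed, no outstanding joins
            }
        } finally {
            pool.shutdown();
        }
        return task.runs;
    }
}
```

The reset happens only after invoke has returned, which matches the condition stated in the comment: the task has completed and nothing is still joining it.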
868    
869     /**
870 dl 1.2 * Returns the pool hosting the current task execution, or null
871 dl 1.13 * if this task is executing outside of any ForkJoinPool.
872 jsr166 1.10 *
873 jsr166 1.14 * @return the pool, or null if none
874 dl 1.1 */
875 dl 1.2 public static ForkJoinPool getPool() {
876     Thread t = Thread.currentThread();
877 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
878     ((ForkJoinWorkerThread) t).pool : null);
879 dl 1.1 }
880    
881     /**
882 jsr166 1.14 * Returns {@code true} if the current thread is executing as a
883 dl 1.13 * ForkJoinPool computation.
884 jsr166 1.14 *
885     * @return {@code true} if the current thread is executing as a
886 dl 1.13 * ForkJoinPool computation, or {@code false} otherwise
887     */
888     public static boolean inForkJoinPool() {
889     return Thread.currentThread() instanceof ForkJoinWorkerThread;
890     }
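
The two context queries above can be exercised as in the following sketch, which assumes the java.util.concurrent classes from JDK 7 and later rather than the jsr166y package; ContextDemo and its method names are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class ContextDemo {
    // Evaluated on a worker thread: both queries should identify the pool.
    static boolean checkedInsidePool() {
        final ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.invoke(new RecursiveTask<Boolean>() {
                @Override protected Boolean compute() {
                    return inForkJoinPool() && getPool() == pool;
                }
            });
        } finally {
            pool.shutdown();
        }
    }

    // Evaluated on the calling thread, assumed here to be an ordinary thread.
    static boolean checkedOutsidePool() {
        return !ForkJoinTask.inForkJoinPool() && ForkJoinTask.getPool() == null;
    }
}
```

A guard of this kind is one way to avoid calling the methods documented as usable only within ForkJoinTask computations from an ordinary thread.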
891    
892     /**
893 dl 1.2 * Tries to unschedule this task for execution. This method will
894     * typically succeed if this task is the most recently forked task
895     * by the current thread, and has not commenced executing in
896     * another thread. This method may be useful when arranging
897     * alternative local processing of tasks that could have been, but
898     * were not, stolen. This method may be invoked only from within
899 dl 1.13 * ForkJoinTask computations (as may be determined using method
900     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
901 jsr166 1.14 * result in exceptions or errors, possibly including
902 dl 1.13 * ClassCastException.
903 jsr166 1.10 *
904 dl 1.2 * @return true if unforked
905 dl 1.1 */
906 dl 1.2 public boolean tryUnfork() {
907 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
908     .unpushTask(this);
909 dl 1.1 }
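
The "alternative local processing" idea can be sketched as follows: fork a subtask, try to reclaim it, and run it directly if it was not stolen. The sketch assumes the java.util.concurrent classes from JDK 7 and later rather than the jsr166y package; UnforkDemo, Square and squareViaUnfork are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class UnforkDemo {
    // Hypothetical leaf task: squares its argument.
    static final class Square extends RecursiveTask<Integer> {
        final int n;
        Square(int n) { this.n = n; }
        @Override protected Integer compute() { return n * n; }
    }

    static int squareViaUnfork(final int n) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override protected Integer compute() {
                    Square child = new Square(n);
                    child.fork();
                    if (child.tryUnfork())     // still queued locally: reclaim it
                        return child.invoke(); // and run it in this thread
                    return child.join();       // otherwise it was stolen: wait for it
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```

Either branch yields the same result; which one is taken depends on whether another worker stole the child first.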
910    
911     /**
912 dl 1.2 * Returns an estimate of the number of tasks that have been
913     * forked by the current worker thread but not yet executed. This
914     * value may be useful for heuristic decisions about whether to
915     * fork other tasks.
916 jsr166 1.10 *
917 dl 1.2 * @return the number of tasks
918     */
919     public static int getQueuedTaskCount() {
920 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
921     .getQueueSize();
922 dl 1.2 }
923    
924     /**
925 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
926 dl 1.1 * held by the current worker thread than there are other worker
927 dl 1.2 * threads that might steal them. This value may be useful for
928     * heuristic decisions about whether to fork other tasks. In many
929     * usages of ForkJoinTasks, at steady state, each worker should
930     * aim to maintain a small constant surplus (for example, 3) of
931     * tasks, and to process computations locally if this threshold is
932     * exceeded.
933 jsr166 1.10 *
934 dl 1.1 * @return the surplus number of tasks, which may be negative
935     */
936 dl 1.2 public static int getSurplusQueuedTaskCount() {
937 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
938 dl 1.1 .getEstimatedSurplusTaskCount();
939     }
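
One way to apply the surplus heuristic described above is sketched below: keep splitting off and forking the upper half of a range while the surplus is at most 3, then process the remainder locally. The sketch assumes the java.util.concurrent classes from JDK 7 and later rather than the jsr166y package; SurplusSum, SURPLUS_LIMIT and sum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class SurplusSum extends RecursiveTask<Long> {
    static final int SURPLUS_LIMIT = 3; // the "small constant surplus" from the text

    final long[] a;
    final int lo, hi; // sums a[lo..hi)

    SurplusSum(long[] a, int lo, int hi) {
        this.a = a; this.lo = lo; this.hi = hi;
    }

    @Override protected Long compute() {
        int l = lo, h = hi;
        List<SurplusSum> forked = new ArrayList<>();
        // Fork the upper half while other workers may still need work.
        while (h - l > 1 && getSurplusQueuedTaskCount() <= SURPLUS_LIMIT) {
            int mid = (l + h) >>> 1;
            SurplusSum right = new SurplusSum(a, mid, h);
            right.fork();
            forked.add(right);
            h = mid;
        }
        // Process what is left of the range in this thread.
        long sum = 0;
        for (int i = l; i < h; i++)
            sum += a[i];
        // Join most recently forked tasks first.
        for (int i = forked.size() - 1; i >= 0; i--)
            sum += forked.get(i).join();
        return sum;
    }

    static long sum(long[] values) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new SurplusSum(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

A production version would probably also stop splitting below some minimum range size; that threshold is omitted here to keep the heuristic itself visible.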
940    
941 dl 1.2 // Extension methods
942 dl 1.1
943     /**
944 jsr166 1.8 * Returns the result that would be returned by {@code join},
945 dl 1.2 * even if this task completed abnormally, or null if this task is
946     * not known to have been completed. This method is designed to
947     * aid debugging, as well as to support extensions. Its use in any
948     * other context is discouraged.
949 dl 1.1 *
950 jsr166 1.10 * @return the result, or null if not completed
951 dl 1.1 */
952     public abstract V getRawResult();
953    
954     /**
955     * Forces the given value to be returned as a result. This method
956     * is designed to support extensions, and should not in general be
957     * called otherwise.
958     *
959     * @param value the value
960     */
961     protected abstract void setRawResult(V value);
962    
963     /**
964     * Immediately performs the base action of this task. This method
965     * is designed to support extensions, and should not in general be
966     * called otherwise. The return value controls whether this task
967     * is considered to be done normally. It may return false in
968     * asynchronous actions that require explicit invocations of
969 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
970 dl 1.1 * to indicate abnormal exit.
971 jsr166 1.10 *
972 dl 1.1 * @return true if completed normally
973     * @throws Error or RuntimeException if encountered during computation
974     */
975     protected abstract boolean exec();
976    
977 dl 1.2 /**
978 dl 1.6 * Returns, but does not unschedule or execute, a task queued
979     * by the current thread but not yet executed, if one is
980     * available. There is no guarantee that this task will actually
981     * be polled or executed next. This method is designed primarily
982     * to support extensions, and is unlikely to be useful otherwise.
983     * This method may be invoked only from within ForkJoinTask
984 dl 1.13 * computations (as may be determined using method {@link
985     * #inForkJoinPool}). Attempts to invoke in other contexts result
986 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
987 dl 1.2 *
988     * @return the next task, or null if none are available
989     */
990     protected static ForkJoinTask<?> peekNextLocalTask() {
991 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
992     .peekTask();
993 dl 1.2 }
994    
995     /**
996 dl 1.6 * Unschedules and returns, without executing, the next task
997     * queued by the current thread but not yet executed. This method
998     * is designed primarily to support extensions, and is unlikely to
999     * be useful otherwise. This method may be invoked only from
1000 dl 1.13 * within ForkJoinTask computations (as may be determined using
1001     * method {@link #inForkJoinPool}). Attempts to invoke in other
1002 jsr166 1.14 * contexts result in exceptions or errors, possibly including
1003 dl 1.6 * ClassCastException.
1004 dl 1.2 *
1005     * @return the next task, or null if none are available
1006     */
1007     protected static ForkJoinTask<?> pollNextLocalTask() {
1008 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1009     .pollLocalTask();
1010 dl 1.2 }
1011 jsr166 1.7
1012 dl 1.2 /**
1013 dl 1.6 * Unschedules and returns, without executing, the next task
1014     * queued by the current thread but not yet executed, if one is
1015     * available, or if not available, a task that was forked by some
1016     * other thread, if available. Availability may be transient, so a
1017 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1018 dl 1.6 * of the pool this task is operating in. This method is designed
1019     * primarily to support extensions, and is unlikely to be useful
1020     * otherwise. This method may be invoked only from within
1021 dl 1.13 * ForkJoinTask computations (as may be determined using method
1022     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1023 jsr166 1.14 * result in exceptions or errors, possibly including
1024 dl 1.6 * ClassCastException.
1025 dl 1.4 *
1026 dl 1.2 * @return a task, or null if none are available
1027     */
1028     protected static ForkJoinTask<?> pollTask() {
1029 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1030     .pollTask();
1031 dl 1.2 }
1032    
1033 dl 1.1 // Serialization support
1034    
1035     private static final long serialVersionUID = -7721805057305804111L;
1036    
1037     /**
1038     * Saves the state of this task to a stream.
1039     *
1040     * @serialData the current run status and the exception thrown
1041 jsr166 1.10 * during execution, or null if none
1042 dl 1.1 * @param s the stream
1043     */
1044     private void writeObject(java.io.ObjectOutputStream s)
1045     throws java.io.IOException {
1046     s.defaultWriteObject();
1047     s.writeObject(getException());
1048     }
1049    
1050     /**
1051     * Reconstitutes the instance from a stream.
1052 jsr166 1.10 *
1053 dl 1.1 * @param s the stream
1054     */
1055     private void readObject(java.io.ObjectInputStream s)
1056     throws java.io.IOException, ClassNotFoundException {
1057     s.defaultReadObject();
1058 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1059     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1060 dl 1.1 Object ex = s.readObject();
1061     if (ex != null)
1062 jsr166 1.14 setDoneExceptionally((Throwable) ex);
1063 dl 1.1 }
1064    
1065     // Temporary Unsafe mechanics for preliminary release
1066 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1067     try {
1068     return Unsafe.getUnsafe();
1069     } catch (SecurityException se) {
1070     try {
1071     return java.security.AccessController.doPrivileged
1072     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1073     public Unsafe run() throws Exception {
1074     return getUnsafePrivileged();
1075     }});
1076     } catch (java.security.PrivilegedActionException e) {
1077     throw e.getCause();
1078     }
1079     }
1080     }
1081    
1082     private static Unsafe getUnsafePrivileged()
1083     throws NoSuchFieldException, IllegalAccessException {
1084     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1085     f.setAccessible(true);
1086     return (Unsafe) f.get(null);
1087     }
1088    
1089     private static long fieldOffset(String fieldName)
1090     throws NoSuchFieldException {
1091 jsr166 1.11 return UNSAFE.objectFieldOffset
1092 jsr166 1.5 (ForkJoinTask.class.getDeclaredField(fieldName));
1093     }
1094 dl 1.1
1095 jsr166 1.11 static final Unsafe UNSAFE;
1096 dl 1.1 static final long statusOffset;
1097    
1098     static {
1099     try {
1100 jsr166 1.11 UNSAFE = getUnsafe();
1101 jsr166 1.5 statusOffset = fieldOffset("status");
1102     } catch (Throwable e) {
1103     throw new RuntimeException("Could not initialize intrinsics", e);
1104     }
1105 dl 1.1 }
1106    
1107     }