ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.16
Committed: Fri Jul 24 23:47:01 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.15: +23 -26 lines
Log Message:
Unsafe mechanics

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.io.Serializable;
9     import java.util.*;
10     import java.util.concurrent.*;
11     import java.util.concurrent.atomic.*;
12    
13     /**
14 dl 1.2 * Abstract base class for tasks that run within a {@link
15     * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
16     * lighter weight than a normal thread. Huge numbers of tasks and
17     * subtasks may be hosted by a small number of actual threads in a
18     * ForkJoinPool, at the price of some usage limitations.
19 dl 1.4 *
20 dl 1.2 * <p> A "main" ForkJoinTask begins execution when submitted to a
21     * {@link ForkJoinPool}. Once started, it will usually in turn start
22     * other subtasks. As indicated by the name of this class, many
23 jsr166 1.8 * programs using ForkJoinTasks employ only methods {@code fork}
24     * and {@code join}, or derivatives such as
25     * {@code invokeAll}. However, this class also provides a number
26 dl 1.2 * of other methods that can come into play in advanced usages, as
27     * well as extension mechanics that allow support of new forms of
28     * fork/join processing.
29 dl 1.4 *
30 dl 1.2 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
31     * efficiency of ForkJoinTasks stems from a set of restrictions (that
32     * are only partially statically enforceable) reflecting their
33     * intended use as computational tasks calculating pure functions or
34     * operating on purely isolated objects. The primary coordination
35     * mechanisms are {@link #fork}, that arranges asynchronous execution,
36     * and {@link #join}, that doesn't proceed until the task's result has
37 jsr166 1.8 * been computed. Computations should avoid {@code synchronized}
38 dl 1.2 * methods or blocks, and should minimize other blocking
39     * synchronization apart from joining other tasks or using
40 dl 1.1 * synchronizers such as Phasers that are advertised to cooperate with
41     * fork/join scheduling. Tasks should also not perform blocking IO,
42     * and should ideally access variables that are completely independent
43     * of those accessed by other running tasks. Minor breaches of these
44     * restrictions, for example using shared output streams, may be
45     * tolerable in practice, but frequent use may result in poor
46     * performance, and the potential to indefinitely stall if the number
47 dl 1.2 * of threads not waiting for IO or other external synchronization
48     * becomes exhausted. This usage restriction is in part enforced by
49 jsr166 1.8 * not permitting checked exceptions such as {@code IOExceptions}
50 dl 1.2 * to be thrown. However, computations may still encounter unchecked
51 dl 1.1 * exceptions, that are rethrown to callers attempting join
52     * them. These exceptions may additionally include
53     * RejectedExecutionExceptions stemming from internal resource
54     * exhaustion such as failure to allocate internal task queues.
55     *
56 dl 1.2 * <p>The primary method for awaiting completion and extracting
57     * results of a task is {@link #join}, but there are several variants:
58     * The {@link Future#get} methods support interruptible and/or timed
59 jsr166 1.8 * waits for completion and report results using {@code Future}
60 dl 1.2 * conventions. Method {@link #helpJoin} enables callers to actively
61     * execute other tasks while awaiting joins, which is sometimes more
62     * efficient but only applies when all subtasks are known to be
63     * strictly tree-structured. Method {@link #invoke} is semantically
64 jsr166 1.8 * equivalent to {@code fork(); join()} but always attempts to
65 dl 1.2 * begin execution in the current thread. The "<em>quiet</em>" forms
66     * of these methods do not extract results or report exceptions. These
67     * may be useful when a set of tasks are being executed, and you need
68     * to delay processing of results or exceptions until all complete.
69 jsr166 1.8 * Method {@code invokeAll} (available in multiple versions)
70 dl 1.2 * performs the most common form of parallel invocation: forking a set
71     * of tasks and joining them all.
72     *
73     * <p> The ForkJoinTask class is not usually directly subclassed.
74     * Instead, you subclass one of the abstract classes that support a
75     * particular style of fork/join processing. Normally, a concrete
76     * ForkJoinTask subclass declares fields comprising its parameters,
77 jsr166 1.8 * established in a constructor, and then defines a {@code compute}
78 dl 1.2 * method that somehow uses the control methods supplied by this base
79 jsr166 1.8 * class. While these methods have {@code public} access (to allow
80 dl 1.2 * instances of different task subclasses to call each others
81     * methods), some of them may only be called from within other
82 dl 1.13 * ForkJoinTasks (as may be determined using method {@link
83     * #inForkJoinPool}). Attempts to invoke them in other contexts
84 jsr166 1.14 * result in exceptions or errors, possibly including
85 dl 1.13 * ClassCastException.
86 dl 1.1 *
87 jsr166 1.8 * <p>Most base support methods are {@code final} because their
88 dl 1.1 * implementations are intrinsically tied to the underlying
89     * lightweight task scheduling framework, and so cannot be overridden.
90     * Developers creating new basic styles of fork/join processing should
91 jsr166 1.8 * minimally implement {@code protected} methods
92     * {@code exec}, {@code setRawResult}, and
93     * {@code getRawResult}, while also introducing an abstract
94 dl 1.2 * computational method that can be implemented in its subclasses,
95 jsr166 1.8 * possibly relying on other {@code protected} methods provided
96 dl 1.2 * by this class.
97 dl 1.1 *
98     * <p>ForkJoinTasks should perform relatively small amounts of
99 jsr166 1.9 * computations, otherwise splitting into smaller tasks. As a very
100 dl 1.1 * rough rule of thumb, a task should perform more than 100 and less
101     * than 10000 basic computational steps. If tasks are too big, then
102 jsr166 1.9 * parallelism cannot improve throughput. If too small, then memory
103 dl 1.1 * and internal task maintenance overhead may overwhelm processing.
104     *
105 jsr166 1.8 * <p>ForkJoinTasks are {@code Serializable}, which enables them
106 dl 1.2 * to be used in extensions such as remote execution frameworks. It is
107     * in general sensible to serialize tasks only before or after, but
108 dl 1.1 * not during execution. Serialization is not relied on during
109     * execution itself.
110 jsr166 1.12 *
111     * @since 1.7
112     * @author Doug Lea
113 dl 1.1 */
114     public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
115 dl 1.2
116 dl 1.1 /**
117 dl 1.2 * Run control status bits packed into a single int to minimize
118     * footprint and to ensure atomicity (via CAS). Status is
119     * initially zero, and takes on nonnegative values until
120 dl 1.1 * completed, upon which status holds COMPLETED. CANCELLED, or
121     * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
122     * blocking waits by other threads have SIGNAL_MASK bits set --
123     * bit 15 for external (nonFJ) waits, and the rest a count of
124     * waiting FJ threads. (This representation relies on
125     * ForkJoinPool max thread limits). Completion of a stolen task
126     * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
127     * though suboptimal for some purposes, we use basic builtin
128     * wait/notify to take advantage of "monitor inflation" in JVMs
129     * that we would otherwise need to emulate to avoid adding further
130     * per-task bookkeeping overhead. Note that bits 16-28 are
131     * currently unused. Also value 0x80000000 is available as spare
132     * completion value.
133     */
134 jsr166 1.9 volatile int status; // accessed directly by pool and workers
135 dl 1.1
136     static final int COMPLETION_MASK = 0xe0000000;
137     static final int NORMAL = 0xe0000000; // == mask
138     static final int CANCELLED = 0xc0000000;
139     static final int EXCEPTIONAL = 0xa0000000;
140     static final int SIGNAL_MASK = 0x0000ffff;
141     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
142     static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
143    
144     /**
145     * Table of exceptions thrown by tasks, to enable reporting by
146     * callers. Because exceptions are rare, we don't directly keep
147 jsr166 1.10 * them with task objects, but instead use a weak ref table. Note
148 dl 1.1 * that cancellation exceptions don't appear in the table, but are
149     * instead recorded as status values.
150 jsr166 1.10 * TODO: Use ConcurrentReferenceHashMap
151 dl 1.1 */
152     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
153     Collections.synchronizedMap
154     (new WeakHashMap<ForkJoinTask<?>, Throwable>());
155    
156     // within-package utilities
157    
158     /**
159 jsr166 1.10 * Gets current worker thread, or null if not a worker thread.
160 dl 1.1 */
161     static ForkJoinWorkerThread getWorker() {
162     Thread t = Thread.currentThread();
163 jsr166 1.14 return ((t instanceof ForkJoinWorkerThread) ?
164     (ForkJoinWorkerThread) t : null);
165 dl 1.1 }
166    
167     final boolean casStatus(int cmp, int val) {
168 jsr166 1.11 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
169 dl 1.1 }
170    
171     /**
172     * Workaround for not being able to rethrow unchecked exceptions.
173     */
174     static void rethrowException(Throwable ex) {
175     if (ex != null)
176 jsr166 1.11 UNSAFE.throwException(ex);
177 dl 1.1 }
178    
179     // Setting completion status
180    
181     /**
182 jsr166 1.10 * Marks completion and wakes up threads waiting to join this task.
183     *
184 dl 1.1 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
185     */
186     final void setCompletion(int completion) {
187 dl 1.2 ForkJoinPool pool = getPool();
188 dl 1.1 if (pool != null) {
189     int s; // Clear signal bits while setting completion status
190 jsr166 1.14 do {} while ((s = status) >= 0 && !casStatus(s, completion));
191 dl 1.1
192     if ((s & SIGNAL_MASK) != 0) {
193     if ((s &= INTERNAL_SIGNAL_MASK) != 0)
194     pool.updateRunningCount(s);
195 jsr166 1.14 synchronized (this) { notifyAll(); }
196 dl 1.1 }
197     }
198     else
199     externallySetCompletion(completion);
200     }
201    
202     /**
203     * Version of setCompletion for non-FJ threads. Leaves signal
204     * bits for unblocked threads to adjust, and always notifies.
205     */
206     private void externallySetCompletion(int completion) {
207     int s;
208 jsr166 1.14 do {} while ((s = status) >= 0 &&
209     !casStatus(s, (s & SIGNAL_MASK) | completion));
210     synchronized (this) { notifyAll(); }
211 dl 1.1 }
212    
213     /**
214 jsr166 1.14 * Sets status to indicate normal completion.
215 dl 1.1 */
216     final void setNormalCompletion() {
217     // Try typical fast case -- single CAS, no signal, not already done.
218     // Manually expand casStatus to improve chances of inlining it
219 jsr166 1.11 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
220 dl 1.1 setCompletion(NORMAL);
221     }
222    
223     // internal waiting and notification
224    
225     /**
226 jsr166 1.14 * Performs the actual monitor wait for awaitDone.
227 dl 1.1 */
228     private void doAwaitDone() {
229     // Minimize lock bias and in/de-flation effects by maximizing
230     // chances of waiting inside sync
231     try {
232     while (status >= 0)
233 jsr166 1.14 synchronized (this) { if (status >= 0) wait(); }
234 dl 1.1 } catch (InterruptedException ie) {
235     onInterruptedWait();
236     }
237     }
238    
239     /**
240 jsr166 1.14 * Performs the actual timed monitor wait for awaitDone.
241 dl 1.1 */
242     private void doAwaitDone(long startTime, long nanos) {
243 jsr166 1.14 synchronized (this) {
244 dl 1.1 try {
245     while (status >= 0) {
246     long nt = nanos - System.nanoTime() - startTime;
247     if (nt <= 0)
248     break;
249 jsr166 1.14 wait(nt / 1000000, (int) (nt % 1000000));
250 dl 1.1 }
251     } catch (InterruptedException ie) {
252     onInterruptedWait();
253     }
254     }
255     }
256    
257     // Awaiting completion
258    
259     /**
260     * Sets status to indicate there is joiner, then waits for join,
261     * surrounded with pool notifications.
262 jsr166 1.10 *
263 dl 1.1 * @return status upon exit
264     */
265 jsr166 1.14 private int awaitDone(ForkJoinWorkerThread w,
266     boolean maintainParallelism) {
267     ForkJoinPool pool = (w == null) ? null : w.pool;
268 dl 1.1 int s;
269     while ((s = status) >= 0) {
270 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
271 dl 1.1 if (pool == null || !pool.preJoin(this, maintainParallelism))
272     doAwaitDone();
273     if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
274     adjustPoolCountsOnUnblock(pool);
275     break;
276     }
277     }
278     return s;
279     }
280    
281     /**
282     * Timed version of awaitDone
283 jsr166 1.14 *
284 dl 1.1 * @return status upon exit
285     */
286 dl 1.3 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
287 jsr166 1.14 ForkJoinPool pool = (w == null) ? null : w.pool;
288 dl 1.1 int s;
289     while ((s = status) >= 0) {
290 jsr166 1.14 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
291 dl 1.1 long startTime = System.nanoTime();
292     if (pool == null || !pool.preJoin(this, false))
293     doAwaitDone(startTime, nanos);
294     if ((s = status) >= 0) {
295     adjustPoolCountsOnCancelledWait(pool);
296     s = status;
297     }
298     if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
299     adjustPoolCountsOnUnblock(pool);
300     break;
301     }
302     }
303     return s;
304     }
305    
306     /**
307 jsr166 1.10 * Notifies pool that thread is unblocked. Called by signalled
308 dl 1.1 * threads when woken by non-FJ threads (which is atypical).
309     */
310     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
311     int s;
312 jsr166 1.14 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
313 dl 1.1 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
314     pool.updateRunningCount(s);
315     }
316    
317     /**
318 jsr166 1.10 * Notifies pool to adjust counts on cancelled or timed out wait.
319 dl 1.1 */
320     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
321     if (pool != null) {
322     int s;
323     while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
324     if (casStatus(s, s - 1)) {
325     pool.updateRunningCount(1);
326     break;
327     }
328     }
329     }
330     }
331    
332 dl 1.2 /**
333 jsr166 1.10 * Handles interruptions during waits.
334 dl 1.2 */
335 dl 1.1 private void onInterruptedWait() {
336 dl 1.2 ForkJoinWorkerThread w = getWorker();
337     if (w == null)
338     Thread.currentThread().interrupt(); // re-interrupt
339     else if (w.isTerminating())
340 dl 1.3 cancelIgnoringExceptions();
341 dl 1.2 // else if FJworker, ignore interrupt
342 dl 1.1 }
343    
344     // Recording and reporting exceptions
345    
346     private void setDoneExceptionally(Throwable rex) {
347     exceptionMap.put(this, rex);
348     setCompletion(EXCEPTIONAL);
349     }
350    
351     /**
352 jsr166 1.10 * Throws the exception associated with status s.
353     *
354 dl 1.1 * @throws the exception
355     */
356     private void reportException(int s) {
357     if ((s &= COMPLETION_MASK) < NORMAL) {
358     if (s == CANCELLED)
359     throw new CancellationException();
360     else
361     rethrowException(exceptionMap.get(this));
362     }
363     }
364    
365     /**
366 jsr166 1.10 * Returns result or throws exception using j.u.c.Future conventions.
367 jsr166 1.14 * Only call when {@code isDone} known to be true.
368 dl 1.1 */
369     private V reportFutureResult()
370     throws ExecutionException, InterruptedException {
371     int s = status & COMPLETION_MASK;
372     if (s < NORMAL) {
373     Throwable ex;
374     if (s == CANCELLED)
375     throw new CancellationException();
376     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
377     throw new ExecutionException(ex);
378     if (Thread.interrupted())
379     throw new InterruptedException();
380     }
381     return getRawResult();
382     }
383    
384     /**
385     * Returns result or throws exception using j.u.c.Future conventions
386 jsr166 1.10 * with timeouts.
387 dl 1.1 */
388     private V reportTimedFutureResult()
389     throws InterruptedException, ExecutionException, TimeoutException {
390     Throwable ex;
391     int s = status & COMPLETION_MASK;
392     if (s == NORMAL)
393     return getRawResult();
394     if (s == CANCELLED)
395     throw new CancellationException();
396     if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
397     throw new ExecutionException(ex);
398     if (Thread.interrupted())
399     throw new InterruptedException();
400     throw new TimeoutException();
401     }
402    
403     // internal execution methods
404    
405     /**
406     * Calls exec, recording completion, and rethrowing exception if
407 jsr166 1.10 * encountered. Caller should normally check status before calling.
408     *
409 dl 1.1 * @return true if completed normally
410     */
411     private boolean tryExec() {
412     try { // try block must contain only call to exec
413     if (!exec())
414     return false;
415     } catch (Throwable rex) {
416     setDoneExceptionally(rex);
417     rethrowException(rex);
418     return false; // not reached
419     }
420     setNormalCompletion();
421     return true;
422     }
423    
424     /**
425     * Main execution method used by worker threads. Invokes
426 jsr166 1.10 * base computation unless already complete.
427 dl 1.1 */
428     final void quietlyExec() {
429     if (status >= 0) {
430     try {
431     if (!exec())
432     return;
433 jsr166 1.14 } catch (Throwable rex) {
434 dl 1.1 setDoneExceptionally(rex);
435     return;
436     }
437     setNormalCompletion();
438     }
439     }
440    
441     /**
442 jsr166 1.10 * Calls exec(), recording but not rethrowing exception.
443     * Caller should normally check status before calling.
444     *
445 dl 1.1 * @return true if completed normally
446     */
447     private boolean tryQuietlyInvoke() {
448     try {
449     if (!exec())
450     return false;
451     } catch (Throwable rex) {
452     setDoneExceptionally(rex);
453     return false;
454     }
455     setNormalCompletion();
456     return true;
457     }
458    
459     /**
460 jsr166 1.10 * Cancels, ignoring any exceptions it throws.
461 dl 1.1 */
462 dl 1.3 final void cancelIgnoringExceptions() {
463 dl 1.1 try {
464     cancel(false);
465 jsr166 1.14 } catch (Throwable ignore) {
466 dl 1.1 }
467     }
468    
469 dl 1.3 /**
470     * Main implementation of helpJoin
471     */
472     private int busyJoin(ForkJoinWorkerThread w) {
473     int s;
474     ForkJoinTask<?> t;
475     while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
476     t.quietlyExec();
477 jsr166 1.14 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
478 dl 1.3 }
479    
480 dl 1.1 // public methods
481    
482     /**
483     * Arranges to asynchronously execute this task. While it is not
484     * necessarily enforced, it is a usage error to fork a task more
485     * than once unless it has completed and been reinitialized. This
486 dl 1.2 * method may be invoked only from within ForkJoinTask
487 dl 1.13 * computations (as may be determined using method {@link
488     * #inForkJoinPool}). Attempts to invoke in other contexts result
489 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
490 dl 1.1 */
491     public final void fork() {
492 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
493     .pushTask(this);
494 dl 1.1 }
495    
496     /**
497     * Returns the result of the computation when it is ready.
498 jsr166 1.8 * This method differs from {@code get} in that abnormal
499 dl 1.1 * completion results in RuntimeExceptions or Errors, not
500     * ExecutionExceptions.
501     *
502     * @return the computed result
503     */
504     public final V join() {
505     ForkJoinWorkerThread w = getWorker();
506     if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
507     reportException(awaitDone(w, true));
508     return getRawResult();
509     }
510    
511     /**
512 dl 1.2 * Commences performing this task, awaits its completion if
513     * necessary, and return its result.
514 jsr166 1.10 *
515 dl 1.1 * @throws Throwable (a RuntimeException, Error, or unchecked
516 jsr166 1.10 * exception) if the underlying computation did so
517 dl 1.1 * @return the computed result
518     */
519     public final V invoke() {
520     if (status >= 0 && tryExec())
521     return getRawResult();
522     else
523     return join();
524     }
525    
526     /**
527 jsr166 1.8 * Forks both tasks, returning when {@code isDone} holds for
528 dl 1.2 * both of them or an exception is encountered. This method may be
529 dl 1.13 * invoked only from within ForkJoinTask computations (as may be
530     * determined using method {@link #inForkJoinPool}). Attempts to
531 jsr166 1.14 * invoke in other contexts result in exceptions or errors,
532 dl 1.4 * possibly including ClassCastException.
533 jsr166 1.10 *
534 dl 1.2 * @param t1 one task
535     * @param t2 the other task
536     * @throws NullPointerException if t1 or t2 are null
537 jsr166 1.10 * @throws RuntimeException or Error if either task did so
538 dl 1.1 */
539 dl 1.2 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
540     t2.fork();
541     t1.invoke();
542     t2.join();
543 dl 1.1 }
544    
545     /**
546 jsr166 1.8 * Forks the given tasks, returning when {@code isDone} holds
547 dl 1.2 * for all of them. If any task encounters an exception, others
548     * may be cancelled. This method may be invoked only from within
549 dl 1.13 * ForkJoinTask computations (as may be determined using method
550     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
551 jsr166 1.14 * result in exceptions or errors, possibly including
552 dl 1.13 * ClassCastException.
553 jsr166 1.14 *
554 dl 1.2 * @param tasks the array of tasks
555 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
556     * @throws RuntimeException or Error if any task did so
557 dl 1.1 */
558 dl 1.2 public static void invokeAll(ForkJoinTask<?>... tasks) {
559     Throwable ex = null;
560     int last = tasks.length - 1;
561     for (int i = last; i >= 0; --i) {
562     ForkJoinTask<?> t = tasks[i];
563     if (t == null) {
564     if (ex == null)
565     ex = new NullPointerException();
566     }
567     else if (i != 0)
568     t.fork();
569     else {
570     t.quietlyInvoke();
571     if (ex == null)
572     ex = t.getException();
573     }
574     }
575     for (int i = 1; i <= last; ++i) {
576     ForkJoinTask<?> t = tasks[i];
577     if (t != null) {
578     if (ex != null)
579     t.cancel(false);
580     else {
581     t.quietlyJoin();
582     if (ex == null)
583     ex = t.getException();
584     }
585     }
586 dl 1.1 }
587 dl 1.2 if (ex != null)
588     rethrowException(ex);
589 dl 1.1 }
590    
591     /**
592 dl 1.2 * Forks all tasks in the collection, returning when
593 jsr166 1.8 * {@code isDone} holds for all of them. If any task
594 dl 1.2 * encounters an exception, others may be cancelled. This method
595 dl 1.13 * may be invoked only from within ForkJoinTask computations (as
596     * may be determined using method {@link
597 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
598     * in exceptions or errors, possibly including ClassCastException.
599 jsr166 1.10 *
600 dl 1.2 * @param tasks the collection of tasks
601 jsr166 1.10 * @throws NullPointerException if tasks or any element are null
602     * @throws RuntimeException or Error if any task did so
603 dl 1.1 */
604 dl 1.2 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
605 jsr166 1.15 if (!(tasks instanceof List<?>)) {
606 jsr166 1.14 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
607 dl 1.2 return;
608     }
609 jsr166 1.15 @SuppressWarnings("unchecked")
610 dl 1.2 List<? extends ForkJoinTask<?>> ts =
611 jsr166 1.14 (List<? extends ForkJoinTask<?>>) tasks;
612 dl 1.2 Throwable ex = null;
613     int last = ts.size() - 1;
614     for (int i = last; i >= 0; --i) {
615     ForkJoinTask<?> t = ts.get(i);
616     if (t == null) {
617     if (ex == null)
618     ex = new NullPointerException();
619     }
620     else if (i != 0)
621     t.fork();
622     else {
623     t.quietlyInvoke();
624     if (ex == null)
625     ex = t.getException();
626     }
627     }
628     for (int i = 1; i <= last; ++i) {
629     ForkJoinTask<?> t = ts.get(i);
630     if (t != null) {
631     if (ex != null)
632     t.cancel(false);
633     else {
634     t.quietlyJoin();
635     if (ex == null)
636     ex = t.getException();
637     }
638     }
639     }
640     if (ex != null)
641     rethrowException(ex);
642 dl 1.1 }
643    
644     /**
645     * Returns true if the computation performed by this task has
646     * completed (or has been cancelled).
647 jsr166 1.10 *
648 dl 1.1 * @return true if this computation has completed
649     */
650     public final boolean isDone() {
651     return status < 0;
652     }
653    
654     /**
655     * Returns true if this task was cancelled.
656 jsr166 1.10 *
657 dl 1.1 * @return true if this task was cancelled
658     */
659     public final boolean isCancelled() {
660     return (status & COMPLETION_MASK) == CANCELLED;
661     }
662    
663     /**
664     * Asserts that the results of this task's computation will not be
665 jsr166 1.9 * used. If a cancellation occurs before attempting to execute this
666 jsr166 1.8 * task, then execution will be suppressed, {@code isCancelled}
667     * will report true, and {@code join} will result in a
668     * {@code CancellationException} being thrown. Otherwise, when
669 dl 1.1 * cancellation races with completion, there are no guarantees
670 jsr166 1.8 * about whether {@code isCancelled} will report true, whether
671     * {@code join} will return normally or via an exception, or
672 dl 1.1 * whether these behaviors will remain consistent upon repeated
673     * invocation.
674     *
675     * <p>This method may be overridden in subclasses, but if so, must
676     * still ensure that these minimal properties hold. In particular,
677     * the cancel method itself must not throw exceptions.
678     *
679     * <p> This method is designed to be invoked by <em>other</em>
680     * tasks. To terminate the current task, you can just return or
681     * throw an unchecked exception from its computation method, or
682 jsr166 1.8 * invoke {@code completeExceptionally}.
683 dl 1.1 *
684     * @param mayInterruptIfRunning this value is ignored in the
685     * default implementation because tasks are not in general
686 jsr166 1.14 * cancelled via interruption
687 dl 1.1 *
688     * @return true if this task is now cancelled
689     */
690     public boolean cancel(boolean mayInterruptIfRunning) {
691     setCompletion(CANCELLED);
692     return (status & COMPLETION_MASK) == CANCELLED;
693     }
694    
695     /**
696 jsr166 1.10 * Returns true if this task threw an exception or was cancelled.
697     *
698 dl 1.3 * @return true if this task threw an exception or was cancelled
699     */
700     public final boolean isCompletedAbnormally() {
701     return (status & COMPLETION_MASK) < NORMAL;
702     }
703    
704     /**
705     * Returns the exception thrown by the base computation, or a
706     * CancellationException if cancelled, or null if none or if the
707     * method has not yet completed.
708 jsr166 1.10 *
709 dl 1.3 * @return the exception, or null if none
710     */
711     public final Throwable getException() {
712     int s = status & COMPLETION_MASK;
713     if (s >= NORMAL)
714     return null;
715     if (s == CANCELLED)
716     return new CancellationException();
717     return exceptionMap.get(this);
718     }
719    
720     /**
721 dl 1.1 * Completes this task abnormally, and if not already aborted or
722     * cancelled, causes it to throw the given exception upon
723 jsr166 1.8 * {@code join} and related operations. This method may be used
724 dl 1.1 * to induce exceptions in asynchronous tasks, or to force
725 dl 1.2 * completion of tasks that would not otherwise complete. Its use
726     * in other situations is likely to be wrong. This method is
727 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
728 dl 1.2 * implementation to maintain guarantees.
729     *
730 dl 1.1 * @param ex the exception to throw. If this exception is
731     * not a RuntimeException or Error, the actual exception thrown
732     * will be a RuntimeException with cause ex.
733     */
734     public void completeExceptionally(Throwable ex) {
735     setDoneExceptionally((ex instanceof RuntimeException) ||
736 jsr166 1.14 (ex instanceof Error) ? ex :
737 dl 1.1 new RuntimeException(ex));
738     }
739    
740     /**
741     * Completes this task, and if not already aborted or cancelled,
742 jsr166 1.8 * returning a {@code null} result upon {@code join} and related
743 dl 1.1 * operations. This method may be used to provide results for
744     * asynchronous tasks, or to provide alternative handling for
745 dl 1.2 * tasks that would not otherwise complete normally. Its use in
746     * other situations is likely to be wrong. This method is
747 jsr166 1.8 * overridable, but overridden versions must invoke {@code super}
748 dl 1.2 * implementation to maintain guarantees.
749 dl 1.1 *
750 jsr166 1.10 * @param value the result value for this task
751 dl 1.1 */
752     public void complete(V value) {
753     try {
754     setRawResult(value);
755 jsr166 1.14 } catch (Throwable rex) {
756 dl 1.1 setDoneExceptionally(rex);
757     return;
758     }
759     setNormalCompletion();
760     }
761    
762 dl 1.3 public final V get() throws InterruptedException, ExecutionException {
763     ForkJoinWorkerThread w = getWorker();
764     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
765     awaitDone(w, true);
766     return reportFutureResult();
767     }
768    
769     public final V get(long timeout, TimeUnit unit)
770     throws InterruptedException, ExecutionException, TimeoutException {
771     ForkJoinWorkerThread w = getWorker();
772     if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
773     awaitDone(w, unit.toNanos(timeout));
774     return reportTimedFutureResult();
775     }
776    
777 dl 1.1 /**
778 dl 1.2 * Possibly executes other tasks until this task is ready, then
779     * returns the result of the computation. This method may be more
780 jsr166 1.8 * efficient than {@code join}, but is only applicable when
781 jsr166 1.9 * there are no potential dependencies between continuation of the
782 dl 1.2 * current task and that of any other task that might be executed
783     * while helping. (This usually holds for pure divide-and-conquer
784     * tasks). This method may be invoked only from within
785 dl 1.13 * ForkJoinTask computations (as may be determined using method
786     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
787 jsr166 1.14 * result in exceptions or errors, possibly including
788 dl 1.13 * ClassCastException.
789 jsr166 1.10 *
790 dl 1.2 * @return the computed result
791     */
792     public final V helpJoin() {
793 jsr166 1.14 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
794 dl 1.2 if (status < 0 || !w.unpushTask(this) || !tryExec())
795 dl 1.3 reportException(busyJoin(w));
796 dl 1.2 return getRawResult();
797     }
798    
799     /**
800     * Possibly executes other tasks until this task is ready. This
801     * method may be invoked only from within ForkJoinTask
802 dl 1.13 * computations (as may be determined using method {@link
803 jsr166 1.14 * #inForkJoinPool}). Attempts to invoke in other contexts result
804     * in exceptions or errors, possibly including ClassCastException.
805 dl 1.2 */
806     public final void quietlyHelpJoin() {
807     if (status >= 0) {
808     ForkJoinWorkerThread w =
809 jsr166 1.14 (ForkJoinWorkerThread) Thread.currentThread();
810 dl 1.2 if (!w.unpushTask(this) || !tryQuietlyInvoke())
811 dl 1.3 busyJoin(w);
812 dl 1.2 }
813     }
814    
815     /**
816     * Joins this task, without returning its result or throwing an
817     * exception. This method may be useful when processing
818     * collections of tasks when some have been cancelled or otherwise
819     * known to have aborted.
820     */
821     public final void quietlyJoin() {
822     if (status >= 0) {
823     ForkJoinWorkerThread w = getWorker();
824     if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
825     awaitDone(w, true);
826     }
827     }
828    
829     /**
830     * Commences performing this task and awaits its completion if
831     * necessary, without returning its result or throwing an
832     * exception. This method may be useful when processing
833     * collections of tasks when some have been cancelled or otherwise
834     * known to have aborted.
835     */
836     public final void quietlyInvoke() {
837     if (status >= 0 && !tryQuietlyInvoke())
838     quietlyJoin();
839     }
840    
841     /**
842 dl 1.3 * Possibly executes tasks until the pool hosting the current task
843     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
844     * designs in which many tasks are forked, but none are explicitly
845     * joined, instead executing them until all are processed.
846     */
847     public static void helpQuiesce() {
848 jsr166 1.14 ((ForkJoinWorkerThread) Thread.currentThread())
849     .helpQuiescePool();
850 dl 1.3 }
851    
852     /**
853 dl 1.1 * Resets the internal bookkeeping state of this task, allowing a
854 jsr166 1.8 * subsequent {@code fork}. This method allows repeated reuse of
855 dl 1.1 * this task, but only if reuse occurs when this task has either
856     * never been forked, or has been forked, then completed and all
857     * outstanding joins of this task have also completed. Effects
858     * under any other usage conditions are not guaranteed, and are
859     * almost surely wrong. This method may be useful when executing
860     * pre-constructed trees of subtasks in loops.
861     */
862     public void reinitialize() {
863     if ((status & COMPLETION_MASK) == EXCEPTIONAL)
864     exceptionMap.remove(this);
865     status = 0;
866     }
867    
868     /**
869 dl 1.2 * Returns the pool hosting the current task execution, or null
870 dl 1.13 * if this task is executing outside of any ForkJoinPool.
871 jsr166 1.10 *
872 jsr166 1.14 * @return the pool, or null if none
873 dl 1.1 */
874 dl 1.2 public static ForkJoinPool getPool() {
875     Thread t = Thread.currentThread();
876 jsr166 1.15 return (t instanceof ForkJoinWorkerThread) ?
877     ((ForkJoinWorkerThread) t).pool : null;
878 dl 1.1 }
879    
880     /**
881 jsr166 1.14 * Returns {@code true} if the current thread is executing as a
882 dl 1.13 * ForkJoinPool computation.
883 jsr166 1.14 *
884     * @return {@code true} if the current thread is executing as a
885 dl 1.13 * ForkJoinPool computation, or false otherwise
886     */
887     public static boolean inForkJoinPool() {
888     return Thread.currentThread() instanceof ForkJoinWorkerThread;
889     }
890    
891     /**
892 dl 1.2 * Tries to unschedule this task for execution. This method will
893     * typically succeed if this task is the most recently forked task
894     * by the current thread, and has not commenced executing in
895     * another thread. This method may be useful when arranging
896     * alternative local processing of tasks that could have been, but
897     * were not, stolen. This method may be invoked only from within
898 dl 1.13 * ForkJoinTask computations (as may be determined using method
899     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
900 jsr166 1.14 * result in exceptions or errors, possibly including
901 dl 1.13 * ClassCastException.
902 jsr166 1.10 *
903 dl 1.2 * @return true if unforked
904 dl 1.1 */
905 dl 1.2 public boolean tryUnfork() {
906 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
907     .unpushTask(this);
908 dl 1.1 }
909    
910     /**
911 dl 1.2 * Returns an estimate of the number of tasks that have been
912     * forked by the current worker thread but not yet executed. This
913     * value may be useful for heuristic decisions about whether to
914     * fork other tasks.
915 jsr166 1.10 *
916 dl 1.2 * @return the number of tasks
917     */
918     public static int getQueuedTaskCount() {
919 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
920     .getQueueSize();
921 dl 1.2 }
922    
923     /**
924 jsr166 1.10 * Returns an estimate of how many more locally queued tasks are
925 dl 1.1 * held by the current worker thread than there are other worker
926 dl 1.2 * threads that might steal them. This value may be useful for
927     * heuristic decisions about whether to fork other tasks. In many
928     * usages of ForkJoinTasks, at steady state, each worker should
929     * aim to maintain a small constant surplus (for example, 3) of
930     * tasks, and to process computations locally if this threshold is
931     * exceeded.
932 jsr166 1.10 *
933 dl 1.1 * @return the surplus number of tasks, which may be negative
934     */
935 dl 1.2 public static int getSurplusQueuedTaskCount() {
936 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
937 dl 1.1 .getEstimatedSurplusTaskCount();
938     }
939    
940 dl 1.2 // Extension methods
941 dl 1.1
942     /**
943 jsr166 1.8 * Returns the result that would be returned by {@code join},
944 dl 1.2 * even if this task completed abnormally, or null if this task is
945     * not known to have been completed. This method is designed to
946     * aid debugging, as well as to support extensions. Its use in any
947     * other context is discouraged.
948 dl 1.1 *
949 jsr166 1.10 * @return the result, or null if not completed
950 dl 1.1 */
951     public abstract V getRawResult();
952    
953     /**
954     * Forces the given value to be returned as a result. This method
955     * is designed to support extensions, and should not in general be
956     * called otherwise.
957     *
958     * @param value the value
959     */
960     protected abstract void setRawResult(V value);
961    
962     /**
963     * Immediately performs the base action of this task. This method
964     * is designed to support extensions, and should not in general be
965     * called otherwise. The return value controls whether this task
966     * is considered to be done normally. It may return false in
967     * asynchronous actions that require explicit invocations of
968 jsr166 1.8 * {@code complete} to become joinable. It may throw exceptions
969 dl 1.1 * to indicate abnormal exit.
970 jsr166 1.10 *
971 dl 1.1 * @return true if completed normally
972     * @throws Error or RuntimeException if encountered during computation
973     */
974     protected abstract boolean exec();
975    
976 dl 1.2 /**
977 dl 1.6 * Returns, but does not unschedule or execute, the task queued by
978     * the current thread but not yet executed, if one is
979     * available. There is no guarantee that this task will actually
980     * be polled or executed next. This method is designed primarily
981     * to support extensions, and is unlikely to be useful otherwise.
982     * This method may be invoked only from within ForkJoinTask
983 dl 1.13 * computations (as may be determined using method {@link
984     * #inForkJoinPool}). Attempts to invoke in other contexts result
985 jsr166 1.14 * in exceptions or errors, possibly including ClassCastException.
986 dl 1.2 *
987     * @return the next task, or null if none are available
988     */
989     protected static ForkJoinTask<?> peekNextLocalTask() {
990 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
991     .peekTask();
992 dl 1.2 }
993    
994     /**
995 dl 1.6 * Unschedules and returns, without executing, the next task
996     * queued by the current thread but not yet executed. This method
997     * is designed primarily to support extensions, and is unlikely to
998     * be useful otherwise. This method may be invoked only from
999 dl 1.13 * within ForkJoinTask computations (as may be determined using
1000     * method {@link #inForkJoinPool}). Attempts to invoke in other
1001 jsr166 1.14 * contexts result in exceptions or errors, possibly including
1002 dl 1.6 * ClassCastException.
1003 dl 1.2 *
1004     * @return the next task, or null if none are available
1005     */
1006     protected static ForkJoinTask<?> pollNextLocalTask() {
1007 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1008     .pollLocalTask();
1009 dl 1.2 }
1010 jsr166 1.7
1011 dl 1.2 /**
1012 dl 1.6 * Unschedules and returns, without executing, the next task
1013     * queued by the current thread but not yet executed, if one is
1014     * available, or if not available, a task that was forked by some
1015     * other thread, if available. Availability may be transient, so a
1016 jsr166 1.9 * {@code null} result does not necessarily imply quiescence
1017 dl 1.6 * of the pool this task is operating in. This method is designed
1018     * primarily to support extensions, and is unlikely to be useful
1019     * otherwise. This method may be invoked only from within
1020 dl 1.13 * ForkJoinTask computations (as may be determined using method
1021     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1022 jsr166 1.14 * result in exceptions or errors, possibly including
1023 dl 1.6 * ClassCastException.
1024 dl 1.4 *
1025 dl 1.2 * @return a task, or null if none are available
1026     */
1027     protected static ForkJoinTask<?> pollTask() {
1028 jsr166 1.14 return ((ForkJoinWorkerThread) Thread.currentThread())
1029     .pollTask();
1030 dl 1.2 }
1031    
1032 dl 1.1 // Serialization support
1033    
1034     private static final long serialVersionUID = -7721805057305804111L;
1035    
1036     /**
1037     * Save the state to a stream.
1038     *
1039     * @serialData the current run status and the exception thrown
1040 jsr166 1.10 * during execution, or null if none
1041 dl 1.1 * @param s the stream
1042     */
1043     private void writeObject(java.io.ObjectOutputStream s)
1044     throws java.io.IOException {
1045     s.defaultWriteObject();
1046     s.writeObject(getException());
1047     }
1048    
1049     /**
1050     * Reconstitute the instance from a stream.
1051 jsr166 1.10 *
1052 dl 1.1 * @param s the stream
1053     */
1054     private void readObject(java.io.ObjectInputStream s)
1055     throws java.io.IOException, ClassNotFoundException {
1056     s.defaultReadObject();
1057 dl 1.2 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1058     status |= EXTERNAL_SIGNAL; // conservatively set external signal
1059 dl 1.1 Object ex = s.readObject();
1060     if (ex != null)
1061 jsr166 1.14 setDoneExceptionally((Throwable) ex);
1062 dl 1.1 }
1063    
1064 jsr166 1.16 // Unsafe mechanics for jsr166y 3rd party package.
1065     private static sun.misc.Unsafe getUnsafe() {
1066 jsr166 1.5 try {
1067 jsr166 1.16 return sun.misc.Unsafe.getUnsafe();
1068 jsr166 1.5 } catch (SecurityException se) {
1069     try {
1070     return java.security.AccessController.doPrivileged
1071 jsr166 1.16 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
1072     public sun.misc.Unsafe run() throws Exception {
1073     return getUnsafeByReflection();
1074 jsr166 1.5 }});
1075     } catch (java.security.PrivilegedActionException e) {
1076 jsr166 1.16 throw new RuntimeException("Could not initialize intrinsics",
1077     e.getCause());
1078 jsr166 1.5 }
1079     }
1080     }
1081    
1082 jsr166 1.16 private static sun.misc.Unsafe getUnsafeByReflection()
1083 jsr166 1.5 throws NoSuchFieldException, IllegalAccessException {
1084 jsr166 1.16 java.lang.reflect.Field f =
1085     sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
1086 jsr166 1.5 f.setAccessible(true);
1087 jsr166 1.16 return (sun.misc.Unsafe) f.get(null);
1088 jsr166 1.5 }
1089    
1090 jsr166 1.16 private static long fieldOffset(String fieldName, Class<?> klazz) {
1091 dl 1.1 try {
1092 jsr166 1.16 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
1093     } catch (NoSuchFieldException e) {
1094     // Convert Exception to Error
1095     NoSuchFieldError error = new NoSuchFieldError(fieldName);
1096     error.initCause(e);
1097     throw error;
1098 jsr166 1.5 }
1099 dl 1.1 }
1100    
1101 jsr166 1.16 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1102     static final long statusOffset =
1103     fieldOffset("status", ForkJoinTask.class);
1104    
1105 dl 1.1 }