ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.2
Committed: Wed Jan 7 16:07:37 2009 UTC (15 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.1: +350 -264 lines
Log Message:
Improved documentaion; moved methods to improve javadoc flow; regularized extension APIs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * Abstract base class for tasks that run within a {@link
17 * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18 * lighter weight than a normal thread. Huge numbers of tasks and
19 * subtasks may be hosted by a small number of actual threads in a
20 * ForkJoinPool, at the price of some usage limitations.
21 *
22 * <p> A "main" ForkJoinTask begins execution when submitted to a
23 * {@link ForkJoinPool}. Once started, it will usually in turn start
24 * other subtasks. As indicated by the name of this class, many
25 * programs using ForkJoinTasks employ only methods <code>fork</code>
26 * and <code>join</code>, or derivatives such as
27 * <code>invokeAll</code>. However, this class also provides a number
28 * of other methods that can come into play in advanced usages, as
29 * well as extension mechanics that allow support of new forms of
30 * fork/join processing.
31 *
32 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33 * efficiency of ForkJoinTasks stems from a set of restrictions (that
34 * are only partially statically enforceable) reflecting their
35 * intended use as computational tasks calculating pure functions or
36 * operating on purely isolated objects. The primary coordination
37 * mechanisms are {@link #fork}, that arranges asynchronous execution,
38 * and {@link #join}, that doesn't proceed until the task's result has
39 * been computed. Computations should avoid <code>synchronized</code>
40 * methods or blocks, and should minimize other blocking
41 * synchronization apart from joining other tasks or using
42 * synchronizers such as Phasers that are advertised to cooperate with
43 * fork/join scheduling. Tasks should also not perform blocking IO,
44 * and should ideally access variables that are completely independent
45 * of those accessed by other running tasks. Minor breaches of these
46 * restrictions, for example using shared output streams, may be
47 * tolerable in practice, but frequent use may result in poor
48 * performance, and the potential to indefinitely stall if the number
49 * of threads not waiting for IO or other external synchronization
50 * becomes exhausted. This usage restriction is in part enforced by
51 * not permitting checked exceptions such as <code>IOExceptions</code>
52 * to be thrown. However, computations may still encounter unchecked
53 * exceptions, that are rethrown to callers attempting join
54 * them. These exceptions may additionally include
55 * RejectedExecutionExceptions stemming from internal resource
56 * exhaustion such as failure to allocate internal task queues.
57 *
58 * <p>The primary method for awaiting completion and extracting
59 * results of a task is {@link #join}, but there are several variants:
60 * The {@link Future#get} methods support interruptible and/or timed
61 * waits for completion and report results using <code>Future</code>
62 * conventions. Method {@link #helpJoin} enables callers to actively
63 * execute other tasks while awaiting joins, which is sometimes more
64 * efficient but only applies when all subtasks are known to be
65 * strictly tree-structured. Method {@link #invoke} is semantically
66 * equivalent to <code>fork(); join()</code> but always attempts to
67 * begin execution in the current thread. The "<em>quiet</em>" forms
68 * of these methods do not extract results or report exceptions. These
69 * may be useful when a set of tasks are being executed, and you need
70 * to delay processing of results or exceptions until all complete.
71 * Method <code>invokeAll</code> (available in multiple versions)
72 * performs the most common form of parallel invocation: forking a set
73 * of tasks and joining them all.
74 *
75 * <p> The ForkJoinTask class is not usually directly subclassed.
76 * Instead, you subclass one of the abstract classes that support a
77 * particular style of fork/join processing. Normally, a concrete
78 * ForkJoinTask subclass declares fields comprising its parameters,
79 * established in a constructor, and then defines a <code>compute</code>
80 * method that somehow uses the control methods supplied by this base
81 * class. While these methods have <code>public</code> access (to allow
82 * instances of different task subclasses to call each others
83 * methods), some of them may only be called from within other
84 * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 * exceptions or errors including ClassCastException.
86 *
87 * <p>Most base support methods are <code>final</code> because their
88 * implementations are intrinsically tied to the underlying
89 * lightweight task scheduling framework, and so cannot be overridden.
90 * Developers creating new basic styles of fork/join processing should
91 * minimally implement <code>protected</code> methods
92 * <code>exec</code>, <code>setRawResult</code>, and
93 * <code>getRawResult</code>, while also introducing an abstract
94 * computational method that can be implemented in its subclasses,
95 * possibly relying on other <code>protected</code> methods provided
96 * by this class.
97 *
98 * <p>ForkJoinTasks should perform relatively small amounts of
99 * computations, othewise splitting into smaller tasks. As a very
100 * rough rule of thumb, a task should perform more than 100 and less
101 * than 10000 basic computational steps. If tasks are too big, then
102 * parellelism cannot improve throughput. If too small, then memory
103 * and internal task maintenance overhead may overwhelm processing.
104 *
105 * <p>ForkJoinTasks are <code>Serializable</code>, which enables them
106 * to be used in extensions such as remote execution frameworks. It is
107 * in general sensible to serialize tasks only before or after, but
108 * not during execution. Serialization is not relied on during
109 * execution itself.
110 */
111 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
112
113 /**
114 * Run control status bits packed into a single int to minimize
115 * footprint and to ensure atomicity (via CAS). Status is
116 * initially zero, and takes on nonnegative values until
117 * completed, upon which status holds COMPLETED. CANCELLED, or
118 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
119 * blocking waits by other threads have SIGNAL_MASK bits set --
120 * bit 15 for external (nonFJ) waits, and the rest a count of
121 * waiting FJ threads. (This representation relies on
122 * ForkJoinPool max thread limits). Completion of a stolen task
123 * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
124 * though suboptimal for some purposes, we use basic builtin
125 * wait/notify to take advantage of "monitor inflation" in JVMs
126 * that we would otherwise need to emulate to avoid adding further
127 * per-task bookkeeping overhead. Note that bits 16-28 are
128 * currently unused. Also value 0x80000000 is available as spare
129 * completion value.
130 */
131 volatile int status; // accessed directy by pool and workers
132
133 static final int COMPLETION_MASK = 0xe0000000;
134 static final int NORMAL = 0xe0000000; // == mask
135 static final int CANCELLED = 0xc0000000;
136 static final int EXCEPTIONAL = 0xa0000000;
137 static final int SIGNAL_MASK = 0x0000ffff;
138 static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
139 static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
140
141 /**
142 * Table of exceptions thrown by tasks, to enable reporting by
143 * callers. Because exceptions are rare, we don't directly keep
144 * them with task objects, but instead us a weak ref table. Note
145 * that cancellation exceptions don't appear in the table, but are
146 * instead recorded as status values.
147 * Todo: Use ConcurrentReferenceHashMap
148 */
149 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
150 Collections.synchronizedMap
151 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
152
153 // within-package utilities
154
155 /**
156 * Get current worker thread, or null if not a worker thread
157 */
158 static ForkJoinWorkerThread getWorker() {
159 Thread t = Thread.currentThread();
160 return ((t instanceof ForkJoinWorkerThread)?
161 (ForkJoinWorkerThread)t : null);
162 }
163
164 final boolean casStatus(int cmp, int val) {
165 return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
166 }
167
168 /**
169 * Workaround for not being able to rethrow unchecked exceptions.
170 */
171 static void rethrowException(Throwable ex) {
172 if (ex != null)
173 _unsafe.throwException(ex);
174 }
175
176 // Setting completion status
177
178 /**
179 * Mark completion and wake up threads waiting to join this task.
180 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
181 */
182 final void setCompletion(int completion) {
183 ForkJoinPool pool = getPool();
184 if (pool != null) {
185 int s; // Clear signal bits while setting completion status
186 do;while ((s = status) >= 0 && !casStatus(s, completion));
187
188 if ((s & SIGNAL_MASK) != 0) {
189 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
190 pool.updateRunningCount(s);
191 synchronized(this) { notifyAll(); }
192 }
193 }
194 else
195 externallySetCompletion(completion);
196 }
197
198 /**
199 * Version of setCompletion for non-FJ threads. Leaves signal
200 * bits for unblocked threads to adjust, and always notifies.
201 */
202 private void externallySetCompletion(int completion) {
203 int s;
204 do;while ((s = status) >= 0 &&
205 !casStatus(s, (s & SIGNAL_MASK) | completion));
206 synchronized(this) { notifyAll(); }
207 }
208
209 /**
210 * Sets status to indicate normal completion
211 */
212 final void setNormalCompletion() {
213 // Try typical fast case -- single CAS, no signal, not already done.
214 // Manually expand casStatus to improve chances of inlining it
215 if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
216 setCompletion(NORMAL);
217 }
218
219 // internal waiting and notification
220
221 /**
222 * Performs the actual monitor wait for awaitDone
223 */
224 private void doAwaitDone() {
225 // Minimize lock bias and in/de-flation effects by maximizing
226 // chances of waiting inside sync
227 try {
228 while (status >= 0)
229 synchronized(this) { if (status >= 0) wait(); }
230 } catch (InterruptedException ie) {
231 onInterruptedWait();
232 }
233 }
234
235 /**
236 * Performs the actual monitor wait for awaitDone
237 */
238 private void doAwaitDone(long startTime, long nanos) {
239 synchronized(this) {
240 try {
241 while (status >= 0) {
242 long nt = nanos - System.nanoTime() - startTime;
243 if (nt <= 0)
244 break;
245 wait(nt / 1000000, (int)(nt % 1000000));
246 }
247 } catch (InterruptedException ie) {
248 onInterruptedWait();
249 }
250 }
251 }
252
253 // Awaiting completion
254
255 /**
256 * Sets status to indicate there is joiner, then waits for join,
257 * surrounded with pool notifications.
258 * @return status upon exit
259 */
260 final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261 ForkJoinPool pool = w == null? null : w.pool;
262 int s;
263 while ((s = status) >= 0) {
264 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
265 if (pool == null || !pool.preJoin(this, maintainParallelism))
266 doAwaitDone();
267 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
268 adjustPoolCountsOnUnblock(pool);
269 break;
270 }
271 }
272 return s;
273 }
274
275 /**
276 * Timed version of awaitDone
277 * @return status upon exit
278 */
279 final int awaitDone(ForkJoinWorkerThread w, long nanos) {
280 ForkJoinPool pool = w == null? null : w.pool;
281 int s;
282 while ((s = status) >= 0) {
283 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
284 long startTime = System.nanoTime();
285 if (pool == null || !pool.preJoin(this, false))
286 doAwaitDone(startTime, nanos);
287 if ((s = status) >= 0) {
288 adjustPoolCountsOnCancelledWait(pool);
289 s = status;
290 }
291 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
292 adjustPoolCountsOnUnblock(pool);
293 break;
294 }
295 }
296 return s;
297 }
298
299 /**
300 * Notify pool that thread is unblocked. Called by signalled
301 * threads when woken by non-FJ threads (which is atypical).
302 */
303 private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
304 int s;
305 do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
306 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
307 pool.updateRunningCount(s);
308 }
309
310 /**
311 * Notify pool to adjust counts on cancelled or timed out wait
312 */
313 private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
314 if (pool != null) {
315 int s;
316 while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
317 if (casStatus(s, s - 1)) {
318 pool.updateRunningCount(1);
319 break;
320 }
321 }
322 }
323 }
324
325 /**
326 * Handle interruptions during waits.
327 */
328 private void onInterruptedWait() {
329 ForkJoinWorkerThread w = getWorker();
330 if (w == null)
331 Thread.currentThread().interrupt(); // re-interrupt
332 else if (w.isTerminating())
333 cancelIgnoreExceptions();
334 // else if FJworker, ignore interrupt
335 }
336
337 // Recording and reporting exceptions
338
339 private void setDoneExceptionally(Throwable rex) {
340 exceptionMap.put(this, rex);
341 setCompletion(EXCEPTIONAL);
342 }
343
344 /**
345 * Throws the exception associated with status s;
346 * @throws the exception
347 */
348 private void reportException(int s) {
349 if ((s &= COMPLETION_MASK) < NORMAL) {
350 if (s == CANCELLED)
351 throw new CancellationException();
352 else
353 rethrowException(exceptionMap.get(this));
354 }
355 }
356
357 /**
358 * Returns result or throws exception using j.u.c.Future conventions
359 * Only call when isDone known to be true.
360 */
361 private V reportFutureResult()
362 throws ExecutionException, InterruptedException {
363 int s = status & COMPLETION_MASK;
364 if (s < NORMAL) {
365 Throwable ex;
366 if (s == CANCELLED)
367 throw new CancellationException();
368 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
369 throw new ExecutionException(ex);
370 if (Thread.interrupted())
371 throw new InterruptedException();
372 }
373 return getRawResult();
374 }
375
376 /**
377 * Returns result or throws exception using j.u.c.Future conventions
378 * with timeouts
379 */
380 private V reportTimedFutureResult()
381 throws InterruptedException, ExecutionException, TimeoutException {
382 Throwable ex;
383 int s = status & COMPLETION_MASK;
384 if (s == NORMAL)
385 return getRawResult();
386 if (s == CANCELLED)
387 throw new CancellationException();
388 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
389 throw new ExecutionException(ex);
390 if (Thread.interrupted())
391 throw new InterruptedException();
392 throw new TimeoutException();
393 }
394
395 // internal execution methods
396
397 /**
398 * Calls exec, recording completion, and rethrowing exception if
399 * encountered. Caller should normally check status before calling
400 * @return true if completed normally
401 */
402 private boolean tryExec() {
403 try { // try block must contain only call to exec
404 if (!exec())
405 return false;
406 } catch (Throwable rex) {
407 setDoneExceptionally(rex);
408 rethrowException(rex);
409 return false; // not reached
410 }
411 setNormalCompletion();
412 return true;
413 }
414
415 /**
416 * Main execution method used by worker threads. Invokes
417 * base computation unless already complete
418 */
419 final void quietlyExec() {
420 if (status >= 0) {
421 try {
422 if (!exec())
423 return;
424 } catch(Throwable rex) {
425 setDoneExceptionally(rex);
426 return;
427 }
428 setNormalCompletion();
429 }
430 }
431
432 /**
433 * Calls exec, recording but not rethrowing exception
434 * Caller should normally check status before calling
435 * @return true if completed normally
436 */
437 private boolean tryQuietlyInvoke() {
438 try {
439 if (!exec())
440 return false;
441 } catch (Throwable rex) {
442 setDoneExceptionally(rex);
443 return false;
444 }
445 setNormalCompletion();
446 return true;
447 }
448
449 /**
450 * Cancel, ignoring any exceptions it throws
451 */
452 final void cancelIgnoreExceptions() {
453 try {
454 cancel(false);
455 } catch(Throwable ignore) {
456 }
457 }
458
459 // public methods
460
461 /**
462 * Arranges to asynchronously execute this task. While it is not
463 * necessarily enforced, it is a usage error to fork a task more
464 * than once unless it has completed and been reinitialized. This
465 * method may be invoked only from within ForkJoinTask
466 * computations. Attempts to invoke in other contexts result in
467 * exceptions or errors including ClassCastException.
468 */
469 public final void fork() {
470 ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
471 }
472
473 /**
474 * Returns the result of the computation when it is ready.
475 * This method differs from <code>get</code> in that abnormal
476 * completion results in RuntimeExceptions or Errors, not
477 * ExecutionExceptions.
478 *
479 * @return the computed result
480 */
481 public final V join() {
482 ForkJoinWorkerThread w = getWorker();
483 if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
484 reportException(awaitDone(w, true));
485 return getRawResult();
486 }
487
488 public final V get() throws InterruptedException, ExecutionException {
489 ForkJoinWorkerThread w = getWorker();
490 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
491 awaitDone(w, true);
492 return reportFutureResult();
493 }
494
495 public final V get(long timeout, TimeUnit unit)
496 throws InterruptedException, ExecutionException, TimeoutException {
497 ForkJoinWorkerThread w = getWorker();
498 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
499 awaitDone(w, unit.toNanos(timeout));
500 return reportTimedFutureResult();
501 }
502
503 /**
504 * Commences performing this task, awaits its completion if
505 * necessary, and return its result.
506 * @throws Throwable (a RuntimeException, Error, or unchecked
507 * exception) if the underlying computation did so.
508 * @return the computed result
509 */
510 public final V invoke() {
511 if (status >= 0 && tryExec())
512 return getRawResult();
513 else
514 return join();
515 }
516
517 /**
518 * Forks both tasks, returning when <code>isDone</code> holds for
519 * both of them or an exception is encountered. This method may be
520 * invoked only from within ForkJoinTask computations. Attempts to
521 * invoke in other contexts result in exceptions or errors
522 * including ClassCastException.
523 * @param t1 one task
524 * @param t2 the other task
525 * @throws NullPointerException if t1 or t2 are null
526 * @throws RuntimeException or Error if either task did so.
527 */
528 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
529 t2.fork();
530 t1.invoke();
531 t2.join();
532 }
533
534 /**
535 * Forks the given tasks, returning when <code>isDone</code> holds
536 * for all of them. If any task encounters an exception, others
537 * may be cancelled. This method may be invoked only from within
538 * ForkJoinTask computations. Attempts to invoke in other contexts
539 * result in exceptions or errors including ClassCastException.
540 * @param tasks the array of tasks
541 * @throws NullPointerException if tasks or any element are null.
542 * @throws RuntimeException or Error if any task did so.
543 */
544 public static void invokeAll(ForkJoinTask<?>... tasks) {
545 Throwable ex = null;
546 int last = tasks.length - 1;
547 for (int i = last; i >= 0; --i) {
548 ForkJoinTask<?> t = tasks[i];
549 if (t == null) {
550 if (ex == null)
551 ex = new NullPointerException();
552 }
553 else if (i != 0)
554 t.fork();
555 else {
556 t.quietlyInvoke();
557 if (ex == null)
558 ex = t.getException();
559 }
560 }
561 for (int i = 1; i <= last; ++i) {
562 ForkJoinTask<?> t = tasks[i];
563 if (t != null) {
564 if (ex != null)
565 t.cancel(false);
566 else {
567 t.quietlyJoin();
568 if (ex == null)
569 ex = t.getException();
570 }
571 }
572 }
573 if (ex != null)
574 rethrowException(ex);
575 }
576
577 /**
578 * Forks all tasks in the collection, returning when
579 * <code>isDone</code> holds for all of them. If any task
580 * encounters an exception, others may be cancelled. This method
581 * may be invoked only from within ForkJoinTask
582 * computations. Attempts to invoke in other contexts resul!t in
583 * exceptions or errors including ClassCastException.
584 * @param tasks the collection of tasks
585 * @throws NullPointerException if tasks or any element are null.
586 * @throws RuntimeException or Error if any task did so.
587 */
588 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
589 if (!(tasks instanceof List)) {
590 invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
591 return;
592 }
593 List<? extends ForkJoinTask<?>> ts =
594 (List<? extends ForkJoinTask<?>>)tasks;
595 Throwable ex = null;
596 int last = ts.size() - 1;
597 for (int i = last; i >= 0; --i) {
598 ForkJoinTask<?> t = ts.get(i);
599 if (t == null) {
600 if (ex == null)
601 ex = new NullPointerException();
602 }
603 else if (i != 0)
604 t.fork();
605 else {
606 t.quietlyInvoke();
607 if (ex == null)
608 ex = t.getException();
609 }
610 }
611 for (int i = 1; i <= last; ++i) {
612 ForkJoinTask<?> t = ts.get(i);
613 if (t != null) {
614 if (ex != null)
615 t.cancel(false);
616 else {
617 t.quietlyJoin();
618 if (ex == null)
619 ex = t.getException();
620 }
621 }
622 }
623 if (ex != null)
624 rethrowException(ex);
625 }
626
627 /**
628 * Returns true if the computation performed by this task has
629 * completed (or has been cancelled).
630 * @return true if this computation has completed
631 */
632 public final boolean isDone() {
633 return status < 0;
634 }
635
636 /**
637 * Returns true if this task was cancelled.
638 * @return true if this task was cancelled
639 */
640 public final boolean isCancelled() {
641 return (status & COMPLETION_MASK) == CANCELLED;
642 }
643
644 /**
645 * Returns true if this task threw an exception or was cancelled
646 * @return true if this task threw an exception or was cancelled
647 */
648 public final boolean isCompletedAbnormally() {
649 return (status & COMPLETION_MASK) < NORMAL;
650 }
651
652 /**
653 * Returns the exception thrown by the base computation, or a
654 * CancellationException if cancelled, or null if none or if the
655 * method has not yet completed.
656 * @return the exception, or null if none
657 */
658 public final Throwable getException() {
659 int s = status & COMPLETION_MASK;
660 if (s >= NORMAL)
661 return null;
662 if (s == CANCELLED)
663 return new CancellationException();
664 return exceptionMap.get(this);
665 }
666
667 /**
668 * Asserts that the results of this task's computation will not be
669 * used. If a cancellation occurs before atempting to execute this
670 * task, then execution will be suppressed, <code>isCancelled</code>
671 * will report true, and <code>join</code> will result in a
672 * <code>CancellationException</code> being thrown. Otherwise, when
673 * cancellation races with completion, there are no guarantees
674 * about whether <code>isCancelled</code> will report true, whether
675 * <code>join</code> will return normally or via an exception, or
676 * whether these behaviors will remain consistent upon repeated
677 * invocation.
678 *
679 * <p>This method may be overridden in subclasses, but if so, must
680 * still ensure that these minimal properties hold. In particular,
681 * the cancel method itself must not throw exceptions.
682 *
683 * <p> This method is designed to be invoked by <em>other</em>
684 * tasks. To terminate the current task, you can just return or
685 * throw an unchecked exception from its computation method, or
686 * invoke <code>completeExceptionally</code>.
687 *
688 * @param mayInterruptIfRunning this value is ignored in the
689 * default implementation because tasks are not in general
690 * cancelled via interruption.
691 *
692 * @return true if this task is now cancelled
693 */
694 public boolean cancel(boolean mayInterruptIfRunning) {
695 setCompletion(CANCELLED);
696 return (status & COMPLETION_MASK) == CANCELLED;
697 }
698
699 /**
700 * Completes this task abnormally, and if not already aborted or
701 * cancelled, causes it to throw the given exception upon
702 * <code>join</code> and related operations. This method may be used
703 * to induce exceptions in asynchronous tasks, or to force
704 * completion of tasks that would not otherwise complete. Its use
705 * in other situations is likely to be wrong. This method is
706 * overridable, but overridden versions must invoke <code>super</code>
707 * implementation to maintain guarantees.
708 *
709 * @param ex the exception to throw. If this exception is
710 * not a RuntimeException or Error, the actual exception thrown
711 * will be a RuntimeException with cause ex.
712 */
713 public void completeExceptionally(Throwable ex) {
714 setDoneExceptionally((ex instanceof RuntimeException) ||
715 (ex instanceof Error)? ex :
716 new RuntimeException(ex));
717 }
718
719 /**
720 * Completes this task, and if not already aborted or cancelled,
721 * returning a <code>null</code> result upon <code>join</code> and related
722 * operations. This method may be used to provide results for
723 * asynchronous tasks, or to provide alternative handling for
724 * tasks that would not otherwise complete normally. Its use in
725 * other situations is likely to be wrong. This method is
726 * overridable, but overridden versions must invoke <code>super</code>
727 * implementation to maintain guarantees.
728 *
729 * @param value the result value for this task.
730 */
731 public void complete(V value) {
732 try {
733 setRawResult(value);
734 } catch(Throwable rex) {
735 setDoneExceptionally(rex);
736 return;
737 }
738 setNormalCompletion();
739 }
740
741 /**
742 * Possibly executes other tasks until this task is ready, then
743 * returns the result of the computation. This method may be more
744 * efficient than <code>join</code>, but is only applicable when
745 * there are no potemtial dependencies between continuation of the
746 * current task and that of any other task that might be executed
747 * while helping. (This usually holds for pure divide-and-conquer
748 * tasks). This method may be invoked only from within
749 * ForkJoinTask computations. Attempts to invoke in other contexts
750 * resul!t in exceptions or errors including ClassCastException.
751 * @return the computed result
752 */
753 public final V helpJoin() {
754 ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
755 if (status < 0 || !w.unpushTask(this) || !tryExec())
756 reportException(w.helpJoinTask(this));
757 return getRawResult();
758 }
759
760 /**
761 * Possibly executes other tasks until this task is ready. This
762 * method may be invoked only from within ForkJoinTask
763 * computations. Attempts to invoke in other contexts resul!t in
764 * exceptions or errors including ClassCastException.
765 */
766 public final void quietlyHelpJoin() {
767 if (status >= 0) {
768 ForkJoinWorkerThread w =
769 (ForkJoinWorkerThread)(Thread.currentThread());
770 if (!w.unpushTask(this) || !tryQuietlyInvoke())
771 w.helpJoinTask(this);
772 }
773 }
774
775 /**
776 * Joins this task, without returning its result or throwing an
777 * exception. This method may be useful when processing
778 * collections of tasks when some have been cancelled or otherwise
779 * known to have aborted.
780 */
781 public final void quietlyJoin() {
782 if (status >= 0) {
783 ForkJoinWorkerThread w = getWorker();
784 if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
785 awaitDone(w, true);
786 }
787 }
788
789 /**
790 * Commences performing this task and awaits its completion if
791 * necessary, without returning its result or throwing an
792 * exception. This method may be useful when processing
793 * collections of tasks when some have been cancelled or otherwise
794 * known to have aborted.
795 */
796 public final void quietlyInvoke() {
797 if (status >= 0 && !tryQuietlyInvoke())
798 quietlyJoin();
799 }
800
801 /**
802 * Resets the internal bookkeeping state of this task, allowing a
803 * subsequent <code>fork</code>. This method allows repeated reuse of
804 * this task, but only if reuse occurs when this task has either
805 * never been forked, or has been forked, then completed and all
806 * outstanding joins of this task have also completed. Effects
807 * under any other usage conditions are not guaranteed, and are
808 * almost surely wrong. This method may be useful when executing
809 * pre-constructed trees of subtasks in loops.
810 */
811 public void reinitialize() {
812 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
813 exceptionMap.remove(this);
814 status = 0;
815 }
816
817 /**
818 * Returns the pool hosting the current task execution, or null
819 * if this task is executing outside of any pool.
820 * @return the pool, or null if none.
821 */
822 public static ForkJoinPool getPool() {
823 Thread t = Thread.currentThread();
824 return ((t instanceof ForkJoinWorkerThread)?
825 ((ForkJoinWorkerThread)t).pool : null);
826 }
827
828 /**
829 * Tries to unschedule this task for execution. This method will
830 * typically succeed if this task is the most recently forked task
831 * by the current thread, and has not commenced executing in
832 * another thread. This method may be useful when arranging
833 * alternative local processing of tasks that could have been, but
834 * were not, stolen. This method may be invoked only from within
835 * ForkJoinTask computations. Attempts to invoke in other contexts
836 * result in exceptions or errors including ClassCastException.
837 * @return true if unforked
838 */
839 public boolean tryUnfork() {
840 return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
841 }
842
843 /**
844 * Possibly executes tasks until the pool hosting the current task
845 * {@link ForkJoinPool#isQuiescent}. This method may be of use in
846 * designs in which many tasks are forked, but none are explicitly
847 * joined, instead executing them until all are processed.
848 */
849 public static void helpQuiesce() {
850 ((ForkJoinWorkerThread)(Thread.currentThread())).
851 helpQuiescePool();
852 }
853
854 /**
855 * Returns an estimate of the number of tasks that have been
856 * forked by the current worker thread but not yet executed. This
857 * value may be useful for heuristic decisions about whether to
858 * fork other tasks.
859 * @return the number of tasks
860 */
861 public static int getQueuedTaskCount() {
862 return ((ForkJoinWorkerThread)(Thread.currentThread())).
863 getQueueSize();
864 }
865
866 /**
867 * Returns a estimate of how many more locally queued tasks are
868 * held by the current worker thread than there are other worker
869 * threads that might steal them. This value may be useful for
870 * heuristic decisions about whether to fork other tasks. In many
871 * usages of ForkJoinTasks, at steady state, each worker should
872 * aim to maintain a small constant surplus (for example, 3) of
873 * tasks, and to process computations locally if this threshold is
874 * exceeded.
875 * @return the surplus number of tasks, which may be negative
876 */
877 public static int getSurplusQueuedTaskCount() {
878 return ((ForkJoinWorkerThread)(Thread.currentThread()))
879 .getEstimatedSurplusTaskCount();
880 }
881
882 // Extension methods
883
884 /**
885 * Returns the result that would be returned by <code>join</code>,
886 * even if this task completed abnormally, or null if this task is
887 * not known to have been completed. This method is designed to
888 * aid debugging, as well as to support extensions. Its use in any
889 * other context is discouraged.
890 *
891 * @return the result, or null if not completed.
892 */
893 public abstract V getRawResult();
894
895 /**
896 * Forces the given value to be returned as a result. This method
897 * is designed to support extensions, and should not in general be
898 * called otherwise.
899 *
900 * @param value the value
901 */
902 protected abstract void setRawResult(V value);
903
904 /**
905 * Immediately performs the base action of this task. This method
906 * is designed to support extensions, and should not in general be
907 * called otherwise. The return value controls whether this task
908 * is considered to be done normally. It may return false in
909 * asynchronous actions that require explicit invocations of
910 * <code>complete</code> to become joinable. It may throw exceptions
911 * to indicate abnormal exit.
912 * @return true if completed normally
913 * @throws Error or RuntimeException if encountered during computation
914 */
915 protected abstract boolean exec();
916
917 /**
918 * Returns, but does not unschedule or execute, the task most
919 * recently forked by the current thread but not yet executed, if
920 * one is available. There is no guarantee that this task will
921 * actually be polled or executed next.
922 * This method is designed primarily to support extensions,
923 * and is unlikely to be useful otherwise.
924 *
925 * @return the next task, or null if none are available
926 */
927 protected static ForkJoinTask<?> peekNextLocalTask() {
928 return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
929 }
930
931 /**
932 * Unschedules and returns, without executing, the task most
933 * recently forked by the current thread but not yet executed.
934 * This method is designed primarily to support extensions,
935 * and is unlikely to be useful otherwise.
936 *
937 * @return the next task, or null if none are available
938 */
939 protected static ForkJoinTask<?> pollNextLocalTask() {
940 return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
941 }
942
943 /**
944 * Unschedules and returns, without executing, the task most
945 * recently forked by the current thread but not yet executed, if
946 * one is available, or if not available, a task that was forked
947 * by some other thread, if available. Availability may be
948 * transient, so a <code>null</code> result does not necessarily
949 * imply quiecence of the pool this task is operating in.
950 * This method is designed primarily to support extensions,
951 * and is unlikely to be useful otherwise.
952 *
953 * @return a task, or null if none are available
954 */
955 protected static ForkJoinTask<?> pollTask() {
956 return ((ForkJoinWorkerThread)(Thread.currentThread())).
957 getLocalOrStolenTask();
958 }
959
960 // Serialization support
961
962 private static final long serialVersionUID = -7721805057305804111L;
963
964 /**
965 * Save the state to a stream.
966 *
967 * @serialData the current run status and the exception thrown
968 * during execution, or null if none.
969 * @param s the stream
970 */
971 private void writeObject(java.io.ObjectOutputStream s)
972 throws java.io.IOException {
973 s.defaultWriteObject();
974 s.writeObject(getException());
975 }
976
977 /**
978 * Reconstitute the instance from a stream.
979 * @param s the stream
980 */
981 private void readObject(java.io.ObjectInputStream s)
982 throws java.io.IOException, ClassNotFoundException {
983 s.defaultReadObject();
984 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
985 status |= EXTERNAL_SIGNAL; // conservatively set external signal
986 Object ex = s.readObject();
987 if (ex != null)
988 setDoneExceptionally((Throwable)ex);
989 }
990
991 // Temporary Unsafe mechanics for preliminary release
992
993 static final Unsafe _unsafe;
994 static final long statusOffset;
995
996 static {
997 try {
998 if (ForkJoinTask.class.getClassLoader() != null) {
999 Field f = Unsafe.class.getDeclaredField("theUnsafe");
1000 f.setAccessible(true);
1001 _unsafe = (Unsafe)f.get(null);
1002 }
1003 else
1004 _unsafe = Unsafe.getUnsafe();
1005 statusOffset = _unsafe.objectFieldOffset
1006 (ForkJoinTask.class.getDeclaredField("status"));
1007 } catch (Exception ex) { throw new Error(ex); }
1008 }
1009
1010 }