root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.1
Committed: Tue Jan 6 14:30:31 2009 UTC by dl
Branch: MAIN
Log Message:
Refactored and repackaged ForkJoin classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * Abstract base class for tasks that run within a ForkJoinPool. A
17 * ForkJoinTask is a thread-like entity that is much lighter weight
18 * than a normal thread. Huge numbers of tasks and subtasks may be
19 * hosted by a small number of actual threads in a ForkJoinPool,
20 * at the price of some usage limitations.
21 *
22 * <p> ForkJoinTasks are forms of <tt>Futures</tt> supporting a
23 * limited range of use. The "lightness" of ForkJoinTasks is due to a
24 * set of restrictions (that are only partially statically
25 * enforceable) reflecting their intended use as computational tasks
26 * calculating pure functions or operating on purely isolated objects.
27 * The primary coordination mechanisms supported for ForkJoinTasks are
28 * <tt>fork</tt>, that arranges asynchronous execution, and
29 * <tt>join</tt>, that doesn't proceed until the task's result has
30 * been computed. (Cancellation is also supported). The computation
31 * defined in the <tt>compute</tt> method should avoid
32 * <tt>synchronized</tt> methods or blocks, and should minimize
33 * blocking synchronization apart from joining other tasks or using
34 * synchronizers such as Phasers that are advertised to cooperate with
35 * fork/join scheduling. Tasks should also not perform blocking IO,
36 * and should ideally access variables that are completely independent
37 * of those accessed by other running tasks. Minor breaches of these
38 * restrictions, for example using shared output streams, may be
39 * tolerable in practice, but frequent use may result in poor
40 * performance, and the potential to indefinitely stall if the number
41 * of threads not waiting for external synchronization becomes
42 * exhausted. This usage restriction is in part enforced by not
43 * permitting checked exceptions such as IOExceptions to be
44 * thrown. However, computations may still encounter unchecked
45 * exceptions, that are rethrown to callers attempting to join
46 * them. These exceptions may additionally include
47 * RejectedExecutionExceptions stemming from internal resource
48 * exhaustion such as failure to allocate internal task queues.
49 *
50 * <p> The <tt>ForkJoinTask</tt> class is not usually directly
51 * subclassed. Instead, you subclass one of the abstract classes that
52 * support different styles of fork/join processing. Normally, a
53 * concrete ForkJoinTask subclass declares fields comprising its
54 * parameters, established in a constructor, and then defines a
55 * <tt>compute</tt> method that somehow uses the control methods
56 * supplied by this base class. While these methods have
57 * <tt>public</tt> access, some of them may only be called from within
58 * other ForkJoinTasks. Attempts to invoke them in other contexts
59 * result in exceptions or errors including ClassCastException. The
60 * only way to invoke a "main" driver task is to submit it to a
61 * ForkJoinPool. Once started, this will usually in turn start other
62 * subtasks.
63 *
64 * <p>Most base support methods are <tt>final</tt> because their
65 * implementations are intrinsically tied to the underlying
66 * lightweight task scheduling framework, and so cannot be overridden.
67 * Developers creating new basic styles of fork/join processing should
68 * minimally implement protected methods <tt>exec</tt>,
69 * <tt>setRawResult</tt>, and <tt>getRawResult</tt>, while also
70 * introducing an abstract computational method that can be
71 * implemented in its subclasses. To support such extensions,
72 * instances of ForkJoinTasks maintain an atomically updated
73 * <tt>short</tt> representing user-defined control state. Control
74 * state is guaranteed initially to be zero, and to be negative upon
75 * completion, but may otherwise be used for any other control
76 * purposes, such as maintaining join counts. The {@link
77 * ForkJoinWorkerThread} class supports additional inspection and
78 * tuning methods that can be useful when developing extensions.
79 *
80 * <p>ForkJoinTasks should perform relatively small amounts of
81 * computations, otherwise splitting into smaller tasks. As a very
82 * rough rule of thumb, a task should perform more than 100 and less
83 * than 10000 basic computational steps. If tasks are too big, then
84 * parallelism cannot improve throughput. If too small, then memory
85 * and internal task maintenance overhead may overwhelm processing.
86 *
87 * <p>ForkJoinTasks are <tt>Serializable</tt>, which enables them to
88 * be used in extensions such as remote execution frameworks. However,
89 * it is in general safe to serialize tasks only before or after, but
90 * not during execution. Serialization is not relied on during
91 * execution itself.
92 */
93 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
94 /**
95 * Status field holding all run status. We pack this into a single
96 * int both to minimize footprint overhead and to ensure atomicity
97 * (updates are via CAS).
98 *
99 * Status is initially zero, and takes on nonnegative values until
100 * completed, upon which status holds NORMAL, CANCELLED, or
101 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
102 * blocking waits by other threads have SIGNAL_MASK bits set --
103 * bit 15 for external (nonFJ) waits, and the rest a count of
104 * waiting FJ threads. (This representation relies on
105 * ForkJoinPool max thread limits). Completion of a stolen task
106 * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
107 * though suboptimal for some purposes, we use basic builtin
108 * wait/notify to take advantage of "monitor inflation" in JVMs
109 * that we would otherwise need to emulate to avoid adding further
110 * per-task bookkeeping overhead. Note that bits 16-28 are
111 * currently unused. Also value 0x80000000 is available as spare
112 * completion value.
113 */
114 volatile int status; // accessed directly by pool and workers
115
116 static final int COMPLETION_MASK = 0xe0000000;
117 static final int NORMAL = 0xe0000000; // == mask
118 static final int CANCELLED = 0xc0000000;
119 static final int EXCEPTIONAL = 0xa0000000;
120 static final int SIGNAL_MASK = 0x0000ffff;
121 static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
122 static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
123
124 /**
125 * Table of exceptions thrown by tasks, to enable reporting by
126 * callers. Because exceptions are rare, we don't directly keep
127 * them with task objects, but instead use a weak ref table. Note
128 * that cancellation exceptions don't appear in the table, but are
129 * instead recorded as status values.
130 * Todo: Use ConcurrentReferenceHashMap
131 */
132 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
133 Collections.synchronizedMap
134 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
135
136 // within-package utilities
137
138 /**
139 * Get current worker thread, or null if not a worker thread
140 */
141 static ForkJoinWorkerThread getWorker() {
142 Thread t = Thread.currentThread();
143 return ((t instanceof ForkJoinWorkerThread)?
144 (ForkJoinWorkerThread)t : null);
145 }
146
147 /**
148 * Get pool of current worker thread, or null if not a worker thread
149 */
150 static ForkJoinPool getWorkerPool() {
151 Thread t = Thread.currentThread();
152 return ((t instanceof ForkJoinWorkerThread)?
153 ((ForkJoinWorkerThread)t).pool : null);
154 }
155
156 final boolean casStatus(int cmp, int val) {
157 return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
158 }
159
160 /**
161 * Workaround for not being able to throw a Throwable without declaring it.
162 */
163 static void rethrowException(Throwable ex) {
164 if (ex != null)
165 _unsafe.throwException(ex);
166 }
167
168 // Setting completion status
169
170 /**
171 * Mark completion and wake up threads waiting to join this task.
172 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
173 */
174 final void setCompletion(int completion) {
175 ForkJoinPool pool = getWorkerPool();
176 if (pool != null) {
177 int s; // Clear signal bits while setting completion status
178 do;while ((s = status) >= 0 && !casStatus(s, completion));
179
180 if ((s & SIGNAL_MASK) != 0) {
181 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
182 pool.updateRunningCount(s);
183 synchronized(this) { notifyAll(); }
184 }
185 }
186 else
187 externallySetCompletion(completion);
188 }
189
190 /**
191 * Version of setCompletion for non-FJ threads. Leaves signal
192 * bits for unblocked threads to adjust, and always notifies.
193 */
194 private void externallySetCompletion(int completion) {
195 int s;
196 do;while ((s = status) >= 0 &&
197 !casStatus(s, (s & SIGNAL_MASK) | completion));
198 synchronized(this) { notifyAll(); }
199 }
200
201 /**
202 * Sets status to indicate normal completion
203 */
204 final void setNormalCompletion() {
205 // Try typical fast case -- single CAS, no signal, not already done.
206 // Manually expand casStatus to improve chances of inlining it
207 if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
208 setCompletion(NORMAL);
209 }
210
211 // internal waiting and notification
212
213 /**
214 * Performs the actual monitor wait for awaitDone
215 */
216 private void doAwaitDone() {
217 // Minimize lock bias and in/de-flation effects by maximizing
218 // chances of waiting inside sync
219 try {
220 while (status >= 0)
221 synchronized(this) { if (status >= 0) wait(); }
222 } catch (InterruptedException ie) {
223 onInterruptedWait();
224 }
225 }
226
227 /**
228 * Performs the actual monitor wait for the timed version of awaitDone
229 */
230 private void doAwaitDone(long startTime, long nanos) {
231 synchronized(this) {
232 try {
233 while (status >= 0) {
234 long nt = nanos - (System.nanoTime() - startTime);
235 if (nt <= 0)
236 break;
237 wait(nt / 1000000, (int)(nt % 1000000));
238 }
239 } catch (InterruptedException ie) {
240 onInterruptedWait();
241 }
242 }
243 }
244
245 // Awaiting completion
246
247 /**
248 * Sets status to indicate there is a joiner, then waits for join,
249 * surrounded with pool notifications.
250 * @return status upon exit
251 */
252 final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
253 ForkJoinPool pool = w == null? null : w.pool;
254 int s;
255 while ((s = status) >= 0) {
256 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
257 if (pool == null || !pool.preJoin(this, maintainParallelism))
258 doAwaitDone();
259 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
260 adjustPoolCountsOnUnblock(pool);
261 break;
262 }
263 }
264 return s;
265 }
266
267 /**
268 * Timed version of awaitDone
269 * @return status upon exit
270 */
271 final int awaitDone(ForkJoinWorkerThread w, long nanos) {
272 ForkJoinPool pool = w == null? null : w.pool;
273 int s;
274 while ((s = status) >= 0) {
275 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
276 long startTime = System.nanoTime();
277 if (pool == null || !pool.preJoin(this, false))
278 doAwaitDone(startTime, nanos);
279 if ((s = status) >= 0) {
280 adjustPoolCountsOnCancelledWait(pool);
281 s = status;
282 }
283 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
284 adjustPoolCountsOnUnblock(pool);
285 break;
286 }
287 }
288 return s;
289 }
290
291 /**
292 * Notify pool that thread is unblocked. Called by signalled
293 * threads when woken by non-FJ threads (which is atypical).
294 */
295 private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
296 int s;
297 do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
298 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
299 pool.updateRunningCount(s);
300 }
301
302 /**
303 * Notify pool to adjust counts on cancelled or timed out wait
304 */
305 private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
306 if (pool != null) {
307 int s;
308 while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
309 if (casStatus(s, s - 1)) {
310 pool.updateRunningCount(1);
311 break;
312 }
313 }
314 }
315 }
316
317 private void onInterruptedWait() {
318 Thread t = Thread.currentThread();
319 if (t instanceof ForkJoinWorkerThread) {
320 ForkJoinWorkerThread w = (ForkJoinWorkerThread)t;
321 if (w.isTerminating())
322 cancelIgnoreExceptions();
323 }
324 else { // re-interrupt
325 try {
326 t.interrupt();
327 } catch (SecurityException ignore) {
328 }
329 }
330 }
331
332 // Recording and reporting exceptions
333
334 private void setDoneExceptionally(Throwable rex) {
335 exceptionMap.put(this, rex);
336 setCompletion(EXCEPTIONAL);
337 }
338
339 /**
340 * Throws the exception associated with status s;
341 * @throws the exception
342 */
343 private void reportException(int s) {
344 if ((s &= COMPLETION_MASK) < NORMAL) {
345 if (s == CANCELLED)
346 throw new CancellationException();
347 else
348 rethrowException(exceptionMap.get(this));
349 }
350 }
351
352 /**
353 * Returns result or throws exception using j.u.c.Future conventions
354 * Only call when isDone known to be true.
355 */
356 private V reportFutureResult()
357 throws ExecutionException, InterruptedException {
358 int s = status & COMPLETION_MASK;
359 if (s < NORMAL) {
360 Throwable ex;
361 if (s == CANCELLED)
362 throw new CancellationException();
363 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
364 throw new ExecutionException(ex);
365 if (Thread.interrupted())
366 throw new InterruptedException();
367 }
368 return getRawResult();
369 }
370
371 /**
372 * Returns result or throws exception using j.u.c.Future conventions
373 * with timeouts
374 */
375 private V reportTimedFutureResult()
376 throws InterruptedException, ExecutionException, TimeoutException {
377 Throwable ex;
378 int s = status & COMPLETION_MASK;
379 if (s == NORMAL)
380 return getRawResult();
381 if (s == CANCELLED)
382 throw new CancellationException();
383 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
384 throw new ExecutionException(ex);
385 if (Thread.interrupted())
386 throw new InterruptedException();
387 throw new TimeoutException();
388 }
389
390 // internal execution methods
391
392 /**
393 * Calls exec, recording completion, and rethrowing exception if
394 * encountered. Caller should normally check status before calling
395 * @return true if completed normally
396 */
397 private boolean tryExec() {
398 try { // try block must contain only call to exec
399 if (!exec())
400 return false;
401 } catch (Throwable rex) {
402 setDoneExceptionally(rex);
403 rethrowException(rex);
404 return false; // not reached
405 }
406 setNormalCompletion();
407 return true;
408 }
409
410 /**
411 * Main execution method used by worker threads. Invokes
412 * base computation unless already complete
413 */
414 final void quietlyExec() {
415 if (status >= 0) {
416 try {
417 if (!exec())
418 return;
419 } catch(Throwable rex) {
420 setDoneExceptionally(rex);
421 return;
422 }
423 setNormalCompletion();
424 }
425 }
426
427 /**
428 * Calls exec, recording but not rethrowing exception
429 * Caller should normally check status before calling
430 * @return true if completed normally
431 */
432 private boolean tryQuietlyInvoke() {
433 try {
434 if (!exec())
435 return false;
436 } catch (Throwable rex) {
437 setDoneExceptionally(rex);
438 return false;
439 }
440 setNormalCompletion();
441 return true;
442 }
443
444 /**
445 * Cancel, ignoring any exceptions it throws
446 */
447 final void cancelIgnoreExceptions() {
448 try {
449 cancel(false);
450 } catch(Throwable ignore) {
451 }
452 }
453
454 // public methods
455
456 /**
457 * Arranges to asynchronously execute this task. While it is not
458 * necessarily enforced, it is a usage error to fork a task more
459 * than once unless it has completed and been reinitialized. This
460 * method may be invoked only from within other ForkJoinTask
461 * computations. Attempts to invoke in other contexts result in
462 * exceptions or errors including ClassCastException.
463 */
464 public final void fork() {
465 ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
466 }
467
468 /**
469 * Returns the result of the computation when it is ready.
470 * This method differs from <tt>get</tt> in that abnormal
471 * completion results in RuntimeExceptions or Errors, not
472 * ExecutionExceptions.
473 *
474 * @return the computed result
475 */
476 public final V join() {
477 ForkJoinWorkerThread w = getWorker();
478 if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
479 reportException(awaitDone(w, true));
480 return getRawResult();
481 }
482
483 public final V get() throws InterruptedException, ExecutionException {
484 ForkJoinWorkerThread w = getWorker();
485 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
486 awaitDone(w, true);
487 return reportFutureResult();
488 }
489
490 public final V get(long timeout, TimeUnit unit)
491 throws InterruptedException, ExecutionException, TimeoutException {
492 ForkJoinWorkerThread w = getWorker();
493 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
494 awaitDone(w, unit.toNanos(timeout));
495 return reportTimedFutureResult();
496 }
497
498 /**
499 * Possibly executes other tasks until this task is ready, then
500 * returns the result of the computation. This method may be more
501 * efficient than <tt>join</tt>, but is only applicable when there
502 * are no potential dependencies between continuation of the
503 * current task and that of any other task that might be executed
504 * while helping. (This usually holds for pure divide-and-conquer
505 * tasks).
506 * @return the computed result
507 */
508 public final V helpJoin() {
509 ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
510 if (status < 0 || !w.unpushTask(this) || !tryExec())
511 reportException(w.helpJoinTask(this));
512 return getRawResult();
513 }
514
515 /**
516 * Performs this task, awaits its completion if necessary, and
517 * returns its result.
518 * @throws Throwable (a RuntimeException, Error, or unchecked
519 * exception) if the underlying computation did so.
520 * @return the computed result
521 */
522 public final V invoke() {
523 if (status >= 0 && tryExec())
524 return getRawResult();
525 else
526 return join();
527 }
528
529 /**
530 * Joins this task, without returning its result or throwing an
531 * exception. This method may be useful when processing
532 * collections of tasks when some have been cancelled or otherwise
533 * known to have aborted.
534 */
535 public final void quietlyJoin() {
536 if (status >= 0) {
537 ForkJoinWorkerThread w = getWorker();
538 if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
539 awaitDone(w, true);
540 }
541 }
542
543 /**
544 * Possibly executes other tasks until this task is ready.
545 */
546 public final void quietlyHelpJoin() {
547 if (status >= 0) {
548 ForkJoinWorkerThread w =
549 (ForkJoinWorkerThread)(Thread.currentThread());
550 if (!w.unpushTask(this) || !tryQuietlyInvoke())
551 w.helpJoinTask(this);
552 }
553 }
554
555 /**
556 * Performs this task and awaits its completion if necessary,
557 * without returning its result or throwing an exception. This
558 * method may be useful when processing collections of tasks when
559 * some have been cancelled or otherwise known to have aborted.
560 */
561 public final void quietlyInvoke() {
562 if (status >= 0 && !tryQuietlyInvoke())
563 quietlyJoin();
564 }
565
566 /**
567 * Returns true if the computation performed by this task has
568 * completed (or has been cancelled).
569 * @return true if this computation has completed
570 */
571 public final boolean isDone() {
572 return status < 0;
573 }
574
575 /**
576 * Returns true if this task was cancelled.
577 * @return true if this task was cancelled
578 */
579 public final boolean isCancelled() {
580 return (status & COMPLETION_MASK) == CANCELLED;
581 }
582
583 /**
584 * Returns true if this task threw an exception or was cancelled
585 * @return true if this task threw an exception or was cancelled
586 */
587 public final boolean completedAbnormally() {
588 return (status & COMPLETION_MASK) < NORMAL;
589 }
590
591 /**
592 * Returns the exception thrown by the base computation, or a
593 * CancellationException if cancelled, or null if none or if the
594 * method has not yet completed.
595 * @return the exception, or null if none
596 */
597 public final Throwable getException() {
598 int s = status & COMPLETION_MASK;
599 if (s >= NORMAL)
600 return null;
601 if (s == CANCELLED)
602 return new CancellationException();
603 return exceptionMap.get(this);
604 }
605
606 /**
607 * Asserts that the results of this task's computation will not be
608 * used. If a cancellation occurs before this task is processed,
609 * then its <tt>compute</tt> method will not be executed,
610 * <tt>isCancelled</tt> will report true, and <tt>join</tt> will
611 * result in a CancellationException being thrown. Otherwise, when
612 * cancellation races with completion, there are no guarantees
613 * about whether <tt>isCancelled</tt> will report true, whether
614 * <tt>join</tt> will return normally or via an exception, or
615 * whether these behaviors will remain consistent upon repeated
616 * invocation.
617 *
618 * <p>This method may be overridden in subclasses, but if so, must
619 * still ensure that these minimal properties hold. In particular,
620 * the cancel method itself must not throw exceptions.
621 *
622 * <p> This method is designed to be invoked by <em>other</em>
623 * tasks. To terminate the current task, you can just return or
624 * throw an unchecked exception from its computation method, or
625 * invoke <tt>completeExceptionally(someException)</tt>.
626 *
627 * @param mayInterruptIfRunning this value is ignored in the
628 * default implementation because tasks are not in general
629 * cancelled via interruption.
630 *
631 * @return true if this task is now cancelled
632 */
633 public boolean cancel(boolean mayInterruptIfRunning) {
634 setCompletion(CANCELLED);
635 return (status & COMPLETION_MASK) == CANCELLED;
636 }
637
638 /**
639 * Completes this task abnormally, and if not already aborted or
640 * cancelled, causes it to throw the given exception upon
641 * <tt>join</tt> and related operations. This method may be used
642 * to induce exceptions in asynchronous tasks, or to force
643 * completion of tasks that would not otherwise complete. This
644 * method is overridable, but overridden versions must invoke
645 * <tt>super</tt> implementation to maintain guarantees.
646 * @param ex the exception to throw. If this exception is
647 * not a RuntimeException or Error, the actual exception thrown
648 * will be a RuntimeException with cause ex.
649 */
650 public void completeExceptionally(Throwable ex) {
651 setDoneExceptionally((ex instanceof RuntimeException) ||
652 (ex instanceof Error)? ex :
653 new RuntimeException(ex));
654 }
655
656 /**
657 * Completes this task, and if not already aborted or cancelled,
658 * causes the given value to be returned upon <tt>join</tt> and
659 * related operations. This method may be used to provide results for
660 * asynchronous tasks, or to provide alternative handling for
661 * tasks that would not otherwise complete normally.
662 *
663 * @param value the result value for this task.
664 */
665 public void complete(V value) {
666 try {
667 setRawResult(value);
668 } catch(Throwable rex) {
669 setDoneExceptionally(rex);
670 return;
671 }
672 setNormalCompletion();
673 }
674
675 /**
676 * Resets the internal bookkeeping state of this task, allowing a
677 * subsequent <tt>fork</tt>. This method allows repeated reuse of
678 * this task, but only if reuse occurs when this task has either
679 * never been forked, or has been forked, then completed and all
680 * outstanding joins of this task have also completed. Effects
681 * under any other usage conditions are not guaranteed, and are
682 * almost surely wrong. This method may be useful when executing
683 * pre-constructed trees of subtasks in loops.
684 */
685 public void reinitialize() {
686 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
687 exceptionMap.remove(this);
688 status = 0;
689 }
690
691 /**
692 * Tries to unschedule this task for execution. This method will
693 * typically succeed if this task is the next task that would be
694 * executed by the current thread, and will typically fail (return
695 * false) otherwise. This method may be useful when arranging
696 * faster local processing of tasks that could have been, but were
697 * not, stolen.
698 * @return true if unforked
699 */
700 public boolean tryUnfork() {
701 return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
702 }
703
704 /**
705 * Forks both tasks, returning when <tt>isDone</tt> holds for both
706 * of them or an exception is encountered. This method may be
707 * invoked only from within other ForkJoinTask
708 * computations. Attempts to invoke in other contexts result in
709 * exceptions or errors including ClassCastException.
710 * @param t1 one task
711 * @param t2 the other task
712 * @throws NullPointerException if t1 or t2 are null
713 * @throws RuntimeException or Error if either task did so.
714 */
715 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
716 t2.fork();
717 t1.invoke();
718 t2.join();
719 }
720
721 /**
722 * Forks the given tasks, returning when <tt>isDone</tt> holds for
723 * all of them. If any task encounters an exception, others may be
724 * cancelled. This method may be invoked only from within other
725 * ForkJoinTask computations. Attempts to invoke in other contexts
726 * result in exceptions or errors including ClassCastException.
727 * @param tasks the array of tasks
728 * @throws NullPointerException if tasks or any element are null.
729 * @throws RuntimeException or Error if any task did so.
730 */
731 public static void invokeAll(ForkJoinTask<?>... tasks) {
732 Throwable ex = null;
733 int last = tasks.length - 1;
734 for (int i = last; i >= 0; --i) {
735 ForkJoinTask<?> t = tasks[i];
736 if (t == null) {
737 if (ex == null)
738 ex = new NullPointerException();
739 }
740 else if (i != 0)
741 t.fork();
742 else {
743 t.quietlyInvoke();
744 if (ex == null)
745 ex = t.getException();
746 }
747 }
748 for (int i = 1; i <= last; ++i) {
749 ForkJoinTask<?> t = tasks[i];
750 if (t != null) {
751 if (ex != null)
752 t.cancel(false);
753 else {
754 t.quietlyJoin();
755 if (ex == null)
756 ex = t.getException();
757 }
758 }
759 }
760 if (ex != null)
761 rethrowException(ex);
762 }
763
764 /**
765 * Forks all tasks in the collection, returning when
766 * <tt>isDone</tt> holds for all of them. If any task encounters
767 * an exception, others may be cancelled. This method may be
768 * invoked only from within other ForkJoinTask
769 * computations. Attempts to invoke in other contexts result in
770 * exceptions or errors including ClassCastException.
771 * @param tasks the collection of tasks
772 * @throws NullPointerException if tasks or any element are null.
773 * @throws RuntimeException or Error if any task did so.
774 */
775 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
776 if (!(tasks instanceof List)) {
777 invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
778 return;
779 }
780 List<? extends ForkJoinTask<?>> ts =
781 (List<? extends ForkJoinTask<?>>)tasks;
782 Throwable ex = null;
783 int last = ts.size() - 1;
784 for (int i = last; i >= 0; --i) {
785 ForkJoinTask<?> t = ts.get(i);
786 if (t == null) {
787 if (ex == null)
788 ex = new NullPointerException();
789 }
790 else if (i != 0)
791 t.fork();
792 else {
793 t.quietlyInvoke();
794 if (ex == null)
795 ex = t.getException();
796 }
797 }
798 for (int i = 1; i <= last; ++i) {
799 ForkJoinTask<?> t = ts.get(i);
800 if (t != null) {
801 if (ex != null)
802 t.cancel(false);
803 else {
804 t.quietlyJoin();
805 if (ex == null)
806 ex = t.getException();
807 }
808 }
809 }
810 if (ex != null)
811 rethrowException(ex);
812 }
813
814 /**
815 * Possibly executes tasks until the pool hosting the current task
816 * {@link ForkJoinPool#isQuiescent}. This method may be of use in
817 * designs in which many tasks are forked, but none are explicitly
818 * joined, instead executing them until all are processed.
819 */
820 public static void helpQuiesce() {
821 ((ForkJoinWorkerThread)(Thread.currentThread())).
822 helpQuiescePool();
823 }
824
825 /**
826 * Returns an estimate of how many more locally queued tasks are
827 * held by the current worker thread than there are other worker
828 * threads that might want to steal them. This value may be
829 * useful for heuristic decisions about whether to fork other
830 * tasks. In many usages of ForkJoinTasks, at steady state, each
831 * worker should aim to maintain a small constant surplus (for
832 * example, 3) of tasks, and to process computations locally if
833 * this threshold is exceeded.
834 * @return the surplus number of tasks, which may be negative
835 */
836 public static int surplus() {
837 return ((ForkJoinWorkerThread)(Thread.currentThread()))
838 .getEstimatedSurplusTaskCount();
839 }
840
841 // Extension kit
842
843 /**
844 * Returns the result that would be returned by <tt>join</tt>, or
845 * null if this task is not known to have been completed. This
846 * method is designed to aid debugging, as well as to support
847 * extensions. Its use in any other context is discouraged.
848 *
849 * @return the result, or null if not completed.
850 */
851 public abstract V getRawResult();
852
853 /**
854 * Forces the given value to be returned as a result. This method
855 * is designed to support extensions, and should not in general be
856 * called otherwise.
857 *
858 * @param value the value
859 */
860 protected abstract void setRawResult(V value);
861
862 /**
863 * Immediately performs the base action of this task. This method
864 * is designed to support extensions, and should not in general be
865 * called otherwise. The return value controls whether this task
866 * is considered to be done normally. It may return false in
867 * asynchronous actions that require explicit invocations of
868 * <tt>complete</tt> to become joinable. It may throw exceptions
869 * to indicate abnormal exit.
870 * @return true if completed normally
871 * @throws Error or RuntimeException if encountered during computation
872 */
873 protected abstract boolean exec();
874
875 // Serialization support
876
877 private static final long serialVersionUID = -7721805057305804111L;
878
879 /**
880 * Save the state to a stream.
881 *
882 * @serialData the current run status and the exception thrown
883 * during execution, or null if none.
884 * @param s the stream
885 */
886 private void writeObject(java.io.ObjectOutputStream s)
887 throws java.io.IOException {
888 s.defaultWriteObject();
889 s.writeObject(getException());
890 }
891
892 /**
893 * Reconstitute the instance from a stream.
894 * @param s the stream
895 */
896 private void readObject(java.io.ObjectInputStream s)
897 throws java.io.IOException, ClassNotFoundException {
898 s.defaultReadObject();
899 // status &= ~INTERNAL_SIGNAL_MASK; // todo: define policy
900 Object ex = s.readObject();
901 if (ex != null)
902 setDoneExceptionally((Throwable)ex);
903 }
904
905 // Temporary Unsafe mechanics for preliminary release
906
907 static final Unsafe _unsafe;
908 static final long statusOffset;
909
910 static {
911 try {
912 if (ForkJoinTask.class.getClassLoader() != null) {
913 Field f = Unsafe.class.getDeclaredField("theUnsafe");
914 f.setAccessible(true);
915 _unsafe = (Unsafe)f.get(null);
916 }
917 else
918 _unsafe = Unsafe.getUnsafe();
919 statusOffset = _unsafe.objectFieldOffset
920 (ForkJoinTask.class.getDeclaredField("status"));
921 } catch (Exception ex) { throw new Error(ex); }
922 }
923
924 }
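
Illustration: decoding the status word. The comment on the status field packs completion state into the top three bits and waiter bookkeeping into the low 16 bits. The sketch below copies the constants from the listing and re-implements the checks as static helpers, to show why a negative status means "done" and why comparing against NORMAL identifies abnormal completion. The class name StatusWordDemo and the helper names internalWaiters and hasExternalWaiter are hypothetical and not part of the listed file.

```java
// Hypothetical demo class; only the constants are taken from the listing.
final class StatusWordDemo {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000; // == mask
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int SIGNAL_MASK          = 0x0000ffff;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000; // top bit of low word

    // All three completion values have the sign bit set, so "done" is a sign test.
    static boolean isDone(int status) { return status < 0; }

    static boolean isCancelled(int status) {
        return (status & COMPLETION_MASK) == CANCELLED;
    }

    // As signed ints, EXCEPTIONAL < CANCELLED < NORMAL < 0, and a pending
    // task masks to 0, so only cancelled or exceptional tasks compare below NORMAL.
    static boolean completedAbnormally(int status) {
        return (status & COMPLETION_MASK) < NORMAL;
    }

    // Count of fork/join worker threads blocked on the task (low 15 bits).
    static int internalWaiters(int status) { return status & INTERNAL_SIGNAL_MASK; }

    // Whether some non-fork/join thread is waiting (bit 15).
    static boolean hasExternalWaiter(int status) {
        return (status & EXTERNAL_SIGNAL) != 0;
    }

    public static void main(String[] args) {
        int s = 0;                // initial status
        s = s + 1;                // first worker thread starts waiting
        s = s + 1;                // second worker thread starts waiting
        s = s | EXTERNAL_SIGNAL;  // an external thread starts waiting
        System.out.println(isDone(s) + " " + internalWaiters(s)
                           + " " + hasExternalWaiter(s)); // false 2 true

        // Same expression as externallySetCompletion: keep signal bits, add completion.
        int done = (s & SIGNAL_MASK) | CANCELLED;
        System.out.println(isDone(done) + " " + isCancelled(done)
                           + " " + completedAbnormally(done)); // true true true
        System.out.println(completedAbnormally(NORMAL)); // false
    }
}
```

The helpers take the status as a parameter so the encoding can be checked without a pool or any threads.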
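
Illustration: remaining time for a timed wait. The timed doAwaitDone needs the time left before the deadline, which is the timeout minus the time elapsed since startTime. It then splits that value into the millisecond and nanosecond arguments accepted by Object.wait(long, int). The sketch below isolates that arithmetic. The class TimedWaitMath and its method names are hypothetical, and the current time is passed in as a parameter (an assumption made here so the calculation can be checked deterministically).

```java
// Hypothetical helper; isolates the deadline arithmetic of a timed monitor wait.
final class TimedWaitMath {
    /** Nanoseconds left before the deadline; a value <= 0 means timed out. */
    static long remainingNanos(long timeoutNanos, long startTime, long now) {
        // Subtract elapsed time (now - startTime) from the timeout.
        return timeoutNanos - (now - startTime);
    }

    /** Whole milliseconds, for the first argument of Object.wait(long, int). */
    static long millisPart(long nanos) { return nanos / 1_000_000L; }

    /** Leftover nanoseconds in 0..999999, for the second argument. */
    static int nanosPart(long nanos) { return (int) (nanos % 1_000_000L); }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long timeout = 2_500_000L;              // 2.5 ms timeout
        long now = start + 1_000_000L;          // pretend 1 ms has elapsed
        long left = remainingNanos(timeout, start, now);
        System.out.println(millisPart(left) + " ms + " + nanosPart(left) + " ns");
        // prints "1 ms + 500000 ns"
    }
}
```

Because System.nanoTime() values are only meaningful as differences, the elapsed time is computed first and then subtracted from the timeout.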
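
Illustration: a divide-and-conquer task. The class comment says that ForkJoinTask is normally not subclassed directly, that a concrete task keeps its parameters in fields and defines a compute method, and that a "main" task is started by submitting it to a ForkJoinPool. The sketch below shows that shape. It is written against the java.util.concurrent classes (RecursiveTask, ForkJoinPool) that later standardized this framework, not against this jsr166y revision, whose subclass API may differ. SumTask, THRESHOLD, and sum are hypothetical names chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of an array by recursive splitting.
final class SumTask extends RecursiveTask<Long> {
    // Leaf size chosen to sit inside the 100..10000 steps rule of thumb.
    static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int lo, hi; // half-open range [lo, hi)

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {           // small enough: compute directly
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        right.fork();                         // schedule one half asynchronously
        long l = left.invoke();               // run the other half in this thread
        long r = right.join();                // wait for the forked half
        return l + r;
    }

    /** Runs the main task by submitting it to a pool, then shuts the pool down. */
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data)); // prints 5000050000
    }
}
```

The fork, invoke, join order mirrors the two-task invokeAll in the listing: one half is forked, the other runs in the current thread, and only then is the forked half joined. The subtasks read disjoint slices of the array and write nothing shared, in line with the class comment's advice on independent variables.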